Merge pull request #335 from OpenKMIP/feat/add-config-extended-key-usage

Add server config option controlling certificate client auth
This commit is contained in:
Peter Hamilton 2017-09-14 13:32:52 -04:00 committed by GitHub
commit c3696a9877
6 changed files with 158 additions and 41 deletions

View File

@ -6,3 +6,4 @@ key_path=/etc/pykmip/certs/server_private_key.pem
ca_path=/etc/pykmip/certs/server_ca_cert.pem ca_path=/etc/pykmip/certs/server_ca_cert.pem
auth_suite=Basic auth_suite=Basic
policy_path=/etc/pykmip/policies policy_path=/etc/pykmip/policies
enable_tls_client_auth=True

View File

@ -34,6 +34,7 @@ class KmipServerConfig(object):
self._logger = logging.getLogger('kmip.server.config') self._logger = logging.getLogger('kmip.server.config')
self.settings = dict() self.settings = dict()
self.settings['enable_tls_client_auth'] = True
self._expected_settings = [ self._expected_settings = [
'hostname', 'hostname',
@ -44,7 +45,8 @@ class KmipServerConfig(object):
'auth_suite' 'auth_suite'
] ]
self._optional_settings = [ self._optional_settings = [
'policy_path' 'policy_path',
'enable_tls_client_auth'
] ]
def set_setting(self, setting, value): def set_setting(self, setting, value):
@ -80,8 +82,10 @@ class KmipServerConfig(object):
self._set_ca_path(value) self._set_ca_path(value)
elif setting == 'auth_suite': elif setting == 'auth_suite':
self._set_auth_suite(value) self._set_auth_suite(value)
else: elif setting == 'policy_path':
self._set_policy_path(value) self._set_policy_path(value)
else:
self._set_enable_tls_client_auth(value)
def load_settings(self, path): def load_settings(self, path):
""" """
@ -148,6 +152,10 @@ class KmipServerConfig(object):
self._set_auth_suite(parser.get('server', 'auth_suite')) self._set_auth_suite(parser.get('server', 'auth_suite'))
if parser.has_option('server', 'policy_path'): if parser.has_option('server', 'policy_path'):
self._set_policy_path(parser.get('server', 'policy_path')) self._set_policy_path(parser.get('server', 'policy_path'))
if parser.has_option('server', 'enable_tls_client_auth'):
self._set_enable_tls_client_auth(
parser.getboolean('server', 'enable_tls_client_auth')
)
def _set_hostname(self, value): def _set_hostname(self, value):
if isinstance(value, six.string_types): if isinstance(value, six.string_types):
@ -242,3 +250,14 @@ class KmipServerConfig(object):
"The policy path, if specified, must be a valid string path " "The policy path, if specified, must be a valid string path "
"to a filesystem directory." "to a filesystem directory."
) )
def _set_enable_tls_client_auth(self, value):
if value is None:
self.settings['enable_tls_client_auth'] = True
elif isinstance(value, bool):
self.settings['enable_tls_client_auth'] = value
else:
raise exceptions.ConfigurationError(
"The flag enabling the TLS certificate client auth flag check "
"must be a boolean."
)

View File

@ -50,7 +50,8 @@ class KmipServer(object):
auth_suite=None, auth_suite=None,
config_path='/etc/pykmip/server.conf', config_path='/etc/pykmip/server.conf',
log_path='/var/log/pykmip/server.log', log_path='/var/log/pykmip/server.log',
policy_path=None policy_path=None,
enable_tls_client_auth=None
): ):
""" """
Create a KmipServer. Create a KmipServer.
@ -95,6 +96,10 @@ class KmipServer(object):
policy_path (string): The path to the filesystem directory policy_path (string): The path to the filesystem directory
containing PyKMIP server operation policy JSON files. containing PyKMIP server operation policy JSON files.
Optional, defaults to None. Optional, defaults to None.
enable_tls_client_auth (boolean): A boolean indicating if the TLS
certificate client auth flag should be required for client
certificates when establishing a new client session. Optional,
defaults to None.
""" """
self._logger = logging.getLogger('kmip.server') self._logger = logging.getLogger('kmip.server')
self._setup_logging(log_path) self._setup_logging(log_path)
@ -108,7 +113,8 @@ class KmipServer(object):
key_path, key_path,
ca_path, ca_path,
auth_suite, auth_suite,
policy_path policy_path,
enable_tls_client_auth
) )
if self.config.settings.get('auth_suite') == 'TLS1.2': if self.config.settings.get('auth_suite') == 'TLS1.2':
@ -152,7 +158,8 @@ class KmipServer(object):
key_path=None, key_path=None,
ca_path=None, ca_path=None,
auth_suite=None, auth_suite=None,
policy_path=None policy_path=None,
enable_tls_client_auth=None
): ):
if path: if path:
self.config.load_settings(path) self.config.load_settings(path)
@ -171,6 +178,11 @@ class KmipServer(object):
self.config.set_setting('auth_suite', auth_suite) self.config.set_setting('auth_suite', auth_suite)
if policy_path: if policy_path:
self.config.set_setting('policy_path', policy_path) self.config.set_setting('policy_path', policy_path)
if enable_tls_client_auth is not None:
self.config.set_setting(
'enable_tls_client_auth',
enable_tls_client_auth
)
def start(self): def start(self):
""" """
@ -343,7 +355,10 @@ class KmipServer(object):
s = session.KmipSession( s = session.KmipSession(
self._engine, self._engine,
connection, connection,
name=session_name name=session_name,
enable_tls_client_auth=self.config.settings.get(
'enable_tls_client_auth'
)
) )
s.daemon = True s.daemon = True
s.start() s.start()
@ -478,6 +493,18 @@ def build_argument_parser():
"directory. Optional, defaults to None." "directory. Optional, defaults to None."
), ),
) )
parser.add_option(
"-i",
"--ignore_tls_client_auth",
action="store_true",
default=False,
dest="ignore_tls_client_auth",
help=(
"A boolean indicating whether or not the TLS certificate client "
"auth flag should be ignored when establishing client sessions. "
"Optional, defaults to None."
)
)
return parser return parser
@ -506,6 +533,8 @@ def main(args=None):
kwargs['log_path'] = opts.log_path kwargs['log_path'] = opts.log_path
if opts.policy_path: if opts.policy_path:
kwargs['policy_path'] = opts.policy_path kwargs['policy_path'] = opts.policy_path
if opts.ignore_tls_client_auth:
kwargs['enable_tls_client_auth'] = False
# Create and start the server. # Create and start the server.
s = KmipServer(**kwargs) s = KmipServer(**kwargs)

View File

@ -33,7 +33,11 @@ class KmipSession(threading.Thread):
A session thread representing a single KMIP client/server interaction. A session thread representing a single KMIP client/server interaction.
""" """
def __init__(self, engine, connection, name=None): def __init__(self,
engine,
connection,
name=None,
enable_tls_client_auth=True):
""" """
Create a KmipSession. Create a KmipSession.
@ -44,6 +48,10 @@ class KmipSession(threading.Thread):
representing a new KMIP connection. Required. representing a new KMIP connection. Required.
name (str): The name of the KmipSession. Optional, defaults to name (str): The name of the KmipSession. Optional, defaults to
None. None.
enable_tls_client_auth (bool): A flag that enables a strict check
for the client auth flag in the extended key usage extension
in client certificates when establishing the client/server TLS
connection. Optional, defaults to True.
""" """
super(KmipSession, self).__init__( super(KmipSession, self).__init__(
group=None, group=None,
@ -60,6 +68,8 @@ class KmipSession(threading.Thread):
self._engine = engine self._engine = engine
self._connection = connection self._connection = connection
self._enable_tls_client_auth = enable_tls_client_auth
self._max_buffer_size = 4096 self._max_buffer_size = 4096
self._max_request_size = 1048576 self._max_request_size = 1048576
self._max_response_size = 1048576 self._max_response_size = 1048576
@ -89,7 +99,7 @@ class KmipSession(threading.Thread):
def _get_client_identity(self): def _get_client_identity(self):
certificate_data = self._connection.getpeercert(binary_form=True) certificate_data = self._connection.getpeercert(binary_form=True)
try: try:
certificate = x509.load_der_x509_certificate( cert = x509.load_der_x509_certificate(
certificate_data, certificate_data,
backends.default_backend() backends.default_backend()
) )
@ -102,18 +112,26 @@ class KmipSession(threading.Thread):
"connection. Could not retrieve client identity." "connection. Could not retrieve client identity."
) )
if self._enable_tls_client_auth:
try: try:
extended_key_usage = certificate.extensions.get_extension_for_oid( extended_key_usage = cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.EXTENDED_KEY_USAGE x509.oid.ExtensionOID.EXTENDED_KEY_USAGE
).value ).value
except x509.ExtensionNotFound: except x509.ExtensionNotFound:
raise exceptions.PermissionDenied( raise exceptions.PermissionDenied(
"The extended key usage extension is missing from the client " "The extended key usage extension is missing from the "
"certificate. Session client identity unavailable." "client certificate. Session client identity unavailable."
) )
if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage: if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in \
client_identities = certificate.subject.get_attributes_for_oid( extended_key_usage:
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for "
"client authentication in the client certificate. Session "
"client identity unavailable."
)
client_identities = cert.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME x509.oid.NameOID.COMMON_NAME
) )
if len(client_identities) > 0: if len(client_identities) > 0:
@ -133,12 +151,6 @@ class KmipSession(threading.Thread):
"name. Session client identity unavailable." "name. Session client identity unavailable."
) )
raise exceptions.PermissionDenied(
"The extended key usage extension is not marked for client "
"authentication in the client certificate. Session client "
"identity unavailable."
)
def _handle_message_loop(self): def _handle_message_loop(self):
request_data = self._receive_request() request_data = self._receive_request()
request = messages.RequestMessage() request = messages.RequestMessage()

View File

@ -54,6 +54,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_key_path = mock.MagicMock() c._set_key_path = mock.MagicMock()
c._set_port = mock.MagicMock() c._set_port = mock.MagicMock()
c._set_policy_path = mock.MagicMock() c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
# Test the right error is generated when setting an unsupported # Test the right error is generated when setting an unsupported
# setting. # setting.
@ -92,6 +93,9 @@ class TestKmipServerConfig(testtools.TestCase):
c.set_setting('policy_path', '/etc/pykmip/policies') c.set_setting('policy_path', '/etc/pykmip/policies')
c._set_policy_path.assert_called_once_with('/etc/pykmip/policies') c._set_policy_path.assert_called_once_with('/etc/pykmip/policies')
c.set_setting('enable_tls_client_auth', False)
c._set_enable_tls_client_auth.assert_called_once_with(False)
def test_load_settings(self): def test_load_settings(self):
""" """
Test that the right calls are made and the right errors generated when Test that the right calls are made and the right errors generated when
@ -144,6 +148,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_key_path = mock.MagicMock() c._set_key_path = mock.MagicMock()
c._set_port = mock.MagicMock() c._set_port = mock.MagicMock()
c._set_policy_path = mock.MagicMock() c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
# Test that the right calls are made when correctly parsing settings. # Test that the right calls are made when correctly parsing settings.
parser = configparser.SafeConfigParser() parser = configparser.SafeConfigParser()
@ -155,6 +160,7 @@ class TestKmipServerConfig(testtools.TestCase):
parser.set('server', 'ca_path', '/test/path/ca.crt') parser.set('server', 'ca_path', '/test/path/ca.crt')
parser.set('server', 'auth_suite', 'Basic') parser.set('server', 'auth_suite', 'Basic')
parser.set('server', 'policy_path', '/test/path/policies') parser.set('server', 'policy_path', '/test/path/policies')
parser.set('server', 'enable_tls_client_auth', 'False')
c._parse_settings(parser) c._parse_settings(parser)
@ -167,6 +173,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_ca_path.assert_called_once_with('/test/path/ca.crt') c._set_ca_path.assert_called_once_with('/test/path/ca.crt')
c._set_auth_suite.assert_called_once_with('Basic') c._set_auth_suite.assert_called_once_with('Basic')
c._set_policy_path.assert_called_once_with('/test/path/policies') c._set_policy_path.assert_called_once_with('/test/path/policies')
c._set_enable_tls_client_auth.assert_called_once_with(False)
# Test that a ConfigurationError is generated when the expected # Test that a ConfigurationError is generated when the expected
# section is missing. # section is missing.
@ -520,3 +527,47 @@ class TestKmipServerConfig(testtools.TestCase):
*args *args
) )
self.assertNotEqual(1, c.settings.get('policy_path')) self.assertNotEqual(1, c.settings.get('policy_path'))
def test_set_enable_tls_client_auth(self):
"""
Test that the enable_tls_client_auth configuration property can be set
correctly.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
self.assertIn('enable_tls_client_auth', c.settings.keys())
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
# Test that the setting is set correctly with a valid value
c._set_enable_tls_client_auth(False)
self.assertEqual(
False,
c.settings.get('enable_tls_client_auth')
)
c._set_enable_tls_client_auth(None)
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
c._set_enable_tls_client_auth(True)
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
# Test that a ConfigurationError is generated when setting the wrong
# value.
args = ('invalid',)
self.assertRaisesRegexp(
exceptions.ConfigurationError,
"The flag enabling the TLS certificate client auth flag check "
"must be a boolean.",
c._set_enable_tls_client_auth,
*args
)

View File

@ -119,7 +119,8 @@ class TestKmipServer(testtools.TestCase):
'/etc/pykmip/certs/server.key', '/etc/pykmip/certs/server.key',
'/etc/pykmip/certs/ca.crt', '/etc/pykmip/certs/ca.crt',
'Basic', 'Basic',
'/etc/pykmip/policies' '/etc/pykmip/policies',
False
) )
s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') s.config.load_settings.assert_called_with('/etc/pykmip/server.conf')
@ -142,6 +143,10 @@ class TestKmipServer(testtools.TestCase):
'policy_path', 'policy_path',
'/etc/pykmip/policies' '/etc/pykmip/policies'
) )
s.config.set_setting.assert_any_call(
'enable_tls_client_auth',
False
)
# Test that an attempt is made to instantiate the TLS 1.2 auth suite # Test that an attempt is made to instantiate the TLS 1.2 auth suite
s = server.KmipServer( s = server.KmipServer(