diff --git a/examples/server.conf b/examples/server.conf index 25ceb6d..67fbb05 100644 --- a/examples/server.conf +++ b/examples/server.conf @@ -6,3 +6,4 @@ key_path=/etc/pykmip/certs/server_private_key.pem ca_path=/etc/pykmip/certs/server_ca_cert.pem auth_suite=Basic policy_path=/etc/pykmip/policies +enable_tls_client_auth=True diff --git a/kmip/services/server/config.py b/kmip/services/server/config.py index 25bab5e..3176038 100644 --- a/kmip/services/server/config.py +++ b/kmip/services/server/config.py @@ -34,6 +34,7 @@ class KmipServerConfig(object): self._logger = logging.getLogger('kmip.server.config') self.settings = dict() + self.settings['enable_tls_client_auth'] = True self._expected_settings = [ 'hostname', @@ -44,7 +45,8 @@ class KmipServerConfig(object): 'auth_suite' ] self._optional_settings = [ - 'policy_path' + 'policy_path', + 'enable_tls_client_auth' ] def set_setting(self, setting, value): @@ -80,8 +82,10 @@ class KmipServerConfig(object): self._set_ca_path(value) elif setting == 'auth_suite': self._set_auth_suite(value) - else: + elif setting == 'policy_path': self._set_policy_path(value) + else: + self._set_enable_tls_client_auth(value) def load_settings(self, path): """ @@ -148,6 +152,10 @@ class KmipServerConfig(object): self._set_auth_suite(parser.get('server', 'auth_suite')) if parser.has_option('server', 'policy_path'): self._set_policy_path(parser.get('server', 'policy_path')) + if parser.has_option('server', 'enable_tls_client_auth'): + self._set_enable_tls_client_auth( + parser.getboolean('server', 'enable_tls_client_auth') + ) def _set_hostname(self, value): if isinstance(value, six.string_types): @@ -242,3 +250,14 @@ class KmipServerConfig(object): "The policy path, if specified, must be a valid string path " "to a filesystem directory." ) + + def _set_enable_tls_client_auth(self, value): + if value is None: + self.settings['enable_tls_client_auth'] = True + elif isinstance(value, bool): + self.settings['enable_tls_client_auth'] = value + else: + raise exceptions.ConfigurationError( + "The flag enabling the TLS certificate client auth flag check " + "must be a boolean." + ) diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index 7fec309..6acfee0 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -50,7 +50,8 @@ class KmipServer(object): auth_suite=None, config_path='/etc/pykmip/server.conf', log_path='/var/log/pykmip/server.log', - policy_path=None + policy_path=None, + enable_tls_client_auth=None ): """ Create a KmipServer. @@ -95,6 +96,10 @@ class KmipServer(object): policy_path (string): The path to the filesystem directory containing PyKMIP server operation policy JSON files. Optional, defaults to None. + enable_tls_client_auth (boolean): A boolean indicating if the TLS + certificate client auth flag should be required for client + certificates when establishing a new client session. Optional, + defaults to None. """ self._logger = logging.getLogger('kmip.server') self._setup_logging(log_path) @@ -108,7 +113,8 @@ class KmipServer(object): key_path, ca_path, auth_suite, - policy_path + policy_path, + enable_tls_client_auth ) if self.config.settings.get('auth_suite') == 'TLS1.2': @@ -152,7 +158,8 @@ class KmipServer(object): key_path=None, ca_path=None, auth_suite=None, - policy_path=None + policy_path=None, + enable_tls_client_auth=None ): if path: self.config.load_settings(path) @@ -171,6 +178,11 @@ class KmipServer(object): self.config.set_setting('auth_suite', auth_suite) if policy_path: self.config.set_setting('policy_path', policy_path) + if enable_tls_client_auth is not None: + self.config.set_setting( + 'enable_tls_client_auth', + enable_tls_client_auth + ) def start(self): """ @@ -343,7 +355,10 @@ class KmipServer(object): s = session.KmipSession( self._engine, connection, - name=session_name + name=session_name, + enable_tls_client_auth=self.config.settings.get( + 'enable_tls_client_auth' + ) ) s.daemon = True s.start() @@ -478,6 +493,18 @@ def build_argument_parser(): "directory. Optional, defaults to None." ), ) + parser.add_option( + "-i", + "--ignore_tls_client_auth", + action="store_true", + default=False, + dest="ignore_tls_client_auth", + help=( + "A boolean indicating whether or not the TLS certificate client " + "auth flag should be ignored when establishing client sessions. " + "Optional, defaults to None." + ) + ) return parser @@ -506,6 +533,8 @@ def main(args=None): kwargs['log_path'] = opts.log_path if opts.policy_path: kwargs['policy_path'] = opts.policy_path + if opts.ignore_tls_client_auth: + kwargs['enable_tls_client_auth'] = False # Create and start the server. s = KmipServer(**kwargs) diff --git a/kmip/services/server/session.py b/kmip/services/server/session.py index ffd5839..627ffcd 100644 --- a/kmip/services/server/session.py +++ b/kmip/services/server/session.py @@ -33,7 +33,11 @@ class KmipSession(threading.Thread): A session thread representing a single KMIP client/server interaction. """ - def __init__(self, engine, connection, name=None): + def __init__(self, + engine, + connection, + name=None, + enable_tls_client_auth=True): """ Create a KmipSession. @@ -44,6 +48,10 @@ class KmipSession(threading.Thread): representing a new KMIP connection. Required. name (str): The name of the KmipSession. Optional, defaults to None. + enable_tls_client_auth (bool): A flag that enables a strict check + for the client auth flag in the extended key usage extension + in client certificates when establishing the client/server TLS + connection. Optional, defaults to True. """ super(KmipSession, self).__init__( group=None, @@ -60,6 +68,8 @@ class KmipSession(threading.Thread): self._engine = engine self._connection = connection + self._enable_tls_client_auth = enable_tls_client_auth + self._max_buffer_size = 4096 self._max_request_size = 1048576 self._max_response_size = 1048576 @@ -89,7 +99,7 @@ class KmipSession(threading.Thread): def _get_client_identity(self): certificate_data = self._connection.getpeercert(binary_form=True) try: - certificate = x509.load_der_x509_certificate( + cert = x509.load_der_x509_certificate( certificate_data, backends.default_backend() ) @@ -102,42 +112,44 @@ class KmipSession(threading.Thread): "connection. Could not retrieve client identity." ) - try: - extended_key_usage = certificate.extensions.get_extension_for_oid( - x509.oid.ExtensionOID.EXTENDED_KEY_USAGE - ).value - except x509.ExtensionNotFound: - raise exceptions.PermissionDenied( - "The extended key usage extension is missing from the client " - "certificate. Session client identity unavailable." - ) - - if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage: - client_identities = certificate.subject.get_attributes_for_oid( - x509.oid.NameOID.COMMON_NAME - ) - if len(client_identities) > 0: - if len(client_identities) > 1: - self._logger.warning( - "Multiple client identities found. Using the first " - "one processed." - ) - client_identity = client_identities[0].value - self._logger.info( - "Session client identity: {0}".format(client_identity) - ) - return client_identity - else: + if self._enable_tls_client_auth: + try: + extended_key_usage = cert.extensions.get_extension_for_oid( + x509.oid.ExtensionOID.EXTENDED_KEY_USAGE + ).value + except x509.ExtensionNotFound: raise exceptions.PermissionDenied( - "The client certificate does not define a subject common " - "name. Session client identity unavailable." + "The extended key usage extension is missing from the " + "client certificate. Session client identity unavailable." ) - raise exceptions.PermissionDenied( - "The extended key usage extension is not marked for client " - "authentication in the client certificate. Session client " - "identity unavailable." + if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH not in \ + extended_key_usage: + raise exceptions.PermissionDenied( + "The extended key usage extension is not marked for " + "client authentication in the client certificate. Session " + "client identity unavailable." + ) + + client_identities = cert.subject.get_attributes_for_oid( + x509.oid.NameOID.COMMON_NAME ) + if len(client_identities) > 0: + if len(client_identities) > 1: + self._logger.warning( + "Multiple client identities found. Using the first " + "one processed." + ) + client_identity = client_identities[0].value + self._logger.info( + "Session client identity: {0}".format(client_identity) + ) + return client_identity + else: + raise exceptions.PermissionDenied( + "The client certificate does not define a subject common " + "name. Session client identity unavailable." + ) def _handle_message_loop(self): request_data = self._receive_request() diff --git a/kmip/tests/unit/services/server/test_config.py b/kmip/tests/unit/services/server/test_config.py index f9121dc..e882200 100644 --- a/kmip/tests/unit/services/server/test_config.py +++ b/kmip/tests/unit/services/server/test_config.py @@ -54,6 +54,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_key_path = mock.MagicMock() c._set_port = mock.MagicMock() c._set_policy_path = mock.MagicMock() + c._set_enable_tls_client_auth = mock.MagicMock() # Test the right error is generated when setting an unsupported # setting. @@ -92,6 +93,9 @@ class TestKmipServerConfig(testtools.TestCase): c.set_setting('policy_path', '/etc/pykmip/policies') c._set_policy_path.assert_called_once_with('/etc/pykmip/policies') + c.set_setting('enable_tls_client_auth', False) + c._set_enable_tls_client_auth.assert_called_once_with(False) + def test_load_settings(self): """ Test that the right calls are made and the right errors generated when @@ -144,6 +148,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_key_path = mock.MagicMock() c._set_port = mock.MagicMock() c._set_policy_path = mock.MagicMock() + c._set_enable_tls_client_auth = mock.MagicMock() # Test that the right calls are made when correctly parsing settings. parser = configparser.SafeConfigParser() @@ -155,6 +160,7 @@ class TestKmipServerConfig(testtools.TestCase): parser.set('server', 'ca_path', '/test/path/ca.crt') parser.set('server', 'auth_suite', 'Basic') parser.set('server', 'policy_path', '/test/path/policies') + parser.set('server', 'enable_tls_client_auth', 'False') c._parse_settings(parser) @@ -167,6 +173,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_ca_path.assert_called_once_with('/test/path/ca.crt') c._set_auth_suite.assert_called_once_with('Basic') c._set_policy_path.assert_called_once_with('/test/path/policies') + c._set_enable_tls_client_auth.assert_called_once_with(False) # Test that a ConfigurationError is generated when the expected # section is missing. @@ -520,3 +527,47 @@ class TestKmipServerConfig(testtools.TestCase): *args ) self.assertNotEqual(1, c.settings.get('policy_path')) + + def test_set_enable_tls_client_auth(self): + """ + Test that the enable_tls_client_auth configuration property can be set + correctly. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + self.assertIn('enable_tls_client_auth', c.settings.keys()) + self.assertEqual( + True, + c.settings.get('enable_tls_client_auth') + ) + + # Test that the setting is set correctly with a valid value + c._set_enable_tls_client_auth(False) + self.assertEqual( + False, + c.settings.get('enable_tls_client_auth') + ) + + c._set_enable_tls_client_auth(None) + self.assertEqual( + True, + c.settings.get('enable_tls_client_auth') + ) + + c._set_enable_tls_client_auth(True) + self.assertEqual( + True, + c.settings.get('enable_tls_client_auth') + ) + + # Test that a ConfigurationError is generated when setting the wrong + # value. + args = ('invalid',) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The flag enabling the TLS certificate client auth flag check " + "must be a boolean.", + c._set_enable_tls_client_auth, + *args + ) diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index cf98641..4376b6d 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -119,7 +119,8 @@ class TestKmipServer(testtools.TestCase): '/etc/pykmip/certs/server.key', '/etc/pykmip/certs/ca.crt', 'Basic', - '/etc/pykmip/policies' + '/etc/pykmip/policies', + False ) s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') @@ -142,6 +143,10 @@ class TestKmipServer(testtools.TestCase): 'policy_path', '/etc/pykmip/policies' ) + s.config.set_setting.assert_any_call( + 'enable_tls_client_auth', + False + ) # Test that an attempt is made to instantiate the TLS 1.2 auth suite s = server.KmipServer(