Adding custom TLS cipher suite config option

This change adds a server configuration option, tls_cipher_suites,
allowing the server admin to specify a list of cipher suites to be
used when establishing TLS connections with clients. The custom
list supports both cipher suite specification and OpenSSL suite
naming conventions. The list is filtered through a KMIP-approved
set of cipher suites, and then through a set of cipher suites
suitable for the configured authentication suite. Additional debug
logging has been added to the server to provide transparency on
this process.
This commit is contained in:
Peter Hamilton 2017-09-15 13:07:00 -04:00
parent c3696a9877
commit ee857ca4a3
9 changed files with 520 additions and 67 deletions

View File

@ -7,3 +7,7 @@ ca_path=/etc/pykmip/certs/server_ca_cert.pem
auth_suite=Basic
policy_path=/etc/pykmip/policies
enable_tls_client_auth=True
tls_cipher_suites=
EXAMPLE_CIPHER_SUITE_1
EXAMPLE_CIPHER_SUITE_2
EXAMPLE_CIPHER_SUITE_3

View File

@ -26,13 +26,108 @@ class AuthenticationSuite(object):
Acts as the base of the suite hierarchy.
"""
# OpenSSL cipher suites
# Explicitly listed suites for Basic and TLSv1.2 authentication for KMIP
# profiles.
#
# Obtained from:
# https://www.openssl.org/docs/man1.1.0/apps/ciphers.html
# https://www.openssl.org/docs/man1.0.2/apps/ciphers.html
openssl_cipher_suite_map = {
# TLS v1.2 cipher suites
'TLS_RSA_WITH_AES_256_CBC_SHA256': 'AES256-SHA256',
'TLS_RSA_WITH_AES_128_CBC_SHA256': 'AES128-SHA256',
'TLS_DH_DSS_WITH_AES_128_CBC_SHA256': 'DH-DSS-AES128-SHA256',
'TLS_DH_RSA_WITH_AES_128_CBC_SHA256': 'DH-RSA-AES128-SHA256',
'TLS_DHE_DSS_WITH_AES_128_CBC_SHA256': 'DHE-DSS-AES128-SHA256',
'TLS_DHE_RSA_WITH_AES_128_CBC_SHA256': 'DHE-RSA-AES128-SHA256',
'TLS_DH_DSS_WITH_AES_256_CBC_SHA256': 'DH-DSS-AES256-SHA256',
'TLS_DH_RSA_WITH_AES_256_CBC_SHA256': 'DH-RSA-AES256-SHA256',
'TLS_DHE_DSS_WITH_AES_256_CBC_SHA256': 'DHE-DSS-AES256-SHA256',
'TLS_DHE_RSA_WITH_AES_256_CBC_SHA256': 'DHE-RSA-AES256-SHA256',
'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256': 'ECDHE-ECDSA-AES128-SHA256',
'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384': 'ECDHE-ECDSA-AES256-SHA384',
'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256': 'ECDHE-RSA-AES128-SHA256',
'TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384': 'ECDHE-RSA-AES256-SHA384',
'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256':
'ECDHE-ECDSA-AES128-GCM-SHA256',
'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384':
'ECDHE-ECDSA-AES256-GCM-SHA384',
'TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256': 'ECDH-ECDSA-AES128-SHA256',
'TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384': 'ECDH-ECDSA-AES256-SHA384',
'TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256': 'ECDH-RSA-AES128-SHA256',
'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384': 'ECDH-RSA-AES256-SHA384',
# AES ciphersuites from RFC3268, extending TLS v1.0
'TLS_RSA_WITH_AES_128_CBC_SHA': 'AES128-SHA',
'TLS_RSA_WITH_AES_256_CBC_SHA': 'AES256-SHA',
'TLS_DH_DSS_WITH_AES_128_CBC_SHA': 'DH-DSS-AES128-SHA',
'TLS_DH_RSA_WITH_AES_128_CBC_SHA': 'DH-RSA-AES128-SHA',
'TLS_DHE_DSS_WITH_AES_128_CBC_SHA': 'DHE-DSS-AES128-SHA',
'TLS_DHE_RSA_WITH_AES_128_CBC_SHA': 'DHE-RSA-AES128-SHA',
'TLS_DH_DSS_WITH_AES_256_CBC_SHA': 'DH-DSS-AES256-SHA',
'TLS_DH_RSA_WITH_AES_256_CBC_SHA': 'DH-RSA-AES256-SHA',
'TLS_DHE_DSS_WITH_AES_256_CBC_SHA': 'DHE-DSS-AES256-SHA',
'TLS_DHE_RSA_WITH_AES_256_CBC_SHA': 'DHE-RSA-AES256-SHA',
# Elliptic curve cipher suites.
'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA': 'ECDHE-ECDSA-AES128-SHA',
'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA': 'ECDHE-RSA-AES128-SHA',
# Pre shared keying (PSK) cipheruites
'TLS_PSK_WITH_AES_128_CBC_SHA': 'PSK-AES128-CBC-SHA',
'TLS_PSK_WITH_AES_256_CBC_SHA': 'PSK-AES256-CBC-SHA',
# No OpenSSL support
'TLS_DHE_PSK_WITH_AES_128_CBC_SHA': None,
'TLS_DHE_PSK_WITH_AES_256_CBC_SHA': None,
'TLS_RSA_PSK_WITH_AES_128_CBC_SHA': None,
'TLS_RSA_PSK_WITH_AES_256_CBC_SHA': None
}
_default_cipher_suites = []
@abc.abstractmethod
def __init__(self):
def __init__(self, cipher_suites=None):
"""
Create an AuthenticationSuite object.
Args:
cipher_suites (list): A list of strings representing the names of
cipher suites to use. Overrides the default set of cipher
suites. Optional, defaults to None.
"""
self._profile = []
self._ciphers = ''
self._custom_suites = []
# Compose a unique list of custom cipher suites if any were provided.
# Translate each suite name into its corresponding OpenSSL suite name,
# allowing for both specification and OpenSSL suite names in the
# provided list.
if cipher_suites:
for cipher_suite in cipher_suites:
if cipher_suite in self.openssl_cipher_suite_map.keys():
suite = self.openssl_cipher_suite_map.get(cipher_suite)
if suite:
self._custom_suites.append(suite)
elif cipher_suite in self.openssl_cipher_suite_map.values():
if cipher_suite:
self._custom_suites.append(cipher_suite)
self._custom_suites = list(set(self._custom_suites))
# Filter the custom suites to only include those from the default
# cipher suite list (provided for each subclass authentication suite).
# If no custom suites were specified, use the default cipher suites.
suites = []
if self._custom_suites:
for suite in self._custom_suites:
if suite in self._default_cipher_suites:
suites.append(suite)
else:
suites = self._default_cipher_suites
self._cipher_suites = ':'.join(suites)
if self._cipher_suites == '':
self._cipher_suites = ':'.join(self._default_cipher_suites)
@property
def protocol(self):
@ -53,7 +148,7 @@ class AuthenticationSuite(object):
string: A colon delimited string listing the valid ciphers for
the suite protocol.
"""
return self._ciphers
return self._cipher_suites
class BasicAuthenticationSuite(AuthenticationSuite):
@ -64,26 +159,32 @@ class BasicAuthenticationSuite(AuthenticationSuite):
in NIST 800-57, as defined by the KMIP specification.
"""
def __init__(self):
_default_cipher_suites = [
'AES128-SHA',
'DES-CBC3-SHA',
'AES256-SHA',
'DHE-DSS-DES-CBC3-SHA',
'DHE-RSA-DES-CBC3-SHA',
'DH-DSS-AES128-SHA',
'DH-RSA-AES128-SHA',
'DHE-DSS-AES128-SHA',
'DHE-RSA-AES128-SHA',
'DH-RSA-AES256-SHA',
'DHE-DSS-AES256-SHA',
'DHE-RSA-AES256-SHA'
]
def __init__(self, cipher_suites=None):
"""
Create a BasicAuthenticationSuite object.
Args:
cipher_suites (list): A list of strings representing the names of
cipher suites to use. Overrides the default set of cipher
suites. Optional, defaults to None.
"""
super(BasicAuthenticationSuite, self).__init__()
super(BasicAuthenticationSuite, self).__init__(cipher_suites)
self._protocol = ssl.PROTOCOL_TLSv1
self._ciphers = ':'.join((
'AES128-SHA',
'DES-CBC3-SHA',
'AES256-SHA',
'DHE-DSS-DES-CBC3-SHA',
'DHE-RSA-DES-CBC3-SHA',
'DH-DSS-AES128-SHA',
'DH-RSA-AES128-SHA',
'DHE-DSS-AES128-SHA',
'DHE-RSA-AES128-SHA',
'DH-RSA-AES256-SHA',
'DHE-DSS-AES256-SHA',
'DHE-RSA-AES256-SHA',
))
class TLS12AuthenticationSuite(AuthenticationSuite):
@ -94,34 +195,40 @@ class TLS12AuthenticationSuite(AuthenticationSuite):
in NIST 800-57, as defined by the KMIP specification.
"""
def __init__(self):
_default_cipher_suites = [
'AES128-SHA256',
'AES256-SHA256',
'DH-DSS-AES256-SHA256',
'DH-DSS-AES128-SHA256',
'DH-RSA-AES128-SHA256',
'DHE-DSS-AES128-SHA256',
'DHE-RSA-AES128-SHA256',
'DH-DSS-AES256-SHA256',
'DH-RSA-AES256-SHA256',
'DHE-DSS-AES256-SHA256',
'DHE-RSA-AES256-SHA256',
'ECDH-ECDSA-AES128-SHA256',
'ECDH-ECDSA-AES256-SHA256',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-ECDSA-AES256-SHA384',
'ECDH-RSA-AES128-SHA256',
'ECDH-RSA-AES256-SHA384',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384',
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-ECDSA-AES256-SHA384'
]
def __init__(self, cipher_suites=None):
"""
Create a TLS12AuthenticationSuite object.
Args:
cipher_suites (list): A list of strings representing the names of
cipher suites to use. Overrides the default set of cipher
suites. Optional, defaults to None.
"""
super(TLS12AuthenticationSuite, self).__init__()
super(TLS12AuthenticationSuite, self).__init__(cipher_suites)
self._protocol = ssl.PROTOCOL_TLSv1_2
self._ciphers = ':'.join((
'AES128-SHA256',
'AES256-SHA256',
'DH-DSS-AES256-SHA256',
'DH-DSS-AES128-SHA256',
'DH-RSA-AES128-SHA256',
'DHE-DSS-AES128-SHA256',
'DHE-RSA-AES128-SHA256',
'DH-DSS-AES256-SHA256',
'DH-RSA-AES256-SHA256',
'DHE-DSS-AES256-SHA256',
'DHE-RSA-AES256-SHA256',
'ECDH-ECDSA-AES128-SHA256',
'ECDH-ECDSA-AES256-SHA256',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-ECDSA-AES256-SHA384',
'ECDH-RSA-AES128-SHA256',
'ECDH-RSA-AES256-SHA384',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384',
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-ECDSA-AES256-SHA384',
))

View File

@ -35,6 +35,7 @@ class KmipServerConfig(object):
self.settings = dict()
self.settings['enable_tls_client_auth'] = True
self.settings['tls_cipher_suites'] = []
self._expected_settings = [
'hostname',
@ -46,7 +47,8 @@ class KmipServerConfig(object):
]
self._optional_settings = [
'policy_path',
'enable_tls_client_auth'
'enable_tls_client_auth',
'tls_cipher_suites'
]
def set_setting(self, setting, value):
@ -84,8 +86,10 @@ class KmipServerConfig(object):
self._set_auth_suite(value)
elif setting == 'policy_path':
self._set_policy_path(value)
else:
elif setting == 'enable_tls_client_auth':
self._set_enable_tls_client_auth(value)
else:
self._set_tls_cipher_suites(value)
def load_settings(self, path):
"""
@ -156,6 +160,10 @@ class KmipServerConfig(object):
self._set_enable_tls_client_auth(
parser.getboolean('server', 'enable_tls_client_auth')
)
if parser.has_option('server', 'tls_cipher_suites'):
self._set_tls_cipher_suites(
parser.get('server', 'tls_cipher_suites')
)
def _set_hostname(self, value):
if isinstance(value, six.string_types):
@ -261,3 +269,24 @@ class KmipServerConfig(object):
"The flag enabling the TLS certificate client auth flag check "
"must be a boolean."
)
def _set_tls_cipher_suites(self, value):
if not value:
self.settings['tls_cipher_suites'] = []
return
if isinstance(value, six.string_types):
value = value.split()
if isinstance(value, list):
for entry in value:
if not isinstance(entry, six.string_types):
raise exceptions.ConfigurationError(
"The TLS cipher suites must be a set of strings "
"representing cipher suite names."
)
self.settings['tls_cipher_suites'] = list(set(value))
else:
raise exceptions.ConfigurationError(
"The TLS cipher suites must be a set of strings representing "
"cipher suite names."
)

View File

@ -51,7 +51,8 @@ class KmipServer(object):
config_path='/etc/pykmip/server.conf',
log_path='/var/log/pykmip/server.log',
policy_path=None,
enable_tls_client_auth=None
enable_tls_client_auth=None,
tls_cipher_suites=None
):
"""
Create a KmipServer.
@ -100,6 +101,12 @@ class KmipServer(object):
certificate client auth flag should be required for client
certificates when establishing a new client session. Optional,
defaults to None.
tls_cipher_suites (string): A comma-delimited list of cipher suite
names (e.g., TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_RSA_WITH_AES_
128_CBC_SHA256), indicating which specific cipher suites should
be used by the server when establishing a TLS connection with
a client. Optional, defaults to None. If None, the default set
of TLS cipher suites will be used.
"""
self._logger = logging.getLogger('kmip.server')
self._setup_logging(log_path)
@ -114,13 +121,15 @@ class KmipServer(object):
ca_path,
auth_suite,
policy_path,
enable_tls_client_auth
enable_tls_client_auth,
tls_cipher_suites
)
cipher_suites = self.config.settings.get('tls_cipher_suites')
if self.config.settings.get('auth_suite') == 'TLS1.2':
self.auth_suite = auth.TLS12AuthenticationSuite()
self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites)
else:
self.auth_suite = auth.BasicAuthenticationSuite()
self.auth_suite = auth.BasicAuthenticationSuite(cipher_suites)
self._engine = engine.KmipEngine(
self.config.settings.get('policy_path')
@ -147,7 +156,7 @@ class KmipServer(object):
)
)
self._logger.addHandler(handler)
self._logger.setLevel(logging.INFO)
self._logger.setLevel(logging.DEBUG)
def _setup_configuration(
self,
@ -159,7 +168,8 @@ class KmipServer(object):
ca_path=None,
auth_suite=None,
policy_path=None,
enable_tls_client_auth=None
enable_tls_client_auth=None,
tls_cipher_suites=None
):
if path:
self.config.load_settings(path)
@ -183,6 +193,11 @@ class KmipServer(object):
'enable_tls_client_auth',
enable_tls_client_auth
)
if tls_cipher_suites:
self.config.set_setting(
'tls_cipher_suites',
tls_cipher_suites.split(',')
)
def start(self):
"""
@ -202,6 +217,22 @@ class KmipServer(object):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._logger.debug(
"Configured cipher suites: {0}".format(
len(self.config.settings.get('tls_cipher_suites'))
)
)
for cipher in self.config.settings.get('tls_cipher_suites'):
self._logger.debug(cipher)
auth_suite_ciphers = self.auth_suite.ciphers.split(':')
self._logger.debug(
"Authentication suite ciphers to use: {0}".format(
len(auth_suite_ciphers)
)
)
for cipher in auth_suite_ciphers:
self._logger.debug(cipher)
self._socket = ssl.wrap_socket(
self._socket,
keyfile=self.config.settings.get('key_path'),

View File

@ -158,6 +158,17 @@ class KmipSession(threading.Thread):
max_size = self._max_response_size
try:
shared_ciphers = self._connection.shared_ciphers()
self._logger.debug(
"Possible session ciphers: {0}".format(len(shared_ciphers))
)
for cipher in shared_ciphers:
self._logger.debug(cipher)
self._logger.debug(
"Session cipher selected: {0}".format(
self._connection.cipher()
)
)
client_identity = self._get_client_identity()
request.read(request_data)
except Exception as e:

View File

@ -38,7 +38,16 @@ class TestKmipServerConfig(testtools.TestCase):
"""
Test that a KmipServerConfig object can be created without error.
"""
config.KmipServerConfig()
c = config.KmipServerConfig()
self.assertIn('enable_tls_client_auth', c.settings.keys())
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
self.assertIn('tls_cipher_suites', c.settings.keys())
self.assertEqual([], c.settings.get('tls_cipher_suites'))
def test_set_setting(self):
"""
@ -55,6 +64,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_port = mock.MagicMock()
c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
c._set_tls_cipher_suites = mock.MagicMock()
# Test the right error is generated when setting an unsupported
# setting.
@ -96,6 +106,9 @@ class TestKmipServerConfig(testtools.TestCase):
c.set_setting('enable_tls_client_auth', False)
c._set_enable_tls_client_auth.assert_called_once_with(False)
c.set_setting('tls_cipher_suites', [])
c._set_tls_cipher_suites.assert_called_once_with([])
def test_load_settings(self):
"""
Test that the right calls are made and the right errors generated when
@ -149,6 +162,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_port = mock.MagicMock()
c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
c._set_tls_cipher_suites = mock.MagicMock()
# Test that the right calls are made when correctly parsing settings.
parser = configparser.SafeConfigParser()
@ -161,6 +175,11 @@ class TestKmipServerConfig(testtools.TestCase):
parser.set('server', 'auth_suite', 'Basic')
parser.set('server', 'policy_path', '/test/path/policies')
parser.set('server', 'enable_tls_client_auth', 'False')
parser.set(
'server',
'tls_cipher_suites',
"\n TLS_RSA_WITH_AES_256_CBC_SHA256"
)
c._parse_settings(parser)
@ -174,6 +193,9 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_auth_suite.assert_called_once_with('Basic')
c._set_policy_path.assert_called_once_with('/test/path/policies')
c._set_enable_tls_client_auth.assert_called_once_with(False)
c._set_tls_cipher_suites.assert_called_once_with(
"\n TLS_RSA_WITH_AES_256_CBC_SHA256"
)
# Test that a ConfigurationError is generated when the expected
# section is missing.
@ -536,12 +558,6 @@ class TestKmipServerConfig(testtools.TestCase):
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
self.assertIn('enable_tls_client_auth', c.settings.keys())
self.assertEqual(
True,
c.settings.get('enable_tls_client_auth')
)
# Test that the setting is set correctly with a valid value
c._set_enable_tls_client_auth(False)
self.assertEqual(
@ -571,3 +587,102 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_enable_tls_client_auth,
*args
)
def test_set_tls_cipher_suites(self):
"""
Test that the tls_cipher_suites configuration property can be set
correctly with a value expected from the config file.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
c._set_tls_cipher_suites(
"""
TLS_RSA_WITH_AES_256_CBC_SHA256
TLS_RSA_WITH_AES_128_CBC_SHA256
"""
)
self.assertEqual(2, len(c.settings.get('tls_cipher_suites')))
self.assertIn(
'TLS_RSA_WITH_AES_256_CBC_SHA256',
c.settings.get('tls_cipher_suites')
)
self.assertIn(
'TLS_RSA_WITH_AES_128_CBC_SHA256',
c.settings.get('tls_cipher_suites')
)
def test_set_tls_cipher_suites_preparsed(self):
"""
Test that the tls_cipher_suites configuration property can be set
correctly with a preparsed list of TLS cipher suites, the value
expected if the cipher suites were provided via constructor.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
c._set_tls_cipher_suites(
[
'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA',
'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384',
'DH-DSS-AES128-SHA'
]
)
self.assertEqual(3, len(c.settings.get('tls_cipher_suites')))
self.assertIn(
'DH-DSS-AES128-SHA',
c.settings.get('tls_cipher_suites')
)
self.assertIn(
'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA',
c.settings.get('tls_cipher_suites')
)
self.assertIn(
'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384',
c.settings.get('tls_cipher_suites')
)
def test_set_tls_cipher_suites_empty(self):
"""
Test that the tls_cipher_suites configuration property can be set
correctly with an empty value.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
c._set_tls_cipher_suites(None)
self.assertEqual([], c.settings.get('tls_cipher_suites'))
def test_set_tls_cipher_suites_invalid_value(self):
"""
Test that the right error is raised when an invalid value is used to
set the tls_cipher_suites configuration property.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
args = (1,)
self.assertRaisesRegexp(
exceptions.ConfigurationError,
"The TLS cipher suites must be a set of strings representing "
"cipher suite names.",
c._set_tls_cipher_suites,
*args
)
def test_set_tls_cipher_suites_invalid_list_value(self):
"""
Test that the right error is raised when an invalid list value is used
to set the tls_cipher_suites configuration property.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
args = ([0],)
self.assertRaisesRegexp(
exceptions.ConfigurationError,
"The TLS cipher suites must be a set of strings representing "
"cipher suite names.",
c._set_tls_cipher_suites,
*args
)

View File

@ -95,7 +95,7 @@ class TestKmipServer(testtools.TestCase):
open_mock.assert_called_once_with('/test/path/server.log', 'w')
self.assertTrue(s._logger.addHandler.called)
s._logger.setLevel.assert_called_once_with(logging.INFO)
s._logger.setLevel.assert_called_once_with(logging.DEBUG)
@mock.patch('kmip.services.server.engine.KmipEngine')
@mock.patch('kmip.services.auth.TLS12AuthenticationSuite')
@ -120,7 +120,8 @@ class TestKmipServer(testtools.TestCase):
'/etc/pykmip/certs/ca.crt',
'Basic',
'/etc/pykmip/policies',
False
False,
'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA'
)
s.config.load_settings.assert_called_with('/etc/pykmip/server.conf')
@ -147,6 +148,13 @@ class TestKmipServer(testtools.TestCase):
'enable_tls_client_auth',
False
)
s.config.set_setting.assert_any_call(
'tls_cipher_suites',
[
'TLS_RSA_WITH_AES_128_CBC_SHA',
'TLS_RSA_WITH_AES_256_CBC_SHA'
]
)
# Test that an attempt is made to instantiate the TLS 1.2 auth suite
s = server.KmipServer(
@ -170,8 +178,10 @@ class TestKmipServer(testtools.TestCase):
s = server.KmipServer(
hostname='127.0.0.1',
port=5696,
auth_suite='Basic',
config_path=None,
policy_path=None
policy_path=None,
tls_cipher_suites='TLS_RSA_WITH_AES_128_CBC_SHA'
)
s._logger = mock.MagicMock()
@ -188,6 +198,13 @@ class TestKmipServer(testtools.TestCase):
s._logger.info.assert_any_call(
"Starting server socket handler."
)
s._logger.debug.assert_any_call("Configured cipher suites: 1")
s._logger.debug.assert_any_call("TLS_RSA_WITH_AES_128_CBC_SHA")
s._logger.debug.assert_any_call(
"Authentication suite ciphers to use: 1"
)
s._logger.debug.assert_any_call("AES128-SHA")
socket_mock.assert_called_once_with(
socket.AF_INET,
socket.SOCK_STREAM

View File

@ -365,6 +365,15 @@ class TestKmipSession(testtools.TestCase):
)
kmip_session._logger = mock.MagicMock()
kmip_session._connection = mock.MagicMock()
kmip_session._connection.shared_ciphers = mock.MagicMock(
return_value=[
('AES128-SHA256', 'TLSv1/SSLv3', 128),
('AES256-SHA256', 'TLSv1/SSLv3', 256)
]
)
kmip_session._connection.cipher = mock.MagicMock(
return_value=('AES128-SHA256', 'TLSv1/SSLv3', 128)
)
kmip_session._receive_request = mock.MagicMock(return_value=data)
kmip_session._send_response = mock.MagicMock()
@ -372,6 +381,20 @@ class TestKmipSession(testtools.TestCase):
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.info.assert_not_called()
kmip_session._logger.debug.assert_any_call(
"Possible session ciphers: 2"
)
kmip_session._logger.debug.assert_any_call(
('AES128-SHA256', 'TLSv1/SSLv3', 128)
)
kmip_session._logger.debug.assert_any_call(
('AES256-SHA256', 'TLSv1/SSLv3', 256)
)
kmip_session._logger.debug.assert_any_call(
"Session cipher selected: {0}".format(
('AES128-SHA256', 'TLSv1/SSLv3', 128)
)
)
kmip_session._logger.warning.assert_not_called()
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@ -425,7 +448,7 @@ class TestKmipSession(testtools.TestCase):
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.info.assert_not_called()
# kmip_session._logger.info.assert_not_called()
self.assertTrue(kmip_session._logger.warning.called)
kmip_session._logger.exception.assert_not_called()
self.assertTrue(kmip_session._send_response.called)
@ -456,7 +479,7 @@ class TestKmipSession(testtools.TestCase):
kmip_session._handle_message_loop()
kmip_session._receive_request.assert_called_once_with()
kmip_session._logger.info.assert_not_called()
# kmip_session._logger.info.assert_not_called()
kmip_session._logger.warning.assert_called_once_with(
"An unexpected error occurred while processing request."
)

View File

@ -64,6 +64,60 @@ class TestBasicAuthenticationSuite(testtools.TestCase):
self.assertEqual(cipher_string, ciphers)
def test_custom_ciphers(self):
"""
Test that providing a custom list of cipher suites yields the right
cipher string for the Basic auth suite.
"""
suite = auth.BasicAuthenticationSuite(
[
'TLS_RSA_WITH_AES_128_CBC_SHA',
'TLS_RSA_WITH_AES_256_CBC_SHA',
'TLS_DHE_PSK_WITH_AES_128_CBC_SHA',
'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA',
'DHE-DSS-AES256-SHA',
'DHE-RSA-AES256-SHA'
]
)
ciphers = suite.ciphers
self.assertIsInstance(ciphers, str)
suites = ciphers.split(':')
self.assertEqual(4, len(suites))
self.assertIn('AES128-SHA', suites)
self.assertIn('AES256-SHA', suites)
self.assertIn('DHE-DSS-AES256-SHA', suites)
self.assertIn('DHE-RSA-AES256-SHA', suites)
def test_custom_ciphers_empty(self):
"""
Test that providing a custom list of cipher suites that ultimately
yields an empty suite list causes the default cipher suite list to
be provided instead.
"""
suite = auth.BasicAuthenticationSuite(
[
'TLS_RSA_WITH_AES_256_CBC_SHA256'
]
)
ciphers = suite.ciphers
self.assertIsInstance(ciphers, str)
suites = ciphers.split(':')
self.assertEqual(12, len(suites))
self.assertIn('AES128-SHA', suites)
self.assertIn('DES-CBC3-SHA', suites)
self.assertIn('AES256-SHA', suites)
self.assertIn('DHE-DSS-DES-CBC3-SHA', suites)
self.assertIn('DHE-RSA-DES-CBC3-SHA', suites)
self.assertIn('DH-DSS-AES128-SHA', suites)
self.assertIn('DH-RSA-AES128-SHA', suites)
self.assertIn('DHE-DSS-AES128-SHA', suites)
self.assertIn('DHE-RSA-AES128-SHA', suites)
self.assertIn('DH-RSA-AES256-SHA', suites)
self.assertIn('DHE-DSS-AES256-SHA', suites)
self.assertIn('DHE-RSA-AES256-SHA', suites)
@pytest.mark.skipif(not hasattr(ssl, 'PROTOCOL_TLSv1_2'),
reason="Requires ssl.PROTOCOL_TLSv1_2")
@ -121,3 +175,65 @@ class TestTLS12AuthenticationSuite(testtools.TestCase):
))
self.assertEqual(cipher_string, ciphers)
def test_custom_ciphers(self):
"""
Test that providing a custom list of cipher suites yields the right
cipher string.
"""
suite = auth.TLS12AuthenticationSuite(
[
'TLS_RSA_WITH_AES_256_CBC_SHA256',
'TLS_RSA_WITH_AES_256_CBC_SHA',
'TLS_DHE_PSK_WITH_AES_128_CBC_SHA',
'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA',
'DHE-DSS-AES256-SHA',
'DHE-RSA-AES256-SHA'
]
)
ciphers = suite.ciphers
self.assertIsInstance(ciphers, str)
suites = ciphers.split(':')
self.assertEqual(1, len(suites))
self.assertIn('AES256-SHA256', suites)
def test_custom_ciphers_empty(self):
"""
Test that providing a custom list of cipher suites that ultimately
yields an empty suite list causes the default cipher suite list to
be provided instead.
"""
suite = auth.TLS12AuthenticationSuite(
[
'TLS_RSA_WITH_AES_256_CBC_SHA'
]
)
ciphers = suite.ciphers
self.assertIsInstance(ciphers, str)
suites = ciphers.split(':')
self.assertEqual(23, len(suites))
self.assertIn('AES128-SHA256', suites)
self.assertIn('AES256-SHA256', suites)
self.assertIn('DH-DSS-AES256-SHA256', suites)
self.assertIn('DH-DSS-AES128-SHA256', suites)
self.assertIn('DH-RSA-AES128-SHA256', suites)
self.assertIn('DHE-DSS-AES128-SHA256', suites)
self.assertIn('DHE-RSA-AES128-SHA256', suites)
self.assertIn('DH-DSS-AES256-SHA256', suites)
self.assertIn('DH-RSA-AES256-SHA256', suites)
self.assertIn('DHE-DSS-AES256-SHA256', suites)
self.assertIn('DHE-RSA-AES256-SHA256', suites)
self.assertIn('ECDH-ECDSA-AES128-SHA256', suites)
self.assertIn('ECDH-ECDSA-AES256-SHA256', suites)
self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites)
self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites)
self.assertIn('ECDH-RSA-AES128-SHA256', suites)
self.assertIn('ECDH-RSA-AES256-SHA384', suites)
self.assertIn('ECDHE-RSA-AES128-SHA256', suites)
self.assertIn('ECDHE-RSA-AES256-SHA384', suites)
self.assertIn('ECDHE-ECDSA-AES128-GCM-SHA256', suites)
self.assertIn('ECDHE-ECDSA-AES256-GCM-SHA384', suites)
self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites)
self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites)