Merge branch 'feat/config-username-password' into 'develop'

Adding username and password config options

This change adds username and password configuration options to the client section of the configuration file. These options are used to create KMIP Credential objects for authenticating connections to a KMIP appliance.

The KMIP proxy now uses these options when handling operation calls. If no credential is provided, the proxy will automatically create one from the config file. If either the username or the password is unspecified, an error is generated. If neither are specified, no credential is created. Only username/password credentials are currently supported.

The KMIP proxy test suite now includes test cases covering this functionality.
This commit is contained in:
Joel Coffman 2014-10-24 14:50:56 -04:00
commit a429ab8757
4 changed files with 101 additions and 3 deletions

View File

@ -35,6 +35,8 @@ class ConfigHelper(object):
DEFAULT_CA_CERTS = os.path.normpath(os.path.join(
FILE_PATH, '../demos/certs/server.crt'))
DEFAULT_SSL_VERSION = 'PROTOCOL_SSLv23'
DEFAULT_USERNAME = None
DEFAULT_PASSWORD = None
def __init__(self):
self.logger = logging.getLogger(__name__)

View File

@ -8,6 +8,8 @@ ssl_version=PROTOCOL_SSLv23
ca_certs=../demos/certs/server.crt
do_handshake_on_connect=True
suppress_ragged_eofs=True
username=None
password=None
[server]
host=127.0.0.1

View File

@ -22,6 +22,9 @@ from kmip.services.results import LocateResult
from kmip.core import attributes as attr
from kmip.core.enums import Operation as OperationEnum
from kmip.core.enums import CredentialType
from kmip.core.factories.credentials import CredentialFactory
from kmip.core import objects
from kmip.core.server import KMIP
@ -55,13 +58,17 @@ class KMIPProxy(KMIP):
def __init__(self, host=None, port=None, keyfile=None, certfile=None,
cert_reqs=None, ssl_version=None, ca_certs=None,
do_handshake_on_connect=None,
suppress_ragged_eofs=None):
suppress_ragged_eofs=None,
username=None,
password=None):
super(self.__class__, self).__init__()
self.logger = logging.getLogger(__name__)
self.credential_factory = CredentialFactory()
self._set_variables(host, port, keyfile, certfile,
cert_reqs, ssl_version, ca_certs,
do_handshake_on_connect, suppress_ragged_eofs)
do_handshake_on_connect, suppress_ragged_eofs,
username, password)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket = ssl.wrap_socket(
@ -327,9 +334,29 @@ class KMIPProxy(KMIP):
uuids)
return result
# TODO (peter-hamilton) Augment to handle device credentials
def _build_credential(self):
if (self.username is None) and (self.password is None):
return None
if self.username is None:
raise ValueError('cannot build credential, username is None')
if self.password is None:
raise ValueError('cannot build credential, password is None')
credential_type = CredentialType.USERNAME_AND_PASSWORD
credential_value = {'Username': self.username,
'Password': self.password}
credential = self.credential_factory.create_credential(
credential_type,
credential_value)
return credential
def _build_request_message(self, credential, batch_items):
protocol_version = ProtocolVersion.create(1, 1)
if credential is None:
credential = self._build_credential()
authentication = None
if credential is not None:
authentication = Authentication(credential)
@ -352,7 +379,8 @@ class KMIPProxy(KMIP):
def _set_variables(self, host, port, keyfile, certfile,
cert_reqs, ssl_version, ca_certs,
do_handshake_on_connect, suppress_ragged_eofs):
do_handshake_on_connect, suppress_ragged_eofs,
username, password):
conf = ConfigHelper()
self.host = conf.get_valid_value(
@ -389,3 +417,9 @@ class KMIPProxy(KMIP):
self.suppress_ragged_eofs = True
else:
self.suppress_ragged_eofs = False
self.username = conf.get_valid_value(
username, 'client', 'username', conf.DEFAULT_USERNAME)
self.password = conf.get_valid_value(
password, 'client', 'password', conf.DEFAULT_PASSWORD)

View File

@ -248,6 +248,66 @@ class TestKMIPClient(TestCase):
expected, observed, 'value')
self.assertEqual(expected, observed, message)
# TODO (peter-hamilton) Modify for credential type and/or add new test
def test_build_credential(self):
username = 'username'
password = 'password'
cred_type = CredentialType.USERNAME_AND_PASSWORD
self.client.username = username
self.client.password = password
credential = self.client._build_credential()
message = utils.build_er_error(credential.__class__, 'type',
cred_type,
credential.credential_type.enum,
'value')
self.assertEqual(CredentialType.USERNAME_AND_PASSWORD,
credential.credential_type.enum,
message)
message = utils.build_er_error(
credential.__class__, 'type', username,
credential.credential_value.username.value, 'value')
self.assertEqual(username, credential.credential_value.username.value,
message)
message = utils.build_er_error(
credential.__class__, 'type', password,
credential.credential_value.password.value, 'value')
self.assertEqual(password, credential.credential_value.password.value,
message)
def test_build_credential_no_username(self):
username = None
password = 'password'
self.client.username = username
self.client.password = password
exception = self.assertRaises(ValueError,
self.client._build_credential)
self.assertEqual('cannot build credential, username is None',
str(exception))
def test_build_credential_no_password(self):
username = 'username'
password = None
self.client.username = username
self.client.password = password
exception = self.assertRaises(ValueError,
self.client._build_credential)
self.assertEqual('cannot build credential, password is None',
str(exception))
def test_build_credential_no_creds(self):
self.client.username = None
self.client.password = None
credential = self.client._build_credential()
self.assertEqual(None, credential)
def _shutdown_server(self):
if self.server.poll() is not None:
return