From 86b23a9d53b296dfb65a5000aa19024b0c7e7745 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Sat, 24 Feb 2018 12:34:07 -0500 Subject: [PATCH] Update the Credential objects This change updates the implementation of the Credential objects. The UsernamePassword and Device credentials are now first-class objects and, along with the base Credential, have been restructured to match the current struct style. Comprehensive unit test suites for each class have been added. Additionally, the credential factory code and its usage in the KMIPProxy class and associated test suites have been updated to reflect this change. --- kmip/core/factories/credentials.py | 67 +- kmip/core/objects.py | 752 +++++-- .../unit/core/objects/test_credentials.py | 1991 +++++++++++++++++ kmip/tests/unit/services/test_kmip_client.py | 28 +- 4 files changed, 2614 insertions(+), 224 deletions(-) create mode 100644 kmip/tests/unit/core/objects/test_credentials.py diff --git a/kmip/core/factories/credentials.py b/kmip/core/factories/credentials.py index 2ca7980..81d7a86 100644 --- a/kmip/core/factories/credentials.py +++ b/kmip/core/factories/credentials.py @@ -13,43 +13,41 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core.enums import CredentialType - -from kmip.core.objects import Credential +from kmip.core import enums +from kmip.core import objects class CredentialFactory(object): - def __init__(self): - pass - def _create_credential(self, credential_type, credential_value): - credential_type = Credential.CredentialType(credential_type) - return Credential(credential_type=credential_type, - credential_value=credential_value) - - def create_credential(self, cred_type, value): + def create_credential(self, credential_type, credential_value): # Switch on the type of the credential - if cred_type is CredentialType.USERNAME_AND_PASSWORD: - value = self._create_username_password_credential(value) - elif cred_type is CredentialType.DEVICE: - value = self._create_device_credential(value) + if credential_type is enums.CredentialType.USERNAME_AND_PASSWORD: + credential_value = self.create_username_password_credential( + credential_value + ) + elif credential_type is enums.CredentialType.DEVICE: + credential_value = self.create_device_credential(credential_value) else: msg = 'Unrecognized credential type: {0}' - raise ValueError(msg.format(cred_type)) + raise ValueError(msg.format(credential_type)) - return self._create_credential(cred_type, value) + return objects.Credential( + credential_type=credential_type, + credential_value=credential_value + ) - def _create_username_password_credential(self, value): + @staticmethod + def create_username_password_credential(value): username = value.get('Username') password = value.get('Password') - username = Credential.UsernamePasswordCredential.Username(username) - password = Credential.UsernamePasswordCredential.Password(password) + return objects.UsernamePasswordCredential( + username=username, + password=password + ) - return Credential.UsernamePasswordCredential(username=username, - password=password) - - def _create_device_credential(self, value): + @staticmethod + def create_device_credential(value): dsn = value.get('Device Serial Number') password = value.get('Password') dev_id = value.get('Device Identifier') @@ -57,16 +55,11 @@ class CredentialFactory(object): mach_id = value.get('Machine Identifier') med_id = value.get('Media Identifier') - dsn = Credential.DeviceCredential.DeviceSerialNumber(dsn) - password = Credential.DeviceCredential.Password(password) - dev_id = Credential.DeviceCredential.DeviceIdentifier(dev_id) - net_id = Credential.DeviceCredential.NetworkIdentifier(net_id) - mach_id = Credential.DeviceCredential.MachineIdentifier(mach_id) - med_id = Credential.DeviceCredential.MediaIdentifier(med_id) - - return Credential.DeviceCredential(device_serial_number=dsn, - password=password, - device_identifier=dev_id, - network_identifier=net_id, - machine_identifier=mach_id, - media_identifier=med_id) + return objects.DeviceCredential( + device_serial_number=dsn, + password=password, + device_identifier=dev_id, + network_identifier=net_id, + machine_identifier=mach_id, + media_identifier=med_id + ) diff --git a/kmip/core/objects.py b/kmip/core/objects.py index 20c38a3..b29c110 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import abc import six from six.moves import xrange @@ -25,7 +26,6 @@ from kmip.core import enums from kmip.core.enums import AttributeType from kmip.core.enums import Tags from kmip.core.enums import Types -from kmip.core.enums import CredentialType from kmip.core.enums import RevocationReasonCode as RevocationReasonCodeEnum from kmip.core import exceptions @@ -172,224 +172,646 @@ class Attribute(Struct): return NotImplemented -# 2.1.2 -class Credential(Struct): +@six.add_metaclass(abc.ABCMeta) +class CredentialValue(primitives.Struct): + """ + An empty, abstract base class to be used by Credential objects to easily + group and type-check credential values. + """ - class CredentialType(Enumeration): - def __init__(self, value=None): - super(Credential.CredentialType, self).__init__( - CredentialType, value, Tags.CREDENTIAL_TYPE) +class UsernamePasswordCredential(CredentialValue): + """ + A struct representing a UsernamePasswordCredential object. - class UsernamePasswordCredential(Struct): + Attributes: + username: The username identifying the credential. + password: The password associated with the username. + """ - class Username(TextString): - def __init__(self, value=None): - super(Credential.UsernamePasswordCredential.Username, - self).__init__( - value, Tags.USERNAME) + def __init__(self, username=None, password=None): + """ + Construct a UsernamePasswordCredential struct. - class Password(TextString): - def __init__(self, value=None): - super(Credential.UsernamePasswordCredential.Password, - self).__init__( - value, Tags.PASSWORD) + Args: + username (string): The username identifying the credential. + Optional, defaults to None. Required for encoding and decoding. + password (string): The password associated with the username. + Optional, defaults to None. + """ + super(UsernamePasswordCredential, self).__init__( + tag=Tags.CREDENTIAL_VALUE + ) - def __init__(self, username=None, password=None): - super(Credential.UsernamePasswordCredential, self).__init__( - tag=Tags.CREDENTIAL_VALUE) - self.username = username - self.password = password - self.validate() + self._username = None + self._password = None - def read(self, istream): - super(Credential.UsernamePasswordCredential, self).read(istream) - tstream = BytearrayStream(istream.read(self.length)) + self.username = username + self.password = password - # Read the username of the credential - self.username = self.Username() - self.username.read(tstream) + @property + def username(self): + if self._username: + return self._username.value + else: + return None - # Read the password if it is next - if self.is_tag_next(Tags.PASSWORD, tstream): - self.password = self.Password() - self.password.read(tstream) + @username.setter + def username(self, value): + if value is None: + self._username = None + elif isinstance(value, six.string_types): + self._username = primitives.TextString( + value=value, + tag=enums.Tags.USERNAME + ) + else: + raise TypeError("Username must be a string.") - self.is_oversized(tstream) - self.validate() + @property + def password(self): + if self._password: + return self._password.value + else: + return None - def write(self, ostream): - tstream = BytearrayStream() + @password.setter + def password(self, value): + if value is None: + self._password = None + elif isinstance(value, six.string_types): + self._password = primitives.TextString( + value=value, + tag=enums.Tags.PASSWORD + ) + else: + raise TypeError("Password must be a string.") - self.username.write(tstream) - if self.password is not None: - self.password.write(tstream) + def read(self, input_stream): + """ + Read the data encoding the UsernamePasswordCredential struct and + decode it into its constituent parts. - # Write the length and value of the credential - self.length = tstream.length() - super(Credential.UsernamePasswordCredential, self).write(ostream) - ostream.write(tstream.buffer) + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. - def validate(self): - pass + Raises: + ValueError: Raised if the username is missing from the encoding. + """ + super(UsernamePasswordCredential, self).read(input_stream) + local_stream = BytearrayStream(input_stream.read(self.length)) - class DeviceCredential(Struct): + if self.is_tag_next(enums.Tags.USERNAME, local_stream): + self._username = primitives.TextString( + tag=enums.Tags.USERNAME + ) + self._username.read(local_stream) + else: + raise ValueError( + "Username/password credential encoding missing the username." + ) - class DeviceSerialNumber(TextString): + if self.is_tag_next(enums.Tags.PASSWORD, local_stream): + self._password = primitives.TextString( + tag=enums.Tags.PASSWORD + ) + self._password.read(local_stream) - def __init__(self, value=None): - super(Credential.DeviceCredential.DeviceSerialNumber, self).\ - __init__(value, Tags.DEVICE_SERIAL_NUMBER) + self.is_oversized(local_stream) - class Password(TextString): + def write(self, output_stream): + """ + Write the data encoding the UsernamePasswordCredential struct to a + stream. - def __init__(self, value=None): - super(Credential.DeviceCredential.Password, self).\ - __init__(value, Tags.PASSWORD) + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. - class DeviceIdentifier(TextString): + Raises: + ValueError: Raised if the username is not defined. + """ + local_stream = BytearrayStream() - def __init__(self, value=None): - super(Credential.DeviceCredential.DeviceIdentifier, self).\ - __init__(value, Tags.DEVICE_IDENTIFIER) + if self._username: + self._username.write(local_stream) + else: + raise ValueError( + "Username/password credential struct missing the username." + ) - class NetworkIdentifier(TextString): + if self._password: + self._password.write(local_stream) - def __init__(self, value=None): - super(Credential.DeviceCredential.NetworkIdentifier, self).\ - __init__(value, Tags.NETWORK_IDENTIFIER) + self.length = local_stream.length() + super(UsernamePasswordCredential, self).write(output_stream) + output_stream.write(local_stream.buffer) - class MachineIdentifier(TextString): + def __eq__(self, other): + if isinstance(other, UsernamePasswordCredential): + if self.username != other.username: + return False + elif self.password != other.password: + return False + else: + return True + else: + return NotImplemented - def __init__(self, value=None): - super(Credential.DeviceCredential.MachineIdentifier, self).\ - __init__(value, Tags.MACHINE_IDENTIFIER) + def __ne__(self, other): + if isinstance(other, UsernamePasswordCredential): + return not (self == other) + else: + return NotImplemented - class MediaIdentifier(TextString): + def __repr__(self): + args = ", ".join([ + "username='{}'".format(self.username), + "password='{}'".format(self.password) + ]) + return "UsernamePasswordCredential({})".format(args) - def __init__(self, value=None): - super(Credential.DeviceCredential.MediaIdentifier, self).\ - __init__(value, Tags.MEDIA_IDENTIFIER) + def __str__(self): + return str({ + "username": self.username, + "password": self.password + }) - def __init__(self, - device_serial_number=None, - password=None, - device_identifier=None, - network_identifier=None, - machine_identifier=None, - media_identifier=None): - super(Credential.DeviceCredential, self).__init__( - tag=Tags.CREDENTIAL_VALUE) - self.device_serial_number = device_serial_number - self.password = password - self.device_identifier = device_identifier - self.network_identifier = network_identifier - self.machine_identifier = machine_identifier - self.media_identifier = media_identifier - def read(self, istream): - super(Credential.DeviceCredential, self).read(istream) - tstream = BytearrayStream(istream.read(self.length)) +class DeviceCredential(CredentialValue): + """ + A struct representing a DeviceCredential object. - # Read the password if it is next - if self.is_tag_next(Tags.DEVICE_SERIAL_NUMBER, tstream): - self.device_serial_number = self.DeviceSerialNumber() - self.device_serial_number.read(tstream) + Attributes: + device_serial_number: The device serial number for the credential. + password: The password associated with the credential. + device_identifier: The device identifier for the credential. + network_identifier: The network identifier for the credential. + machine_identifier: The machine identifier for the credential. + media_identifier: The media identifier for the credential. + """ - # Read the password if it is next - if self.is_tag_next(Tags.PASSWORD, tstream): - self.password = self.Password() - self.password.read(tstream) + def __init__(self, + device_serial_number=None, + password=None, + device_identifier=None, + network_identifier=None, + machine_identifier=None, + media_identifier=None): + """ + Construct a DeviceCredential struct. - # Read the password if it is next - if self.is_tag_next(Tags.DEVICE_IDENTIFIER, tstream): - self.device_identifier = self.DeviceIdentifier() - self.device_identifier.read(tstream) + Args: + device_serial_number (string): The device serial number for the + credential. Optional, defaults to None. + password (string): The password associated with the credential. + Optional, defaults to None. + device_identifier (string): The device identifier for the + credential. Optional, defaults to None. + network_identifier (string): The network identifier for the + credential. Optional, defaults to None. + machine_identifier (string): The machine identifier for the + credential. Optional, defaults to None. + media_identifier (string): The media identifier for the + credential. Optional, defaults to None. + """ + super(DeviceCredential, self).__init__(tag=Tags.CREDENTIAL_VALUE) - # Read the password if it is next - if self.is_tag_next(Tags.NETWORK_IDENTIFIER, tstream): - self.network_identifier = self.NetworkIdentifier() - self.network_identifier.read(tstream) + self._device_serial_number = None + self._password = None + self._device_identifier = None + self._network_identifier = None + self._machine_identifier = None + self._media_identifier = None - # Read the password if it is next - if self.is_tag_next(Tags.MACHINE_IDENTIFIER, tstream): - self.machine_identifier = self.MachineIdentifier() - self.machine_identifier.read(tstream) + self.device_serial_number = device_serial_number + self.password = password + self.device_identifier = device_identifier + self.network_identifier = network_identifier + self.machine_identifier = machine_identifier + self.media_identifier = media_identifier - # Read the password if it is next - if self.is_tag_next(Tags.MEDIA_IDENTIFIER, tstream): - self.media_identifier = self.MediaIdentifier() - self.media_identifier.read(tstream) + @property + def device_serial_number(self): + if self._device_serial_number: + return self._device_serial_number.value + else: + return None - self.is_oversized(tstream) - self.validate() + @device_serial_number.setter + def device_serial_number(self, value): + if value is None: + self._device_serial_number = None + elif isinstance(value, six.string_types): + self._device_serial_number = primitives.TextString( + value=value, + tag=enums.Tags.DEVICE_SERIAL_NUMBER + ) + else: + raise TypeError("Device serial number must be a string.") - def write(self, ostream): - tstream = BytearrayStream() + @property + def password(self): + if self._password: + return self._password.value + else: + return None - if self.device_serial_number is not None: - self.device_serial_number.write(tstream) - if self.password is not None: - self.password.write(tstream) - if self.device_identifier is not None: - self.device_identifier.write(tstream) - if self.network_identifier is not None: - self.network_identifier.write(tstream) - if self.machine_identifier is not None: - self.machine_identifier.write(tstream) - if self.media_identifier is not None: - self.media_identifier.write(tstream) + @password.setter + def password(self, value): + if value is None: + self._password = None + elif isinstance(value, six.string_types): + self._password = primitives.TextString( + value=value, + tag=enums.Tags.PASSWORD + ) + else: + raise TypeError("Password must be a string.") - # Write the length and value of the credential - self.length = tstream.length() - super(Credential.DeviceCredential, self).write(ostream) - ostream.write(tstream.buffer) + @property + def device_identifier(self): + if self._device_identifier: + return self._device_identifier.value + else: + return None - def validate(self): - pass + @device_identifier.setter + def device_identifier(self, value): + if value is None: + self._device_identifier = None + elif isinstance(value, six.string_types): + self._device_identifier = primitives.TextString( + value=value, + tag=enums.Tags.DEVICE_IDENTIFIER + ) + else: + raise TypeError("Device identifier must be a string.") + + @property + def network_identifier(self): + if self._network_identifier: + return self._network_identifier.value + else: + return None + + @network_identifier.setter + def network_identifier(self, value): + if value is None: + self._network_identifier = None + elif isinstance(value, six.string_types): + self._network_identifier = primitives.TextString( + value=value, + tag=enums.Tags.NETWORK_IDENTIFIER + ) + else: + raise TypeError("Network identifier must be a string.") + + @property + def machine_identifier(self): + if self._machine_identifier: + return self._machine_identifier.value + else: + return None + + @machine_identifier.setter + def machine_identifier(self, value): + if value is None: + self._machine_identifier = None + elif isinstance(value, six.string_types): + self._machine_identifier = primitives.TextString( + value=value, + tag=enums.Tags.MACHINE_IDENTIFIER + ) + else: + raise TypeError("Machine identifier must be a string.") + + @property + def media_identifier(self): + if self._media_identifier: + return self._media_identifier.value + else: + return None + + @media_identifier.setter + def media_identifier(self, value): + if value is None: + self._media_identifier = None + elif isinstance(value, six.string_types): + self._media_identifier = primitives.TextString( + value=value, + tag=enums.Tags.MEDIA_IDENTIFIER + ) + else: + raise TypeError("Media identifier must be a string.") + + def read(self, input_stream): + """ + Read the data encoding the DeviceCredential struct and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object.. + """ + super(DeviceCredential, self).read(input_stream) + local_stream = BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.DEVICE_SERIAL_NUMBER, local_stream): + self._device_serial_number = primitives.TextString( + tag=enums.Tags.DEVICE_SERIAL_NUMBER + ) + self._device_serial_number.read(local_stream) + + if self.is_tag_next(enums.Tags.PASSWORD, local_stream): + self._password = primitives.TextString( + tag=enums.Tags.PASSWORD + ) + self._password.read(local_stream) + + if self.is_tag_next(enums.Tags.DEVICE_IDENTIFIER, local_stream): + self._device_identifier = primitives.TextString( + tag=enums.Tags.DEVICE_IDENTIFIER + ) + self._device_identifier.read(local_stream) + + if self.is_tag_next(enums.Tags.NETWORK_IDENTIFIER, local_stream): + self._network_identifier = primitives.TextString( + tag=enums.Tags.NETWORK_IDENTIFIER + ) + self._network_identifier.read(local_stream) + + if self.is_tag_next(enums.Tags.MACHINE_IDENTIFIER, local_stream): + self._machine_identifier = primitives.TextString( + tag=enums.Tags.MACHINE_IDENTIFIER + ) + self._machine_identifier.read(local_stream) + + if self.is_tag_next(enums.Tags.MEDIA_IDENTIFIER, local_stream): + self._media_identifier = primitives.TextString( + tag=enums.Tags.MEDIA_IDENTIFIER + ) + self._media_identifier.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the DeviceCredential struct to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + """ + local_stream = BytearrayStream() + + if self._device_serial_number is not None: + self._device_serial_number.write(local_stream) + if self._password is not None: + self._password.write(local_stream) + if self._device_identifier is not None: + self._device_identifier.write(local_stream) + if self._network_identifier is not None: + self._network_identifier.write(local_stream) + if self._machine_identifier is not None: + self._machine_identifier.write(local_stream) + if self._media_identifier is not None: + self._media_identifier.write(local_stream) + + self.length = local_stream.length() + super(DeviceCredential, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, DeviceCredential): + if self.device_serial_number != other.device_serial_number: + return False + elif self.password != other.password: + return False + elif self.device_identifier != other.device_identifier: + return False + elif self.network_identifier != other.network_identifier: + return False + elif self.machine_identifier != other.machine_identifier: + return False + elif self.media_identifier != other.media_identifier: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, DeviceCredential): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "device_serial_number='{}'".format(self.device_serial_number), + "password='{}'".format(self.password), + "device_identifier='{}'".format(self.device_identifier), + "network_identifier='{}'".format(self.network_identifier), + "machine_identifier='{}'".format(self.machine_identifier), + "media_identifier='{}'".format(self.media_identifier), + ]) + return "DeviceCredential({})".format(args) + + def __str__(self): + return str({ + "device_serial_number": self.device_serial_number, + "password": self.password, + "device_identifier": self.device_identifier, + "network_identifier": self.network_identifier, + "machine_identifier": self.machine_identifier, + "media_identifier": self.media_identifier + }) + + +class Credential(primitives.Struct): + """ + A struct representing a Credential object. + + Attributes: + credential_type: The credential type, a CredentialType enumeration. + credential_value: The credential value, a CredentialValue instance. + """ def __init__(self, credential_type=None, credential_value=None): + """ + Construct a Credential struct. + + Args: + credential_type (CredentialType): An enumeration value that + specifies the type of the credential struct. Optional, + defaults to None. Required for encoding and decoding. + credential_value (CredentialValue): The credential value + corresponding to the credential type. Optional, defaults to + None. Required for encoding and decoding. + """ super(Credential, self).__init__(tag=Tags.CREDENTIAL) + + self._credential_type = None + self._credential_value = None + self.credential_type = credential_type self.credential_value = credential_value - def read(self, istream): - super(Credential, self).read(istream) - tstream = BytearrayStream(istream.read(self.length)) - - # Read the type of the credential - self.credential_type = self.CredentialType() - self.credential_type.read(tstream) - - # Use the type to determine what credential value to read - if self.credential_type.value is CredentialType.USERNAME_AND_PASSWORD: - self.credential_value = self.UsernamePasswordCredential() - elif self.credential_type.value is CredentialType.DEVICE: - self.credential_value = self.DeviceCredential() + @property + def credential_type(self): + if self._credential_type: + return self._credential_type.value else: - # TODO (peter-hamilton) Use more descriptive error here - raise NotImplementedError() - self.credential_value.read(tstream) + return None - self.is_oversized(tstream) - self.validate() + @credential_type.setter + def credential_type(self, value): + if value is None: + self._credential_type = None + elif isinstance(value, enums.CredentialType): + self._credential_type = Enumeration( + enums.CredentialType, + value=value, + tag=Tags.CREDENTIAL_TYPE + ) + else: + raise TypeError( + "Credential type must be a CredentialType enumeration." + ) - def write(self, ostream): - tstream = BytearrayStream() + @property + def credential_value(self): + return self._credential_value - self.credential_type.write(tstream) - self.credential_value.write(tstream) + @credential_value.setter + def credential_value(self, value): + if value is None: + self._credential_value = None + elif isinstance(value, CredentialValue): + self._credential_value = value + else: + raise TypeError( + "Credential value must be a CredentialValue struct." + ) - # Write the length and value of the credential - self.length = tstream.length() - super(Credential, self).write(ostream) - ostream.write(tstream.buffer) + def read(self, input_stream): + """ + Read the data encoding the Credential struct and decode it into its + constituent parts. - def validate(self): - pass + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if either the credential type or value are + missing from the encoding. + """ + super(Credential, self).read(input_stream) + local_stream = BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.CREDENTIAL_TYPE, local_stream): + self._credential_type = primitives.Enumeration( + enum=enums.CredentialType, + tag=enums.Tags.CREDENTIAL_TYPE + ) + self._credential_type.read(local_stream) + else: + raise ValueError( + "Credential encoding missing the credential type." + ) + + if self.is_tag_next(enums.Tags.CREDENTIAL_VALUE, local_stream): + if self.credential_type == \ + enums.CredentialType.USERNAME_AND_PASSWORD: + self._credential_value = UsernamePasswordCredential() + elif self.credential_type == enums.CredentialType.DEVICE: + self._credential_value = DeviceCredential() + else: + # TODO (peter-hamilton) Add case for Attestation. + raise ValueError( + "Credential encoding includes unrecognized credential " + "type." + ) + self._credential_value.read(local_stream) + else: + raise ValueError( + "Credential encoding missing the credential value." + ) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Credential struct to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if either the credential type or value are not + defined. + """ + local_stream = BytearrayStream() + + if self._credential_type: + self._credential_type.write(local_stream) + else: + raise ValueError( + "Credential struct missing the credential type." + ) + + if self._credential_value: + self._credential_value.write(local_stream) + else: + raise ValueError( + "Credential struct missing the credential value." + ) + + self.length = local_stream.length() + super(Credential, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, Credential): + if self.credential_type != other.credential_type: + return False + elif self.credential_value != other.credential_value: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, Credential): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "credential_type={}".format(self.credential_type), + "credential_value={}".format(repr(self.credential_value)) + ]) + return "Credential({})".format(args) + + def __str__(self): + return str({ + "credential_type": self.credential_type, + "credential_value": str(self.credential_value) + }) -# 2.1.3 class KeyBlock(Struct): class KeyCompressionType(Enumeration): diff --git a/kmip/tests/unit/core/objects/test_credentials.py b/kmip/tests/unit/core/objects/test_credentials.py new file mode 100644 index 0000000..09561e0 --- /dev/null +++ b/kmip/tests/unit/core/objects/test_credentials.py @@ -0,0 +1,1991 @@ +# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip import enums +from kmip.core import objects +from kmip.core import utils + + +class TestUsernamePasswordCredential(testtools.TestCase): + """ + Test suite for the UsernamePasswordCredential struct. + """ + + def setUp(self): + super(TestUsernamePasswordCredential, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 11.1. + # + # This encoding matches the following set of values: + # UsernamePasswordCredential + # Username - Fred + # Password - password1 + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x28' + b'\x42\x00\x99\x07\x00\x00\x00\x04' + b'\x46\x72\x65\x64\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x09' + b'\x70\x61\x73\x73\x77\x6F\x72\x64\x31\x00\x00\x00\x00\x00\x00\x00' + ) + + self.encoding_missing_username = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x18' + b'\x42\x00\xA1\x07\x00\x00\x00\x09' + b'\x70\x61\x73\x73\x77\x6F\x72\x64\x31\x00\x00\x00\x00\x00\x00\x00' + ) + + self.encoding_missing_password = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x10' + b'\x42\x00\x99\x07\x00\x00\x00\x04' + b'\x46\x72\x65\x64\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestUsernamePasswordCredential, self).tearDown() + + def test_init(self): + """ + Test that a UsernamePasswordCredential struct can be constructed + without arguments. + """ + credential = objects.UsernamePasswordCredential() + + self.assertEqual(None, credential.username) + self.assertEqual(None, credential.password) + + def test_init_with_args(self): + """ + Test that a UsernamePasswordCredential struct can be constructed with + arguments. + """ + credential = objects.UsernamePasswordCredential( + username="John", + password="abc123" + ) + + self.assertEqual("John", credential.username) + self.assertEqual("abc123", credential.password) + + def test_invalid_username(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the username of a UsernamePasswordCredential struct. + """ + kwargs = {'username': 0} + self.assertRaisesRegexp( + TypeError, + "Username must be a string.", + objects.UsernamePasswordCredential, + **kwargs + ) + + credential = objects.UsernamePasswordCredential() + args = (credential, "username", 0) + self.assertRaisesRegexp( + TypeError, + "Username must be a string.", + setattr, + *args + ) + + def test_invalid_password(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the password of a UsernamePasswordCredential struct. + """ + kwargs = {'password': 0} + self.assertRaisesRegexp( + TypeError, + "Password must be a string.", + objects.UsernamePasswordCredential, + **kwargs + ) + + credential = objects.UsernamePasswordCredential() + args = (credential, "password", 0) + self.assertRaisesRegexp( + TypeError, + "Password must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a UsernamePasswordCredential struct can be read from a data + stream. + """ + credential = objects.UsernamePasswordCredential() + + self.assertEqual(None, credential.username) + self.assertEqual(None, credential.password) + + credential.read(self.full_encoding) + + self.assertEqual("Fred", credential.username) + self.assertEqual("password1", credential.password) + + def test_read_missing_username(self): + """ + Test that a ValueError gets raised when attempting to read a + UsernamePasswordCredential struct from a data stream missing the + username data. + """ + credential = objects.UsernamePasswordCredential() + + self.assertEqual(None, credential.username) + self.assertEqual(None, credential.password) + + args = (self.encoding_missing_username, ) + self.assertRaisesRegexp( + ValueError, + "Username/password credential encoding missing the username.", + credential.read, + *args + ) + + def test_read_missing_password(self): + """ + Test that a UsernamePasswordCredential struct can be read from a data + stream missing the password data. + """ + credential = objects.UsernamePasswordCredential() + + self.assertEqual(None, credential.username) + self.assertEqual(None, credential.password) + + credential.read(self.encoding_missing_password) + + self.assertEqual("Fred", credential.username) + self.assertEqual(None, credential.password) + + def test_write(self): + """ + Test that a UsernamePasswordCredential struct can be written to a + data stream. + """ + credential = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_username(self): + """ + Test that a ValueError gets raised when attempting to write a + UsernamePasswordCredential struct missing username data to a data + stream. + """ + credential = objects.UsernamePasswordCredential( + password="password1" + ) + stream = utils.BytearrayStream() + + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "Username/password credential struct missing the username.", + credential.write, + *args + ) + + def test_write_missing_password(self): + """ + Test that a UsernamePasswordCredential struct missing password data + can be written to a data stream. + """ + credential = objects.UsernamePasswordCredential( + username="Fred" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual(len(self.encoding_missing_password), len(stream)) + self.assertEqual(str(self.encoding_missing_password), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + UsernamePasswordCredential structs with the same data. + """ + a = objects.UsernamePasswordCredential() + b = objects.UsernamePasswordCredential() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + b = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_username(self): + """ + Test that the equality operator returns False when comparing two + UsernamePasswordCredential structs with different usernames. + """ + a = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + b = objects.UsernamePasswordCredential( + username="Wilma", + password="password1" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_password(self): + """ + Test that the equality operator returns False when comparing two + UsernamePasswordCredential structs with different passwords. + """ + a = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + b = objects.UsernamePasswordCredential( + username="Fred", + password="1password" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + UsernamePasswordCredential structs with different types. + """ + a = objects.UsernamePasswordCredential() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + UsernamePasswordCredential structs with the same data. + """ + a = objects.UsernamePasswordCredential() + b = objects.UsernamePasswordCredential() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + b = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_username(self): + """ + Test that the inequality operator returns True when comparing two + UsernamePasswordCredential structs with different usernames. + """ + a = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + b = objects.UsernamePasswordCredential( + username="Wilma", + password="password1" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_password(self): + """ + Test that the inequality operator returns True when comparing two + UsernamePasswordCredential structs with different passwords. + """ + a = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + b = objects.UsernamePasswordCredential( + username="Fred", + password="1password" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + UsernamePasswordCredential structs with different types. + """ + a = objects.UsernamePasswordCredential() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a UsernamePasswordCredential struct. + """ + credential = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + expected = ( + "UsernamePasswordCredential(" + "username='Fred', " + "password='password1')" + ) + observed = repr(credential) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a UsernamePasswordCredential struct. + """ + credential = objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + expected = str({"username": "Fred", "password": "password1"}) + observed = str(credential) + + self.assertEqual(expected, observed) + + +class TestDeviceCredential(testtools.TestCase): + """ + Test suite for the DeviceCredential struct. + """ + + def setUp(self): + super(TestDeviceCredential, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 11.2. + # + # This encoding matches the following set of values: + # DeviceCredential + # Device Serial Number - serNum123456 + # Password - secret + # Device Identifier - devID2233 + # Network Identifier - netID9000 + # Machine Identifier - machineID1 + # Media Identifier - mediaID313 + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x88' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_device_serial_number = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x70' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_password = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x78' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_device_identifier = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x70' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_network_identifier = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x70' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_machine_identifier = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x70' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_media_identifier = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x70' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + ) + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x25\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestDeviceCredential, self).tearDown() + + def test_init(self): + """ + Test that a DeviceCredential struct can be constructed without + arguments. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + def test_init_with_args(self): + """ + Test that a DeviceCredential struct can be constructed with arguments. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_invalid_device_serial_number(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the device serial number of a DeviceCredential struct. + """ + kwargs = {'device_serial_number': 0} + self.assertRaisesRegexp( + TypeError, + "Device serial number must be a string.", + objects.DeviceCredential, + **kwargs + ) + + credential = objects.DeviceCredential() + args = (credential, "device_serial_number", 0) + self.assertRaisesRegexp( + TypeError, + "Device serial number must be a string.", + setattr, + *args + ) + + def test_invalid_password(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the password of a DeviceCredential struct. + """ + kwargs = {'password': 0} + self.assertRaisesRegexp( + TypeError, + "Password must be a string.", + objects.DeviceCredential, + **kwargs + ) + + credential = objects.DeviceCredential() + args = (credential, "password", 0) + self.assertRaisesRegexp( + TypeError, + "Password must be a string.", + setattr, + *args + ) + + def test_invalid_device_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the device identifier of a DeviceCredential struct. + """ + kwargs = {'device_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Device identifier must be a string.", + objects.DeviceCredential, + **kwargs + ) + + credential = objects.DeviceCredential() + args = (credential, "device_identifier", 0) + self.assertRaisesRegexp( + TypeError, + "Device identifier must be a string.", + setattr, + *args + ) + + def test_invalid_network_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the network identifier of a DeviceCredential struct. + """ + kwargs = {'network_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Network identifier must be a string.", + objects.DeviceCredential, + **kwargs + ) + + credential = objects.DeviceCredential() + args = (credential, "network_identifier", 0) + self.assertRaisesRegexp( + TypeError, + "Network identifier must be a string.", + setattr, + *args + ) + + def test_invalid_machine_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the machine identifier of a DeviceCredential struct. + """ + kwargs = {'machine_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Machine identifier must be a string.", + objects.DeviceCredential, + **kwargs + ) + + credential = objects.DeviceCredential() + args = (credential, "machine_identifier", 0) + self.assertRaisesRegexp( + TypeError, + "Machine identifier must be a string.", + setattr, + *args + ) + + def test_invalid_media_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the media identifier of a DeviceCredential struct. + """ + kwargs = {'media_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Media identifier must be a string.", + objects.DeviceCredential, + **kwargs + ) + + credential = objects.DeviceCredential() + args = (credential, "media_identifier", 0) + self.assertRaisesRegexp( + TypeError, + "Media identifier must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a DeviceCredential struct can be read from a data stream. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.full_encoding) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_read_missing_device_serial_number(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing the device serial number data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.encoding_missing_device_serial_number) + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_read_missing_password(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing the password data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.encoding_missing_password) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_read_missing_device_identifier(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing the device identifier data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.encoding_missing_device_identifier) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_read_missing_network_identifier(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing the network identifier data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.encoding_missing_network_identifier) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_read_missing_machine_identifier(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing the machine identifier data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.encoding_missing_machine_identifier) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual("mediaID313", credential.media_identifier) + + def test_read_missing_media_identifier(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing the media identifier data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.encoding_missing_media_identifier) + + self.assertEqual("serNum123456", credential.device_serial_number) + self.assertEqual("secret", credential.password) + self.assertEqual("devID2233", credential.device_identifier) + self.assertEqual("netID9000", credential.network_identifier) + self.assertEqual("machineID1", credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + def test_read_missing_everything(self): + """ + Test that a DeviceCredential struct can be read from a data stream + missing all data. + """ + credential = objects.DeviceCredential() + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + credential.read(self.empty_encoding) + + self.assertEqual(None, credential.device_serial_number) + self.assertEqual(None, credential.password) + self.assertEqual(None, credential.device_identifier) + self.assertEqual(None, credential.network_identifier) + self.assertEqual(None, credential.machine_identifier) + self.assertEqual(None, credential.media_identifier) + + def test_write(self): + """ + Test that a DeviceCredential struct can be written to a data stream. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_device_serial_number(self): + """ + Test that a DeviceCredential struct missing device serial number data + can be written to a data stream. + """ + credential = objects.DeviceCredential( + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual( + len(self.encoding_missing_device_serial_number), + len(stream) + ) + self.assertEqual( + str(self.encoding_missing_device_serial_number), + str(stream) + ) + + def test_write_missing_password(self): + """ + Test that a DeviceCredential struct missing password data can be + written to a data stream. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual(len(self.encoding_missing_password), len(stream)) + self.assertEqual(str(self.encoding_missing_password), str(stream)) + + def test_write_missing_device_identifier(self): + """ + Test that a DeviceCredential struct missing device identifier data can + be written to a data stream. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual( + len(self.encoding_missing_device_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_missing_device_identifier), + str(stream) + ) + + def test_write_missing_network_identifier(self): + """ + Test that a DeviceCredential struct missing network identifier data + can be written to a data stream. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual( + len(self.encoding_missing_network_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_missing_network_identifier), + str(stream) + ) + + def test_write_missing_machine_identifier(self): + """ + Test that a DeviceCredential struct missing machine identifier data + can be written to a data stream. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + media_identifier="mediaID313" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual( + len(self.encoding_missing_machine_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_missing_machine_identifier), + str(stream) + ) + + def test_write_missing_media_identifier(self): + """ + Test that a DeviceCredential struct missing media identifier data can + be written to a data stream. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1" + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual( + len(self.encoding_missing_media_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_missing_media_identifier), + str(stream) + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + DeviceCredential structs with the same data. + """ + a = objects.DeviceCredential() + b = objects.DeviceCredential() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + b = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_device_serial_number(self): + """ + Test that the equality operator returns False when comparing two + DeviceCredential structs with different device serial numbers. + """ + a = objects.DeviceCredential( + device_serial_number="serNum123456" + ) + b = objects.DeviceCredential( + device_serial_number="serNum654321" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_password(self): + """ + Test that the equality operator returns False when comparing two + DeviceCredential structs with different passwords. + """ + a = objects.DeviceCredential( + password="secret" + ) + b = objects.DeviceCredential( + password="public" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_device_identifier(self): + """ + Test that the equality operator returns False when comparing two + DeviceCredential structs with different device identifiers. + """ + a = objects.DeviceCredential( + device_identifier="devID2233" + ) + b = objects.DeviceCredential( + device_identifier="devID0011" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_network_identifier(self): + """ + Test that the equality operator returns False when comparing two + DeviceCredential structs with different network identifiers. + """ + a = objects.DeviceCredential( + network_identifier="netID9000" + ) + b = objects.DeviceCredential( + network_identifier="netID0999" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_machine_identifier(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different machine identifiers. + """ + a = objects.DeviceCredential( + machine_identifier="machineID1" + ) + b = objects.DeviceCredential( + machine_identifier="machineID2" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_media_identifier(self): + """ + Test that the equality operator returns False when comparing two + DeviceCredential structs with different media identifiers. + """ + a = objects.DeviceCredential( + media_identifier="mediaID313" + ) + b = objects.DeviceCredential( + media_identifier="mediaID828" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + DeviceCredential structs with different types. + """ + a = objects.DeviceCredential() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + DeviceCredential structs with the same data. + """ + a = objects.DeviceCredential() + b = objects.DeviceCredential() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + b = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_device_serial_number(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different device serial numbers. + """ + a = objects.DeviceCredential( + device_serial_number="serNum123456" + ) + b = objects.DeviceCredential( + device_serial_number="serNum654321" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_password(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different passwords. + """ + a = objects.DeviceCredential( + password="secret" + ) + b = objects.DeviceCredential( + password="public" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_device_identifier(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different device identifiers. + """ + a = objects.DeviceCredential( + device_identifier="devID2233" + ) + b = objects.DeviceCredential( + device_identifier="devID0011" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_network_identifier(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different network identifiers. + """ + a = objects.DeviceCredential( + network_identifier="netID9000" + ) + b = objects.DeviceCredential( + network_identifier="netID0999" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_machine_identifier(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different machine identifiers. + """ + a = objects.DeviceCredential( + machine_identifier="machineID1" + ) + b = objects.DeviceCredential( + machine_identifier="machineID2" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_media_identifier(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different media identifiers. + """ + a = objects.DeviceCredential( + media_identifier="mediaID313" + ) + b = objects.DeviceCredential( + media_identifier="mediaID828" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + DeviceCredential structs with different types. + """ + a = objects.DeviceCredential() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a DeviceCredential struct. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + expected = ( + "DeviceCredential(" + "device_serial_number='serNum123456', " + "password='secret', " + "device_identifier='devID2233', " + "network_identifier='netID9000', " + "machine_identifier='machineID1', " + "media_identifier='mediaID313')" + ) + observed = repr(credential) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a DeviceCredential struct. + """ + credential = objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + expected = str( + { + "device_serial_number": "serNum123456", + "password": "secret", + "device_identifier": "devID2233", + "network_identifier": "netID9000", + "machine_identifier": "machineID1", + "media_identifier": "mediaID313" + } + ) + observed = str(credential) + + self.assertEqual(expected, observed) + + +class TestCredential(testtools.TestCase): + """ + Test suite for the Credential struct. + """ + + def setUp(self): + super(TestCredential, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 11.1. + # + # This encoding matches the following set of values: + # Credential + # CredentialType - Username and Password + # CredentialValue + # Username - Fred + # Password - password1 + self.username_password_encoding = utils.BytearrayStream( + b'\x42\x00\x23\x01\x00\x00\x00\x40' + b'\x42\x00\x24\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x25\x01\x00\x00\x00\x28' + b'\x42\x00\x99\x07\x00\x00\x00\x04' + b'\x46\x72\x65\x64\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x09' + b'\x70\x61\x73\x73\x77\x6F\x72\x64\x31\x00\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_credential_type = utils.BytearrayStream( + b'\x42\x00\x23\x01\x00\x00\x00\x30' + b'\x42\x00\x25\x01\x00\x00\x00\x28' + b'\x42\x00\x99\x07\x00\x00\x00\x04' + b'\x46\x72\x65\x64\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x09' + b'\x70\x61\x73\x73\x77\x6F\x72\x64\x31\x00\x00\x00\x00\x00\x00\x00' + ) + self.encoding_missing_credential_value = utils.BytearrayStream( + b'\x42\x00\x23\x01\x00\x00\x00\x10' + b'\x42\x00\x24\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + self.encoding_unknown_credential_type = utils.BytearrayStream( + b'\x42\x00\x23\x01\x00\x00\x00\x40' + b'\x42\x00\x24\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x25\x01\x00\x00\x00\x28' + b'\x42\x00\x99\x07\x00\x00\x00\x04' + b'\x46\x72\x65\x64\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x09' + b'\x70\x61\x73\x73\x77\x6F\x72\x64\x31\x00\x00\x00\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 11.2. + # + # This encoding matches the following set of values: + # Credential + # CredentialType - Device + # CredentialValue + # Device Serial Number - serNum123456 + # Password - secret + # Device Identifier - devID2233 + # Network Identifier - netID9000 + # Machine Identifier - machineID1 + # Media Identifier - mediaID313 + self.device_encoding = utils.BytearrayStream( + b'\x42\x00\x23\x01\x00\x00\x00\xA0' + b'\x42\x00\x24\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x25\x01\x00\x00\x00\x88' + b'\x42\x00\xB0\x07\x00\x00\x00\x0C' + b'\x73\x65\x72\x4E\x75\x6D\x31\x32\x33\x34\x35\x36\x00\x00\x00\x00' + b'\x42\x00\xA1\x07\x00\x00\x00\x06' + b'\x73\x65\x63\x72\x65\x74\x00\x00' + b'\x42\x00\xA2\x07\x00\x00\x00\x09' + b'\x64\x65\x76\x49\x44\x32\x32\x33\x33\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAB\x07\x00\x00\x00\x09' + b'\x6E\x65\x74\x49\x44\x39\x30\x30\x30\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xA9\x07\x00\x00\x00\x0A' + b'\x6D\x61\x63\x68\x69\x6E\x65\x49\x44\x31\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xAA\x07\x00\x00\x00\x0A' + b'\x6D\x65\x64\x69\x61\x49\x44\x33\x31\x33\x00\x00\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestCredential, self).tearDown() + + def test_init(self): + """ + Test that a Credential struct can be constructed without arguments. + """ + credential = objects.Credential() + + self.assertEqual(None, credential.credential_type) + self.assertEqual(None, credential.credential_value) + + def test_init_with_args(self): + """ + Test that a Credential struct can be constructed with arguments. + """ + credential = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="John", + password="abc123" + ) + ) + + self.assertEqual( + enums.CredentialType.USERNAME_AND_PASSWORD, + credential.credential_type + ) + self.assertEqual( + objects.UsernamePasswordCredential( + username="John", + password="abc123" + ), + credential.credential_value + ) + + def test_invalid_credential_type(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the credential type of a Credential struct. + """ + kwargs = {"credential_type": "invalid"} + self.assertRaisesRegexp( + TypeError, + "Credential type must be a CredentialType enumeration.", + objects.Credential, + **kwargs + ) + + credential = objects.Credential() + args = (credential, "credential_type", 0) + self.assertRaisesRegexp( + TypeError, + "Credential type must be a CredentialType enumeration.", + setattr, + *args + ) + + def test_invalid_credential_value(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the credential value of a Credential struct. + """ + kwargs = {"credential_value": "invalid"} + self.assertRaisesRegexp( + TypeError, + "Credential value must be a CredentialValue struct.", + objects.Credential, + **kwargs + ) + + credential = objects.Credential() + args = (credential, "credential_value", 0) + self.assertRaisesRegexp( + TypeError, + "Credential value must be a CredentialValue struct.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Credential struct can be read from a data stream. + """ + # Test with a UsernamePasswordCredential. + credential = objects.Credential() + + self.assertEqual(None, credential.credential_type) + self.assertEqual(None, credential.credential_value) + + credential.read(self.username_password_encoding) + + self.assertEqual( + enums.CredentialType.USERNAME_AND_PASSWORD, + credential.credential_type + ) + self.assertEqual( + objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ), + credential.credential_value + ) + + # Test with a DeviceCredential + credential = objects.Credential() + + self.assertEqual(None, credential.credential_type) + self.assertEqual(None, credential.credential_value) + + credential.read(self.device_encoding) + + self.assertEqual( + enums.CredentialType.DEVICE, + credential.credential_type + ) + self.assertEqual( + objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ), + credential.credential_value + ) + + def test_read_missing_credential_type(self): + """ + Test that a ValueError gets raised when attempting to read a + Credential struct from a data stream missing the credential type data. + """ + credential = objects.Credential() + + self.assertEqual(None, credential.credential_type) + self.assertEqual(None, credential.credential_value) + + args = (self.encoding_missing_credential_type, ) + self.assertRaisesRegexp( + ValueError, + "Credential encoding missing the credential type.", + credential.read, + *args + ) + + def test_read_unknown_credential_type(self): + """ + Test that a ValueError gets raised when attempting to read a + Credential struct from a data stream with an unknown credential + type. + """ + credential = objects.Credential() + + self.assertEqual(None, credential.credential_type) + self.assertEqual(None, credential.credential_value) + + args = (self.encoding_unknown_credential_type, ) + self.assertRaisesRegexp( + ValueError, + "Credential encoding includes unrecognized credential type.", + credential.read, + *args + ) + + def test_read_missing_credential_value(self): + """ + Test that a ValueError gets raised when attempting to read a + Credential struct from a data stream missing the credential value + data. + """ + credential = objects.Credential() + + self.assertEqual(None, credential.credential_type) + self.assertEqual(None, credential.credential_value) + + args = (self.encoding_missing_credential_value, ) + self.assertRaisesRegexp( + ValueError, + "Credential encoding missing the credential value.", + credential.read, + *args + ) + + def test_write(self): + """ + Test that a Credential struct can be written to a data stream. + """ + # Test with a UsernamePasswordCredential. + credential = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual(len(self.username_password_encoding), len(stream)) + self.assertEqual(str(self.username_password_encoding), str(stream)) + + # Test with a DeviceCredential. + credential = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + stream = utils.BytearrayStream() + + credential.write(stream) + + self.assertEqual(len(self.device_encoding), len(stream)) + self.assertEqual(str(self.device_encoding), str(stream)) + + def test_write_missing_credential_type(self): + """ + Test that a ValueError gets raised when attempting to write a + Credential struct missing credential type data to a data stream. + """ + credential = objects.Credential( + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + stream = utils.BytearrayStream() + + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "Credential struct missing the credential type.", + credential.write, + *args + ) + + def test_write_missing_credential_value(self): + """ + Test that a ValueError gets raised when attempting to write a + Credential struct missing credential value data to a data stream. + """ + credential = objects.Credential( + credential_type=enums.CredentialType.DEVICE + ) + stream = utils.BytearrayStream() + + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "Credential struct missing the credential value.", + credential.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Credential structs with the same data. + """ + a = objects.Credential() + b = objects.Credential() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + # Test with a UsernamePasswordCredential. + a = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + b = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + # Test with a DeviceCredential. + a = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + b = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_credential_type(self): + """ + Test that the equality operator returns False when comparing two + Credential structs with different credential types. + """ + a = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD + ) + b = objects.Credential( + credential_type=enums.CredentialType.DEVICE + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_credential_value(self): + """ + Test that the equality operator returns False when comparing two + Credential structs with different credential values. + """ + a = objects.Credential( + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + b = objects.Credential( + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Credential structs with different types. + """ + a = objects.Credential() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Credential structs with the same data. + """ + a = objects.Credential() + b = objects.Credential() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + # Test with a UsernamePasswordCredential. + a = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + b = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + # Test with a DeviceCredential. + a = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + b = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_credential_type(self): + """ + Test that the inequality operator returns True when comparing two + Credential structs with different credential types. + """ + a = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD + ) + b = objects.Credential( + credential_type=enums.CredentialType.DEVICE + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_credential_value(self): + """ + Test that the inequality operator returns True when comparing two + Credential structs with different credential values. + """ + a = objects.Credential( + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + b = objects.Credential( + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Credential structs with different types. + """ + a = objects.Credential() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Credential struct. + """ + # Test with a UsernamePasswordCredential. + credential = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + expected = ( + "Credential(" + "credential_type=CredentialType.USERNAME_AND_PASSWORD, " + "credential_value=UsernamePasswordCredential(" + "username='Fred', " + "password='password1'))" + ) + observed = repr(credential) + + self.assertEqual(expected, observed) + + # Test with a DeviceCredential. + credential = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + expected = ( + "Credential(" + "credential_type=CredentialType.DEVICE, " + "credential_value=DeviceCredential(" + "device_serial_number='serNum123456', " + "password='secret', " + "device_identifier='devID2233', " + "network_identifier='netID9000', " + "machine_identifier='machineID1', " + "media_identifier='mediaID313'))" + ) + observed = repr(credential) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Credential struct. + """ + # Test with a UsernamePasswordCredential. + credential = objects.Credential( + credential_type=enums.CredentialType.USERNAME_AND_PASSWORD, + credential_value=objects.UsernamePasswordCredential( + username="Fred", + password="password1" + ) + ) + expected = str({ + "credential_type": enums.CredentialType.USERNAME_AND_PASSWORD, + "credential_value": str({ + "username": "Fred", + "password": "password1" + }) + }) + observed = str(credential) + + self.assertEqual(expected, observed) + + # Test with a DeviceCredential. + credential = objects.Credential( + credential_type=enums.CredentialType.DEVICE, + credential_value=objects.DeviceCredential( + device_serial_number="serNum123456", + password="secret", + device_identifier="devID2233", + network_identifier="netID9000", + machine_identifier="machineID1", + media_identifier="mediaID313" + ) + ) + expected = str({ + "credential_type": enums.CredentialType.DEVICE, + "credential_value": str({ + "device_serial_number": "serNum123456", + "password": "secret", + "device_identifier": "devID2233", + "network_identifier": "netID9000", + "machine_identifier": "machineID1", + "media_identifier": "mediaID313" + }) + }) + observed = str(credential) + + self.assertEqual(expected, observed) diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index 36cc5e1..2a6a172 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -64,8 +64,6 @@ from kmip.services.results import OperationResult from kmip.services.results import QueryResult from kmip.services.results import RekeyKeyPairResult -import kmip.core.utils as utils - import mock import os import socket @@ -144,31 +142,17 @@ class TestKMIPClient(TestCase): def test_build_credential(self): username = 'username' password = 'password' - cred_type = CredentialType.USERNAME_AND_PASSWORD self.client.username = username self.client.password = password credential = self.client._build_credential() - message = utils.build_er_error(credential.__class__, 'type', - cred_type, - credential.credential_type.value, - 'value') - self.assertEqual(CredentialType.USERNAME_AND_PASSWORD, - credential.credential_type.value, - message) - - message = utils.build_er_error( - credential.__class__, 'type', username, - credential.credential_value.username.value, 'value') - self.assertEqual(username, credential.credential_value.username.value, - message) - - message = utils.build_er_error( - credential.__class__, 'type', password, - credential.credential_value.password.value, 'value') - self.assertEqual(password, credential.credential_value.password.value, - message) + self.assertEqual( + CredentialType.USERNAME_AND_PASSWORD, + credential.credential_type + ) + self.assertEqual(username, credential.credential_value.username) + self.assertEqual(password, credential.credential_value.password) def test_build_credential_no_username(self): username = None