diff --git a/kmip/core/messages/payloads/get.py b/kmip/core/messages/payloads/get.py index 9a50256..aba89e4 100644 --- a/kmip/core/messages/payloads/get.py +++ b/kmip/core/messages/payloads/get.py @@ -13,143 +13,483 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core.factories.secrets import SecretFactory +import six -from kmip.core import attributes from kmip.core import enums -from kmip.core.enums import Tags +from kmip.core import objects +from kmip.core import primitives +from kmip.core import secrets +from kmip.core import utils -from kmip.core.objects import KeyWrappingSpecification - -from kmip.core.primitives import Struct -from kmip.core.primitives import Enumeration - -from kmip.core.utils import BytearrayStream +from kmip.core.factories import secrets as secret_factory -# 4.11 -class GetRequestPayload(Struct): +class GetRequestPayload(primitives.Struct): + """ + A request payload for the Get operation. - # 9.1.3.2.2 - class KeyCompressionType(Enumeration): - - def __init__(self, value=None): - super(GetRequestPayload.KeyCompressionType, self).__init__( - enums.KeyCompressionType, value, Tags.KEY_COMPRESSION_TYPE) - - # 9.1.3.2.3 - class KeyFormatType(Enumeration): - - def __init__(self, value=None): - super(GetRequestPayload.KeyFormatType, self).__init__( - enums.KeyFormatType, value, Tags.KEY_FORMAT_TYPE) + Attributes: + unique_identifier: The unique ID of the managed object to retrieve + from the server. + key_format_type: The format of the returned object, if it is a key. + key_compression_type: The compression method to be used for the + returned object, if it is an elliptic curve public key. + key_wrapping_specification: A collection of settings specifying how + the returned object should be cryptographically wrapped if it is + a key. + """ def __init__(self, unique_identifier=None, key_format_type=None, key_compression_type=None, key_wrapping_specification=None): - super(GetRequestPayload, self).__init__(tag=enums.Tags.REQUEST_PAYLOAD) + """ + Construct a Get request payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., a + symmetric key) to retrieve. Optional, defaults to None. + key_format_type (KeyFormatType): A KeyFormatType enumeration that + specifies the format in which the object should be returned. + Optional, defaults to None. + key_compression_type (KeyCompressionType): A KeyCompressionType + enumeration that specifies the compression method to be used + when returning elliptic curve public keys. Optional, defaults + to None. + key_wrapping_specification (KeyWrappingSpecification): A + KeyWrappingSpecification struct that specifies keys and other + information for wrapping the returned object. Optional, + defaults to None. + """ + super(GetRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._unique_identifier = None + self._key_format_type = None + self._key_compression_type = None + self._key_wrapping_specification = None + self.unique_identifier = unique_identifier self.key_format_type = key_format_type self.key_compression_type = key_compression_type self.key_wrapping_specification = key_wrapping_specification - self.validate() - def read(self, istream): - super(GetRequestPayload, self).read(istream) - tstream = BytearrayStream(istream.read(self.length)) + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None - if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream): - self.unique_identifier = attributes.UniqueIdentifier() - self.unique_identifier.read(tstream) + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") - if self.is_tag_next(Tags.KEY_FORMAT_TYPE, tstream): - self.key_format_type = GetRequestPayload.KeyFormatType() - self.key_format_type.read(tstream) + @property + def key_format_type(self): + if self._key_format_type: + return self._key_format_type.value + else: + return None - if self.is_tag_next(Tags.KEY_COMPRESSION_TYPE, tstream): - self.key_compression_type = GetRequestPayload.KeyCompressionType() - self.key_compression_type.read(tstream) + @key_format_type.setter + def key_format_type(self, value): + if value is None: + self._key_format_type = None + elif isinstance(value, enums.KeyFormatType): + self._key_format_type = primitives.Enumeration( + enums.KeyFormatType, + value=value, + tag=enums.Tags.KEY_FORMAT_TYPE + ) + else: + raise TypeError( + "Key format type must be a KeyFormatType enumeration." + ) - if self.is_tag_next(Tags.KEY_WRAPPING_SPECIFICATION, tstream): - self.key_wrapping_specification = KeyWrappingSpecification() - self.key_wrapping_specification.read(tstream) + @property + def key_compression_type(self): + if self._key_compression_type: + return self._key_compression_type.value + else: + return None - self.is_oversized(tstream) - self.validate() + @key_compression_type.setter + def key_compression_type(self, value): + if value is None: + self._key_compression_type = None + elif isinstance(value, enums.KeyCompressionType): + self._key_compression_type = primitives.Enumeration( + enums.KeyCompressionType, + value=value, + tag=enums.Tags.KEY_COMPRESSION_TYPE + ) + else: + raise TypeError( + "Key compression type must be a KeyCompressionType " + "enumeration." + ) - def write(self, ostream): - tstream = BytearrayStream() + @property + def key_wrapping_specification(self): + return self._key_wrapping_specification - # Write the contents of the request payload - if self.unique_identifier is not None: - self.unique_identifier.write(tstream) - if self.key_format_type is not None: - self.key_format_type.write(tstream) - if self.key_compression_type is not None: - self.key_compression_type.write(tstream) - if self.key_wrapping_specification is not None: - self.key_wrapping_specification.write(tstream) + @key_wrapping_specification.setter + def key_wrapping_specification(self, value): + if value is None: + self._key_wrapping_specification = None + elif isinstance(value, objects.KeyWrappingSpecification): + self._key_wrapping_specification = value + else: + raise TypeError( + "Key wrapping specification must be a " + "KeyWrappingSpecification struct." + ) - # Write the length and value of the request payload - self.length = tstream.length() - super(GetRequestPayload, self).write(ostream) - ostream.write(tstream.buffer) + def read(self, input_stream): + """ + Read the data encoding the Get request payload and decode it into its + constituent parts. - def validate(self): - self.__validate() + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + """ + super(GetRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) - def __validate(self): - # TODO (peter-hamilton) Finish implementation - pass + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + if self.is_tag_next(enums.Tags.KEY_FORMAT_TYPE, local_stream): + self._key_format_type = primitives.Enumeration( + enum=enums.KeyFormatType, + tag=enums.Tags.KEY_FORMAT_TYPE + ) + self._key_format_type.read(local_stream) + + if self.is_tag_next(enums.Tags.KEY_COMPRESSION_TYPE, local_stream): + self._key_compression_type = primitives.Enumeration( + enum=enums.KeyCompressionType, + tag=enums.Tags.KEY_COMPRESSION_TYPE + ) + self._key_compression_type.read(local_stream) + + if self.is_tag_next( + enums.Tags.KEY_WRAPPING_SPECIFICATION, + local_stream + ): + self._key_wrapping_specification = \ + objects.KeyWrappingSpecification() + self._key_wrapping_specification.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Get request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier is not None: + self._unique_identifier.write(local_stream) + if self._key_format_type is not None: + self._key_format_type.write(local_stream) + if self._key_compression_type is not None: + self._key_compression_type.write(local_stream) + if self._key_wrapping_specification is not None: + self._key_wrapping_specification.write(local_stream) + + self.length = local_stream.length() + super(GetRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, GetRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.key_format_type != other.key_format_type: + return False + elif self.key_compression_type != other.key_compression_type: + return False + elif self.key_wrapping_specification != \ + other.key_wrapping_specification: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, GetRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "key_format_type={0}".format(self.key_format_type), + "key_compression_type={0}".format(self.key_compression_type), + "key_wrapping_specification={0}".format( + repr(self.key_wrapping_specification) + ) + ]) + return "GetRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'key_format_type': self.key_format_type, + 'key_compression_type': self.key_compression_type, + 'key_wrapping_specification': self.key_wrapping_specification + }) -class GetResponsePayload(Struct): +class GetResponsePayload(primitives.Struct): + """ + A response payload for the Get operation. + + Attributes: + object_type: The type of the managed object being returned. + unique_identifier: The unique ID of the managed object being returned. + secret: The managed object being returned. + """ def __init__(self, object_type=None, unique_identifier=None, secret=None): - super(GetResponsePayload, self).__init__(tag=Tags.RESPONSE_PAYLOAD) + """ + Construct a Get response payload struct. + + Args: + object_type (ObjectType): An ObjectType enumeration specifying the + type of managed object being returned. Optional, defaults to + None. Required for read/write. + unique_identifier (string): The ID of the managed object (e.g., a + symmetric key) being returned. Optional, defaults to None. + Required for read/write. + secret (various): The managed object struct being returned. Must + be one of the following: + + Optional, defaults to None. Required for read/write. + """ + super(GetResponsePayload, self).__init__( + tag=enums.Tags.RESPONSE_PAYLOAD + ) + self._object_type = None + self._unique_identifier = None + self._secret = None + self.object_type = object_type self.unique_identifier = unique_identifier self.secret = secret - self.secret_factory = SecretFactory() - self.validate() - def read(self, istream): - super(GetResponsePayload, self).read(istream) - tstream = BytearrayStream(istream.read(self.length)) + self.secret_factory = secret_factory.SecretFactory() - self.object_type = attributes.ObjectType() - self.unique_identifier = attributes.UniqueIdentifier() + @property + def object_type(self): + if self._object_type: + return self._object_type.value + else: + return None - self.object_type.read(tstream) - self.unique_identifier.read(tstream) + @object_type.setter + def object_type(self, value): + if value is None: + self._object_type = None + elif isinstance(value, enums.ObjectType): + self._object_type = primitives.Enumeration( + enums.ObjectType, + value=value, + tag=enums.Tags.OBJECT_TYPE + ) + else: + raise TypeError("Object type must be an ObjectType enumeration.") - secret_type = self.object_type.value - self.secret = self.secret_factory.create(secret_type) - self.secret.read(tstream) + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None - self.is_oversized(tstream) - self.validate() + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") - def write(self, ostream): - tstream = BytearrayStream() + @property + def secret(self): + return self._secret - self.object_type.write(tstream) - self.unique_identifier.write(tstream) - self.secret.write(tstream) + @secret.setter + def secret(self, value): + if value is None: + self._secret = None + elif isinstance( + value, + ( + secrets.Certificate, + secrets.OpaqueObject, + secrets.PrivateKey, + secrets.PublicKey, + secrets.SecretData, + secrets.SplitKey, + secrets.SymmetricKey, + secrets.Template + ) + ): + self._secret = value + else: + raise TypeError( + "Secret must be one of the following structs: Certificate, " + "OpaqueObject, PrivateKey, PublicKey, SecretData, SplitKey, " + "SymmetricKey, Template" + ) - # Write the length and value of the request payload - self.length = tstream.length() - super(GetResponsePayload, self).write(ostream) - ostream.write(tstream.buffer) + def read(self, input_stream): + """ + Read the data encoding the Get response payload and decode it + into its constituent parts. - def validate(self): - self.__validate() + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. - def __validate(self): - # TODO (peter-hamilton) Finish implementation. - pass + Raises: + ValueError: Raised if the object type, unique identifier, or + secret attributes are missing from the encoded payload. + """ + super(GetResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.OBJECT_TYPE, local_stream): + self._object_type = primitives.Enumeration( + enum=enums.ObjectType, + tag=enums.Tags.OBJECT_TYPE + ) + self._object_type.read(local_stream) + else: + raise ValueError( + "Parsed payload encoding is missing the object type field." + ) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + else: + raise ValueError( + "Parsed payload encoding is missing the unique identifier " + "field." + ) + + self.secret = self.secret_factory.create(self.object_type) + if self.is_tag_next(self._secret.tag, local_stream): + self._secret.read(local_stream) + else: + raise ValueError( + "Parsed payload encoding is missing the secret field." + ) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Get response payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the object type, unique identifier, or + secret attributes are missing from the payload struct. + """ + local_stream = utils.BytearrayStream() + + if self.object_type: + self._object_type.write(local_stream) + else: + raise ValueError("Payload is missing the object type field.") + + if self.unique_identifier: + self._unique_identifier.write(local_stream) + else: + raise ValueError( + "Payload is missing the unique identifier field." + ) + + if self.secret: + self._secret.write(local_stream) + else: + raise ValueError("Payload is missing the secret field.") + + self.length = local_stream.length() + super(GetResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, GetResponsePayload): + if self.object_type != other.object_type: + return False + elif self.unique_identifier != other.unique_identifier: + return False + elif self.secret != other.secret: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, GetResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "object_type={0}".format(self.object_type), + "unique_identifier='{0}'".format(self.unique_identifier), + "secret={0}".format(repr(self.secret)) + ]) + return "GetResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'object_type': self.object_type, + 'unique_identifier': self.unique_identifier, + 'secret': self.secret + }) diff --git a/kmip/demos/units/get.py b/kmip/demos/units/get.py index feb4676..db1cc0f 100644 --- a/kmip/demos/units/get.py +++ b/kmip/demos/units/get.py @@ -92,9 +92,9 @@ if __name__ == '__main__': if result.result_status.value == ResultStatus.SUCCESS: logger.info('retrieved object type: {0}'.format( result.object_type.value)) - logger.info('retrieved UUID: {0}'.format(result.uuid.value)) + logger.info('retrieved UUID: {0}'.format(result.uuid)) - utils.log_secret(logger, result.object_type.value, result.secret) + utils.log_secret(logger, result.object_type, result.secret) else: logger.info('get() result reason: {0}'.format( result.result_reason.value)) diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index c07cda5..ca428ab 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -769,25 +769,19 @@ class KMIPProxy(KMIP): credential=None): operation = Operation(OperationEnum.GET) - uuid = None - kft = None - kct = None kws = None - if unique_identifier is not None: - uuid = attr.UniqueIdentifier(unique_identifier) if key_format_type is not None: - kft = get.GetRequestPayload.KeyFormatType(key_format_type.value) - if key_compression_type is not None: - kct = key_compression_type - kct = get.GetRequestPayload.KeyCompressionType(kct) + key_format_type = key_format_type.value if key_wrapping_specification is not None: kws = objects.KeyWrappingSpecification(key_wrapping_specification) - req_pl = get.GetRequestPayload(unique_identifier=uuid, - key_format_type=kft, - key_compression_type=kct, - key_wrapping_specification=kws) + req_pl = get.GetRequestPayload( + unique_identifier=unique_identifier, + key_format_type=key_format_type, + key_compression_type=key_compression_type, + key_wrapping_specification=kws + ) batch_item = messages.RequestBatchItem(operation=operation, request_payload=req_pl) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 9787d86..a0f6c7e 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1378,11 +1378,11 @@ class KmipEngine(object): unique_identifier = self._id_placeholder if payload.unique_identifier: - unique_identifier = payload.unique_identifier.value + unique_identifier = payload.unique_identifier key_format_type = None if payload.key_format_type: - key_format_type = payload.key_format_type.value + key_format_type = payload.key_format_type if payload.key_compression_type: raise exceptions.KeyCompressionTypeNotSupported( @@ -1429,8 +1429,8 @@ class KmipEngine(object): core_secret = self._build_core_object(managed_object) response_payload = get.GetResponsePayload( - object_type=attributes.ObjectType(managed_object._object_type), - unique_identifier=attributes.UniqueIdentifier(unique_identifier), + object_type=managed_object._object_type, + unique_identifier=unique_identifier, secret=core_secret ) diff --git a/kmip/tests/unit/core/messages/payloads/test_get.py b/kmip/tests/unit/core/messages/payloads/test_get.py index 87b311e..c3fcc59 100644 --- a/kmip/tests/unit/core/messages/payloads/test_get.py +++ b/kmip/tests/unit/core/messages/payloads/test_get.py @@ -12,3 +12,1513 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +import testtools + +from kmip.core import attributes +from kmip.core import enums +from kmip.core import misc +from kmip.core import objects +from kmip.core import secrets +from kmip.core import utils + +from kmip.core.messages.payloads import get + + +class TestGetRequestPayload(testtools.TestCase): + """ + Test suite for the Get request payload. + """ + + def setUp(self): + super(TestGetRequestPayload, self).setUp() + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Sections 3.1.3 and 14.1. The rest of the encoding was built by + # hand. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 49a1ca88-6bea-4fb2-b450-7e58802c3038 + # Key Format Type - Raw + # Key Compression Type - EC Public Key Type Uncompressed + # Key Wrapping Specification + # Key Wrapping Method - Encrypt + # Encryption Key Information + # Unique Identifier - 100182d5-72b8-47aa-8383-4d97d512e98a + # Cryptographic Parameters + # Block Cipher Mode - NIST Key Wrap + # Encoding Option - No Encoding + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\xC8' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x34\x39\x61\x31\x63\x61\x38\x38\x2D\x36\x62\x65\x61\x2D\x34\x66' + b'\x62\x32\x2D\x62\x34\x35\x30\x2D\x37\x65\x35\x38\x38\x30\x32\x63' + b'\x33\x30\x33\x38\x00\x00\x00\x00' + b'\x42\x00\x42\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x41\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x47\x01\x00\x00\x00\x70' + b'\x42\x00\x9E\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x36\x01\x00\x00\x00\x48' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x31\x30\x30\x31\x38\x32\x64\x35\x2D\x37\x32\x62\x38\x2D\x34\x37' + b'\x61\x61\x2D\x38\x33\x38\x33\x2D\x34\x64\x39\x37\x64\x35\x31\x32' + b'\x65\x39\x38\x61\x00\x00\x00\x00' + b'\x42\x00\x2B\x01\x00\x00\x00\x10' + b'\x42\x00\x11\x05\x00\x00\x00\x04\x00\x00\x00\x0D\x00\x00\x00\x00' + b'\x42\x00\xA3\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 3.1.3. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 49a1ca88-6bea-4fb2-b450-7e58802c3038 + + self.partial_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x34\x39\x61\x31\x63\x61\x38\x38\x2D\x36\x62\x65\x61\x2D\x34\x66' + b'\x62\x32\x2D\x62\x34\x35\x30\x2D\x37\x65\x35\x38\x38\x30\x32\x63' + b'\x33\x30\x33\x38\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestGetRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that a Get request payload can be constructed with no arguments. + """ + payload = get.GetRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.key_format_type) + self.assertEqual(None, payload.key_compression_type) + self.assertEqual(None, payload.key_wrapping_specification) + + def test_init_with_args(self): + """ + Test that a Get request payload can be constructed with valid values. + """ + payload = get.GetRequestPayload( + unique_identifier='00000000-2222-4444-6666-888888888888', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT + ) + ) + + self.assertEqual( + '00000000-2222-4444-6666-888888888888', + payload.unique_identifier + ) + self.assertEqual(enums.KeyFormatType.RAW, payload.key_format_type) + self.assertEqual( + enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + payload.key_compression_type + ) + self.assertIsInstance( + payload.key_wrapping_specification, + objects.KeyWrappingSpecification + ) + self.assertEqual( + enums.WrappingMethod.ENCRYPT, + payload.key_wrapping_specification.wrapping_method + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Get request payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + get.GetRequestPayload, + **kwargs + ) + + args = (get.GetRequestPayload(), 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_key_format_type(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the key format type of a Get request payload. + """ + kwargs = {'key_format_type': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Key format type must be a KeyFormatType enumeration.", + get.GetRequestPayload, + **kwargs + ) + + args = (get.GetRequestPayload(), 'key_format_type', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Key format type must be a KeyFormatType enumeration.", + setattr, + *args + ) + + def test_invalid_key_compression_type(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the key compression type of a Get request payload. + """ + kwargs = {'key_compression_type': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Key compression type must be a KeyCompressionType enumeration.", + get.GetRequestPayload, + **kwargs + ) + + args = (get.GetRequestPayload(), 'key_compression_type', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Key compression type must be a KeyCompressionType enumeration.", + setattr, + *args + ) + + def test_invalid_key_wrapping_specification(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the key wrapping specification of a Get request payload. + """ + kwargs = {'key_wrapping_specification': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Key wrapping specification must be a KeyWrappingSpecification " + "struct.", + get.GetRequestPayload, + **kwargs + ) + + args = ( + get.GetRequestPayload(), + 'key_wrapping_specification', + 'invalid' + ) + self.assertRaisesRegexp( + TypeError, + "Key wrapping specification must be a KeyWrappingSpecification " + "struct.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a GetRequestPayload struct can be read from a data stream. + """ + payload = get.GetRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.key_format_type) + self.assertEqual(None, payload.key_compression_type) + self.assertEqual(None, payload.key_wrapping_specification) + + payload.read(self.full_encoding) + + self.assertEqual( + '49a1ca88-6bea-4fb2-b450-7e58802c3038', + payload.unique_identifier + ) + self.assertEqual(enums.KeyFormatType.RAW, payload.key_format_type) + self.assertEqual( + enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + payload.key_compression_type + ) + self.assertIsInstance( + payload.key_wrapping_specification, + objects.KeyWrappingSpecification + ) + k = payload.key_wrapping_specification + self.assertEqual( + enums.WrappingMethod.ENCRYPT, + k.wrapping_method + ) + self.assertIsInstance( + k.encryption_key_information, + objects.EncryptionKeyInformation + ) + e = k.encryption_key_information + self.assertEqual( + '100182d5-72b8-47aa-8383-4d97d512e98a', + e.unique_identifier + ) + self.assertIsInstance( + e.cryptographic_parameters, + attributes.CryptographicParameters + ) + self.assertEqual( + enums.BlockCipherMode.NIST_KEY_WRAP, + e.cryptographic_parameters.block_cipher_mode + ) + self.assertEqual( + k.encoding_option, + enums.EncodingOption.NO_ENCODING + ) + + def test_read_partial(self): + """ + Test that a GetRequestPayload struct can be read from a partial data + stream. + """ + payload = get.GetRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.key_format_type) + self.assertEqual(None, payload.key_compression_type) + self.assertEqual(None, payload.key_wrapping_specification) + + payload.read(self.partial_encoding) + + self.assertEqual( + '49a1ca88-6bea-4fb2-b450-7e58802c3038', + payload.unique_identifier + ) + self.assertEqual(None, payload.key_format_type) + self.assertEqual(None, payload.key_compression_type) + self.assertEqual(None, payload.key_wrapping_specification) + + def test_read_empty(self): + """ + Test that a GetRequestPayload struct can be read from an empty data + stream. + """ + payload = get.GetRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.key_format_type) + self.assertEqual(None, payload.key_compression_type) + self.assertEqual(None, payload.key_wrapping_specification) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.key_format_type) + self.assertEqual(None, payload.key_compression_type) + self.assertEqual(None, payload.key_wrapping_specification) + + def test_write(self): + """ + Test that a GetRequestPayload struct can be written to a data stream. + """ + payload = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + stream = utils.BytearrayStream() + + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partially defined GetRequestPayload struct can be written + to a data stream. + """ + payload = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + stream = utils.BytearrayStream() + + payload.write(stream) + + self.assertEqual(len(self.partial_encoding), len(stream)) + self.assertEqual(str(self.partial_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty GetRequestPayload struct can be written to a data + stream. + """ + payload = get.GetRequestPayload() + stream = utils.BytearrayStream() + + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + GetRequestPayload structs with the same data. + """ + a = get.GetRequestPayload() + b = get.GetRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + b = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + GetRequestPayload structs with different unique identifiers. + """ + a = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c303f' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_key_format_type(self): + """ + Test that the equality operator returns False when comparing two + GetRequestPayload structs with different key format types. + """ + a = get.GetRequestPayload( + key_format_type=enums.KeyFormatType.RAW + ) + b = get.GetRequestPayload( + key_format_type=enums.KeyFormatType.OPAQUE + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_key_compression_type(self): + """ + Test that the equality operator returns False when comparing two + GetRequestPayload structs with different key compression types. + """ + a = get.GetRequestPayload( + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED + ) + b = get.GetRequestPayload( + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_X9_62_HYBRID + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_key_wrapping_specification(self): + """ + Test that the equality operator returns False when comparing two + GetRequestPayload structs with different key wrapping specifications. + """ + a = get.GetRequestPayload( + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT_THEN_MAC_SIGN, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-ffff-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + b = get.GetRequestPayload( + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + GetRequestPayload structs with different types. + """ + a = get.GetRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + GetRequestPayload structs with the same data. + """ + a = get.GetRequestPayload() + b = get.GetRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + b = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + GetRequestPayload structs with different unique identifiers. + """ + a = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c303f' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_key_format_type(self): + """ + Test that the inequality operator returns True when comparing two + GetRequestPayload structs with different key format types. + """ + a = get.GetRequestPayload( + key_format_type=enums.KeyFormatType.RAW + ) + b = get.GetRequestPayload( + key_format_type=enums.KeyFormatType.OPAQUE + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_key_compression_type(self): + """ + Test that the equality operator returns False when comparing two + GetRequestPayload structs with different key compression types. + """ + a = get.GetRequestPayload( + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED + ) + b = get.GetRequestPayload( + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_X9_62_HYBRID + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_key_wrapping_specification(self): + """ + Test that the inequality operator returns True when comparing two + GetRequestPayload structs with different key wrapping specifications. + """ + a = get.GetRequestPayload( + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT_THEN_MAC_SIGN, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-ffff-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + b = get.GetRequestPayload( + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + GetRequestPayload structs with different types. + """ + a = get.GetRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a GetRequestPayload struct. + """ + payload = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + + expected = ( + "GetRequestPayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', " + "key_format_type=KeyFormatType.RAW, " + "key_compression_type=" + "KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, " + "key_wrapping_specification=" + "KeyWrappingSpecification(" + "wrapping_method=WrappingMethod.ENCRYPT, " + "encryption_key_information=EncryptionKeyInformation(" + "unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', " + "cryptographic_parameters=CryptographicParameters(" + "block_cipher_mode=BlockCipherMode.NIST_KEY_WRAP, " + "padding_method=None, " + "hashing_algorithm=None, " + "key_role_type=None, " + "digital_signature_algorithm=None, " + "cryptographic_algorithm=None, " + "random_iv=None, " + "iv_length=None, " + "tag_length=None, " + "fixed_field_length=None, " + "invocation_field_length=None, " + "counter_length=None, " + "initial_counter_value=None)), " + "mac_signature_key_information=None, " + "attribute_names=None, " + "encoding_option=EncodingOption.NO_ENCODING))" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a GetRequestPayload struct. + """ + payload = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + key_format_type=enums.KeyFormatType.RAW, + key_compression_type=enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + key_wrapping_specification=objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038', + 'key_format_type': enums.KeyFormatType.RAW, + 'key_compression_type': enums.KeyCompressionType. + EC_PUBLIC_KEY_TYPE_UNCOMPRESSED, + 'key_wrapping_specification': objects.KeyWrappingSpecification( + wrapping_method=enums.WrappingMethod.ENCRYPT, + encryption_key_information=objects.EncryptionKeyInformation( + unique_identifier='100182d5-72b8-47aa-8383-4d97d512e98a', + cryptographic_parameters=attributes. + CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.NIST_KEY_WRAP + ) + ), + encoding_option=enums.EncodingOption.NO_ENCODING + ) + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestGetResponsePayload(testtools.TestCase): + """ + Test suite for the Get response payload. + """ + + def setUp(self): + super(TestGetResponsePayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 3.1.3. + # + # This encoding matches the following set of values: + # Response Payload + # Object Type - Symmetric Key + # Unique Identifier - 49a1ca88-6bea-4fb2-b450-7e58802c3038 + # Symmetric Key + # Key Block + # Key Format Type - Raw + # Key Value + # Key Material - 0x7367578051012A6D134A855E25C8CD5E4C + # A131455729D3C8 + # Cryptographic Algorithm - 3DES + # Cryptographic Length - 168 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\xA8' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x34\x39\x61\x31\x63\x61\x38\x38\x2D\x36\x62\x65\x61\x2D\x34\x66' + b'\x62\x32\x2D\x62\x34\x35\x30\x2D\x37\x65\x35\x38\x38\x30\x32\x63' + b'\x33\x30\x33\x38\x00\x00\x00\x00' + b'\x42\x00\x8F\x01\x00\x00\x00\x60' + b'\x42\x00\x40\x01\x00\x00\x00\x58' + b'\x42\x00\x42\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x45\x01\x00\x00\x00\x20' + b'\x42\x00\x43\x08\x00\x00\x00\x18' + b'\x73\x67\x57\x80\x51\x01\x2A\x6D\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\xA8\x00\x00\x00\x00' + ) + + self.partial_encoding_missing_object_type = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\xA0' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x34\x39\x61\x31\x63\x61\x38\x38\x2D\x36\x62\x65\x61\x2D\x34\x66' + b'\x62\x32\x2D\x62\x34\x35\x30\x2D\x37\x65\x35\x38\x38\x30\x32\x63' + b'\x33\x30\x33\x38\x00\x00\x00\x00' + b'\x42\x00\x8F\x01\x00\x00\x00\x60' + b'\x42\x00\x40\x01\x00\x00\x00\x58' + b'\x42\x00\x42\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x45\x01\x00\x00\x00\x20' + b'\x42\x00\x43\x08\x00\x00\x00\x18' + b'\x73\x67\x57\x80\x51\x01\x2A\x6D\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\xA8\x00\x00\x00\x00' + ) + self.partial_encoding_missing_unique_id = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x78' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x8F\x01\x00\x00\x00\x60' + b'\x42\x00\x40\x01\x00\x00\x00\x58' + b'\x42\x00\x42\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x45\x01\x00\x00\x00\x20' + b'\x42\x00\x43\x08\x00\x00\x00\x18' + b'\x73\x67\x57\x80\x51\x01\x2A\x6D\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\xA8\x00\x00\x00\x00' + ) + self.partial_encoding_missing_secret = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x40' + b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x34\x39\x61\x31\x63\x61\x38\x38\x2D\x36\x62\x65\x61\x2D\x34\x66' + b'\x62\x32\x2D\x62\x34\x35\x30\x2D\x37\x65\x35\x38\x38\x30\x32\x63' + b'\x33\x30\x33\x38\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestGetResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that a GetRequestPayload struct can be constructed with no + arguments. + """ + payload = get.GetResponsePayload() + + self.assertEqual(None, payload.object_type) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.secret) + + def test_init_with_args(self): + """ + Test that a GetRequestPayload struct can be constructed with valid + values. + """ + payload = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='11111111-3333-5555-7777-999999999999', + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + + self.assertEqual( + enums.ObjectType.SYMMETRIC_KEY, + payload.object_type + ) + self.assertEqual( + '11111111-3333-5555-7777-999999999999', + payload.unique_identifier + ) + self.assertIsInstance(payload.secret, secrets.SymmetricKey) + self.assertIsInstance(payload.secret.key_block, objects.KeyBlock) + self.assertIsInstance( + payload.secret.key_block.key_format_type, + misc.KeyFormatType + ) + self.assertEqual( + enums.KeyFormatType.RAW, + payload.secret.key_block.key_format_type.value + ) + self.assertIsInstance( + payload.secret.key_block.key_value, + objects.KeyValue + ) + self.assertIsInstance( + payload.secret.key_block.key_value.key_material, + objects.KeyMaterial + ) + self.assertEqual( + ( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ), + payload.secret.key_block.key_value.key_material.value + ) + self.assertIsInstance( + payload.secret.key_block.cryptographic_algorithm, + attributes.CryptographicAlgorithm + ) + self.assertEqual( + enums.CryptographicAlgorithm.TRIPLE_DES, + payload.secret.key_block.cryptographic_algorithm.value + ) + self.assertIsInstance( + payload.secret.key_block.cryptographic_length, + attributes.CryptographicLength + ) + self.assertEqual( + 168, + payload.secret.key_block.cryptographic_length.value + ) + + def test_invalid_object_type(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the object type of a GetResponsePayload struct. + """ + kwargs = {'object_type': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Object type must be an ObjectType enumeration.", + get.GetResponsePayload, + **kwargs + ) + + args = (get.GetResponsePayload(), 'object_type', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Object type must be an ObjectType enumeration.", + setattr, + *args + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a GetResponsePayload struct. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + get.GetResponsePayload, + **kwargs + ) + + args = (get.GetResponsePayload(), 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_secret(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the secret of a GetResponsePayload struct. + """ + kwargs = {'secret': 0} + self.assertRaisesRegexp( + TypeError, + "Secret must be one of the following structs: Certificate, " + "OpaqueObject, PrivateKey, PublicKey, SecretData, SplitKey, " + "SymmetricKey, Template", + get.GetResponsePayload, + **kwargs + ) + + args = (get.GetResponsePayload(), 'secret', 0) + self.assertRaisesRegexp( + TypeError, + "Secret must be one of the following structs: Certificate, " + "OpaqueObject, PrivateKey, PublicKey, SecretData, SplitKey, " + "SymmetricKey, Template", + setattr, + *args + ) + + def test_read(self): + """ + Test that a GetResponsePayload struct can be read from a data stream. + """ + payload = get.GetResponsePayload() + + self.assertEqual(None, payload.object_type) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.secret) + + payload.read(self.full_encoding) + + self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, payload.object_type) + self.assertEqual( + '49a1ca88-6bea-4fb2-b450-7e58802c3038', + payload.unique_identifier + ) + self.assertIsInstance(payload.secret, secrets.SymmetricKey) + self.assertIsInstance(payload.secret.key_block, objects.KeyBlock) + self.assertIsInstance( + payload.secret.key_block.key_format_type, + misc.KeyFormatType + ) + self.assertEqual( + enums.KeyFormatType.RAW, + payload.secret.key_block.key_format_type.value + ) + self.assertIsInstance( + payload.secret.key_block.key_value, + objects.KeyValue + ) + self.assertIsInstance( + payload.secret.key_block.key_value.key_material, + objects.KeyMaterial + ) + self.assertEqual( + ( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ), + payload.secret.key_block.key_value.key_material.value + ) + self.assertIsInstance( + payload.secret.key_block.cryptographic_algorithm, + attributes.CryptographicAlgorithm + ) + self.assertEqual( + enums.CryptographicAlgorithm.TRIPLE_DES, + payload.secret.key_block.cryptographic_algorithm.value + ) + self.assertIsInstance( + payload.secret.key_block.cryptographic_length, + attributes.CryptographicLength + ) + self.assertEqual( + 168, + payload.secret.key_block.cryptographic_length.value + ) + + def test_read_missing_object_type(self): + """ + Test that a ValueError gets raised when a required GetResponsePayload + field is missing when decoding the struct. + """ + payload = get.GetResponsePayload() + args = (self.partial_encoding_missing_object_type, ) + self.assertRaisesRegexp( + ValueError, + "Parsed payload encoding is missing the object type field.", + payload.read, + *args + ) + + def test_read_missing_unique_identifier(self): + """ + Test that a ValueError gets raised when a required GetResponsePayload + field is missing when decoding the struct. + """ + payload = get.GetResponsePayload() + args = (self.partial_encoding_missing_unique_id, ) + self.assertRaisesRegexp( + ValueError, + "Parsed payload encoding is missing the unique identifier field.", + payload.read, + *args + ) + + def test_read_missing_secret(self): + """ + Test that a ValueError gets raised when a required GetResponsePayload + field is missing when decoding the struct. + """ + payload = get.GetResponsePayload() + args = (self.partial_encoding_missing_secret, ) + self.assertRaisesRegexp( + ValueError, + "Parsed payload encoding is missing the secret field.", + payload.read, + *args + ) + + def test_write(self): + """ + Test that a GetResponsePayload struct can be written to a data stream. + """ + payload = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_missing_object_type(self): + """ + Test that a ValueError gets raised when a required GetResponsePayload + field is missing when encoding the struct. + """ + payload = get.GetResponsePayload() + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "Payload is missing the object type field.", + payload.write, + *args + ) + + def test_write_missing_unique_identifier(self): + """ + Test that a ValueError gets raised when a required GetResponsePayload + field is missing when encoding the struct. + """ + payload = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY + ) + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "Payload is missing the unique identifier field.", + payload.write, + *args + ) + + def test_write_missing_secret(self): + """ + Test that a ValueError gets raised when a required GetResponsePayload + field is missing when encoding the struct. + """ + payload = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "Payload is missing the secret field.", + payload.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + GetResponsePayload structs with the same data. + """ + a = get.GetResponsePayload() + b = get.GetResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + # TODO (peter-hamilton): Update this once equality is supported for + # SymmetricKeys. + secret = secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + + a = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secret + ) + b = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secret + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_object_type(self): + """ + Test that the equality operator returns False when comparing two + GetResponsePayload structs with different object type fields. + """ + a = get.GetResponsePayload(object_type=enums.ObjectType.SYMMETRIC_KEY) + b = get.GetResponsePayload(object_type=enums.ObjectType.OPAQUE_DATA) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + GetResponsePayload structs with different unique identifier fields. + """ + a = get.GetResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = get.GetResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-ffff-7e58802c3038' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_secrets(self): + """ + Test that the equality operator returns False when comparing two + GetResponsePayload structs with different secret fields. + """ + # TODO (peter-hamilton): Update this test case once SymmetricKeys + # support proper field-based equality. + a = get.GetResponsePayload( + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + b = get.GetResponsePayload( + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operators returns False when comparing two + GetResponsePayload structs with different types. + """ + a = get.GetResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + GetResponsePayload structs with the same data. + """ + a = get.GetResponsePayload() + b = get.GetResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + # TODO (peter-hamilton): Update this once equality is supported for + # SymmetricKeys. + secret = secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + + a = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secret + ) + b = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secret + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_object_type(self): + """ + Test that the inequality operator returns True when comparing two + GetResponsePayload structs with different object type fields. + """ + a = get.GetResponsePayload(object_type=enums.ObjectType.SYMMETRIC_KEY) + b = get.GetResponsePayload(object_type=enums.ObjectType.OPAQUE_DATA) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + GetResponsePayload structs with different unique identifier fields. + """ + a = get.GetResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = get.GetResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-ffff-7e58802c3038' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_secrets(self): + """ + Test that the inequality operator returns True when comparing two + GetResponsePayload structs with different secret fields. + """ + # TODO (peter-hamilton): Update this test case once SymmetricKeys + # support proper field-based equality. + a = get.GetResponsePayload( + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + b = get.GetResponsePayload( + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operators returns True when comparing two + GetResponsePayload structs with different types. + """ + a = get.GetResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a GetResponsePayload struct. + """ + payload = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType( + enums.KeyFormatType.RAW + ), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + ) + + # TODO (peter-hamilton): Update the secret portion once SymmetricKeys + # support repr/str. + expected = ( + "GetResponsePayload(" + "object_type=ObjectType.SYMMETRIC_KEY, " + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', " + "secret=Struct()" + ")" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a GetResponsePayload struct. + """ + secret = secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial( + b'\x73\x67\x57\x80\x51\x01\x2A\x6D' + b'\x13\x4A\x85\x5E\x25\xC8\xCD\x5E' + b'\x4C\xA1\x31\x45\x57\x29\xD3\xC8' + ) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.TRIPLE_DES + ), + cryptographic_length=attributes.CryptographicLength(168) + ) + ) + payload = get.GetResponsePayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + secret=secret + ) + + # TODO (peter-hamilton): Update the secret portion once SymmetricKeys + # support repr/str. + expected = str({ + 'object_type': enums.ObjectType.SYMMETRIC_KEY, + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038', + 'secret': secret + }) + observed = str(payload) + + self.assertEqual(expected, observed) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index 065db41..13519bc 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -480,16 +480,16 @@ class TestRequestMessage(TestCase): msg.format(get.GetRequestPayload, type(request_payload))) - unique_identifier = request_payload.unique_identifier - msg = "Bad unique identifier type: expected {0}, received {1}" - self.assertIsInstance(unique_identifier, attr.UniqueIdentifier, - msg.format(attr.UniqueIdentifier, - type(unique_identifier))) - msg = "Bad unique identifier value: expected {0}, received {1}" - self.assertEqual('49a1ca88-6bea-4fb2-b450-7e58802c3038', - unique_identifier.value, - msg.format('49a1ca88-6bea-4fb2-b450-7e58802c3038', - unique_identifier.value)) + # unique_identifier = request_payload.unique_identifier + # msg = "Bad unique identifier type: expected {0}, received {1}" + # self.assertIsInstance(unique_identifier, attr.UniqueIdentifier, + # msg.format(attr.UniqueIdentifier, + # type(unique_identifier))) + # msg = "Bad unique identifier value: expected {0}, received {1}" + self.assertEqual( + '49a1ca88-6bea-4fb2-b450-7e58802c3038', + request_payload.unique_identifier + ) def test_get_request_write(self): prot_ver = contents.ProtocolVersion.create(1, 1) @@ -500,8 +500,10 @@ class TestRequestMessage(TestCase): operation = contents.Operation(enums.Operation.GET) - uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038') - request_payload = get.GetRequestPayload(unique_identifier=uuid) +# uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038') + request_payload = get.GetRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) batch_item = messages.RequestBatchItem(operation=operation, request_payload=request_payload) request_message = messages.RequestMessage(request_header=req_header, @@ -1498,25 +1500,14 @@ class TestResponseMessage(TestCase): self.msg.format('response payload', 'type', exp_type, rcv_type)) - object_type = response_payload.object_type - self.assertIsInstance(object_type, attr.ObjectType, - self.msg.format('object type', 'type', - attr.ObjectType, - type(object_type))) - self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, object_type.value, - self.msg.format('object type', 'value', - enums.ObjectType.SYMMETRIC_KEY, - object_type.value)) - - unique_identifier = response_payload.unique_identifier - value = '49a1ca88-6bea-4fb2-b450-7e58802c3038' - self.assertIsInstance(unique_identifier, attr.UniqueIdentifier, - self.msg.format('unique identifier', 'type', - attr.UniqueIdentifier, - type(unique_identifier))) - self.assertEqual(value, unique_identifier.value, - self.msg.format('unique identifier', 'value', - unique_identifier.value, value)) + self.assertEqual( + enums.ObjectType.SYMMETRIC_KEY, + response_payload.object_type + ) + self.assertEqual( + '49a1ca88-6bea-4fb2-b450-7e58802c3038', + response_payload.unique_identifier + ) secret = response_payload.secret self.assertIsInstance(secret, SymmetricKey, @@ -1619,8 +1610,8 @@ class TestResponseMessage(TestCase): secret = SymmetricKey(key_block) - resp_pl = get.GetResponsePayload(object_type=object_type, - unique_identifier=uniq_id, + resp_pl = get.GetResponsePayload(object_type=object_type.value, + unique_identifier=uniq_id.value, secret=secret) batch_item = messages.ResponseBatchItem(operation=operation, result_status=result_status, diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 092c901..4407063 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -3679,9 +3679,7 @@ class TestKmipEngine(testtools.TestCase): id_b = str(obj_b.unique_identifier) # Test by specifying the ID of the object to get. - payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(id_a) - ) + payload = get.GetRequestPayload(unique_identifier=id_a) response_payload = e._process_get(payload) e._data_session.commit() @@ -3692,9 +3690,9 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( enums.ObjectType.OPAQUE_DATA, - response_payload.object_type.value + response_payload.object_type ) - self.assertEqual(str(id_a), response_payload.unique_identifier.value) + self.assertEqual(str(id_a), response_payload.unique_identifier) self.assertIsInstance(response_payload.secret, secrets.OpaqueObject) self.assertEqual( enums.OpaqueDataType.NONE, @@ -3722,9 +3720,9 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( enums.ObjectType.OPAQUE_DATA, - response_payload.object_type.value + response_payload.object_type ) - self.assertEqual(str(id_b), response_payload.unique_identifier.value) + self.assertEqual(str(id_b), response_payload.unique_identifier) self.assertIsInstance(response_payload.secret, secrets.OpaqueObject) self.assertEqual( enums.OpaqueDataType.NONE, @@ -3749,11 +3747,8 @@ class TestKmipEngine(testtools.TestCase): e._logger = mock.MagicMock() # Test that specifying the key compression type generates an error. - payload = get.GetRequestPayload( - key_compression_type=get.GetRequestPayload.KeyCompressionType( - enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED - ) - ) + k = enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED + payload = get.GetRequestPayload(key_compression_type=k) args = (payload, ) regex = "Key compression is not supported." @@ -3813,10 +3808,8 @@ class TestKmipEngine(testtools.TestCase): # Test that a key can be retrieved with the right key format. payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(id_a), - key_format_type=get.GetRequestPayload.KeyFormatType( - enums.KeyFormatType.RAW - ) + unique_identifier=id_a, + key_format_type=enums.KeyFormatType.RAW ) response_payload = e._process_get(payload) @@ -3850,10 +3843,8 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(id_a), - key_format_type=get.GetRequestPayload.KeyFormatType( - enums.KeyFormatType.OPAQUE - ) + unique_identifier=id_a, + key_format_type=enums.KeyFormatType.OPAQUE ) args = (payload, ) @@ -3883,10 +3874,8 @@ class TestKmipEngine(testtools.TestCase): id_b = str(obj_b.unique_identifier) payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(id_b), - key_format_type=get.GetRequestPayload.KeyFormatType( - enums.KeyFormatType.RAW - ) + unique_identifier=id_b, + key_format_type=enums.KeyFormatType.RAW ) args = (payload, ) @@ -3921,9 +3910,7 @@ class TestKmipEngine(testtools.TestCase): e._data_session = e._data_store_session_factory() id_a = str(obj_a.unique_identifier) - payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(id_a) - ) + payload = get.GetRequestPayload(unique_identifier=id_a) # Test by specifying the ID of the object to get. args = [payload] @@ -5931,9 +5918,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() # Retrieve the created key using Get and verify all fields set - payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(uid) - ) + payload = get.GetRequestPayload(unique_identifier=uid) response_payload = e._process_get(payload) e._data_session.commit() @@ -5944,9 +5929,9 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( enums.ObjectType.SYMMETRIC_KEY, - response_payload.object_type.value + response_payload.object_type ) - self.assertEqual(str(uid), response_payload.unique_identifier.value) + self.assertEqual(str(uid), response_payload.unique_identifier) self.assertIsInstance(response_payload.secret, secrets.SymmetricKey) key_block = response_payload.secret.key_block @@ -6070,9 +6055,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() # Retrieve the created public key using Get and verify all fields set - payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(public_id) - ) + payload = get.GetRequestPayload(unique_identifier=public_id) response_payload = e._process_get(payload) e._data_session.commit() @@ -6083,12 +6066,9 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( enums.ObjectType.PUBLIC_KEY, - response_payload.object_type.value - ) - self.assertEqual( - str(public_id), - response_payload.unique_identifier.value + response_payload.object_type ) + self.assertEqual(str(public_id), response_payload.unique_identifier) self.assertIsInstance(response_payload.secret, secrets.PublicKey) key_block = response_payload.secret.key_block @@ -6108,9 +6088,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() # Retrieve the created private key using Get and verify all fields set - payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(private_id) - ) + payload = get.GetRequestPayload(unique_identifier=private_id) response_payload = e._process_get(payload) e._data_session.commit() @@ -6121,12 +6099,9 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( enums.ObjectType.PRIVATE_KEY, - response_payload.object_type.value - ) - self.assertEqual( - str(private_id), - response_payload.unique_identifier.value + response_payload.object_type ) + self.assertEqual(str(private_id), response_payload.unique_identifier) self.assertIsInstance(response_payload.secret, secrets.PrivateKey) key_block = response_payload.secret.key_block @@ -6295,9 +6270,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.reset_mock() # Retrieve the registered key using Get and verify all fields set - payload = get.GetRequestPayload( - unique_identifier=attributes.UniqueIdentifier(uid) - ) + payload = get.GetRequestPayload(unique_identifier=uid) response_payload = e._process_get(payload) e._data_session.commit() @@ -6308,9 +6281,9 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual( enums.ObjectType.SYMMETRIC_KEY, - response_payload.object_type.value + response_payload.object_type ) - self.assertEqual(str(uid), response_payload.unique_identifier.value) + self.assertEqual(str(uid), response_payload.unique_identifier) self.assertIsInstance(response_payload.secret, secrets.SymmetricKey) self.assertEqual( key_bytes,