diff --git a/kmip/core/attributes.py b/kmip/core/attributes.py index 9a8d12b..931cb74 100644 --- a/kmip/core/attributes.py +++ b/kmip/core/attributes.py @@ -263,6 +263,7 @@ class CryptographicParameters(Struct): enums.BlockCipherMode, value, Tags.BLOCK_CIPHER_MODE) class PaddingMethod(Enumeration): + def __init__(self, value=None): super(CryptographicParameters.PaddingMethod, self).__init__( enums.PaddingMethod, value, Tags.PADDING_METHOD) @@ -273,17 +274,29 @@ class CryptographicParameters(Struct): super(CryptographicParameters.KeyRoleType, self).__init__( enums.KeyRoleType, value, Tags.KEY_ROLE_TYPE) + class DigitalSignatureAlgorithm(Enumeration): + + def __init__(self, value=None): + super(CryptographicParameters.DigitalSignatureAlgorithm, + self).__init__(enums.DigitalSignatureAlgorithm, + value, Tags.DIGITAL_SIGNATURE_ALGORITHM) + + # TODO: Need to implement other fields of CryptographicParameters (3.6) def __init__(self, block_cipher_mode=None, padding_method=None, hashing_algorithm=None, - key_role_type=None): + key_role_type=None, + digital_signature_algorithm=None, + cryptographic_algorithm=None): super(CryptographicParameters, self).__init__( tag=Tags.CRYPTOGRAPHIC_PARAMETERS) self.block_cipher_mode = block_cipher_mode self.padding_method = padding_method self.hashing_algorithm = hashing_algorithm self.key_role_type = key_role_type + self.digital_signature_algorithm = digital_signature_algorithm + self.cryptographic_algorithm = cryptographic_algorithm def read(self, istream): super(CryptographicParameters, self).read(istream) @@ -305,6 +318,15 @@ class CryptographicParameters(Struct): self.key_role_type = CryptographicParameters.KeyRoleType() self.key_role_type.read(tstream) + if self.is_tag_next(Tags.DIGITAL_SIGNATURE_ALGORITHM, tstream): + self.digital_signature_algorithm = \ + CryptographicParameters.DigitalSignatureAlgorithm() + self.digital_signature_algorithm.read(tstream) + + if self.is_tag_next(Tags.CRYPTOGRAPHIC_ALGORITHM, tstream): + self.cryptographic_algorithm = CryptographicAlgorithm() + self.cryptographic_algorithm.read(tstream) + self.is_oversized(tstream) self.validate() @@ -320,6 +342,10 @@ class CryptographicParameters(Struct): self.hashing_algorithm.write(tstream) if self.key_role_type is not None: self.key_role_type.write(tstream) + if self.digital_signature_algorithm is not None: + self.digital_signature_algorithm.write(tstream) + if self.cryptographic_algorithm is not None: + self.cryptographic_algorithm.write(tstream) # Write the length and value of the request payload self.length = tstream.length() @@ -354,6 +380,23 @@ class CryptographicParameters(Struct): msg += "; expected {0}, received {1}".format( self.KeyRoleType, self.key_role_type) raise TypeError(msg) + if self.digital_signature_algorithm is not None: + if not isinstance( + self.digital_signature_algorithm, + CryptographicParameters.DigitalSignatureAlgorithm + ): + msg = "Invalid digital signature algorithm" + msg += "; expected {0}, received {1}".format( + CryptographicParameters.DigitalSignatureAlgorithm, + self.digital_signature_algorithm) + raise TypeError(msg) + if self.cryptographic_algorithm is not None: + if not isinstance(self.cryptographic_algorithm, + CryptographicAlgorithm): + msg = "Invalid cryptograhic algorithm" + msg += "; expected {0}, received {1}".format( + CryptographicAlgorithm, self.cryptographic_algorithm) + raise TypeError(msg) def __eq__(self, other): if isinstance(other, CryptographicParameters): @@ -363,6 +406,11 @@ class CryptographicParameters(Struct): return False elif self.hashing_algorithm != other.hashing_algorithm: return False + elif self.digital_signature_algorithm \ + != other.digital_signature_algorithm: + return False + elif self.cryptographic_algorithm != other.cryptographic_algorithm: + return False elif self.padding_method != other.padding_method: return False else: diff --git a/kmip/core/factories/attribute_values.py b/kmip/core/factories/attribute_values.py index 8e07739..afdfdc2 100644 --- a/kmip/core/factories/attribute_values.py +++ b/kmip/core/factories/attribute_values.py @@ -140,6 +140,9 @@ class AttributeValueFactory(object): padding_method = None hashing_algorithm = None key_role_type = None + digital_signature_algorithm = None + cryptographic_algorithm = None + # TODO: Need to implement other fields of CryptographicParameters (3.6) if params is not None: @@ -162,11 +165,23 @@ class AttributeValueFactory(object): hashing_algorithm = attributes.HashingAlgorithm( params.get("hashing_algorithm")) + if 'digital_signature_algorithm' in params: + digital_signature_algorithm = \ + attributes.CryptographicParameters. \ + DigitalSignatureAlgorithm( + params.get("digital_signature_algorithm")) + + if 'cryptographic_algorithm' in params: + cryptographic_algorithm = attributes.CryptographicAlgorithm( + params.get("cryptographic_algorithm")) + return attributes.CryptographicParameters( block_cipher_mode=bcm, padding_method=padding_method, hashing_algorithm=hashing_algorithm, - key_role_type=key_role_type) + key_role_type=key_role_type, + digital_signature_algorithm=digital_signature_algorithm, + cryptographic_algorithm=cryptographic_algorithm) def _create_cryptographic_usage_mask(self, flags): mask = None diff --git a/kmip/core/factories/payloads/request.py b/kmip/core/factories/payloads/request.py index e1f2374..a76a9ed 100644 --- a/kmip/core/factories/payloads/request.py +++ b/kmip/core/factories/payloads/request.py @@ -28,6 +28,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import mac class RequestPayloadFactory(PayloadFactory): @@ -70,3 +71,6 @@ class RequestPayloadFactory(PayloadFactory): def _create_revoke_payload(self): return revoke.RevokeRequestPayload() + + def _create_mac_payload(self): + return mac.MACRequestPayload() diff --git a/kmip/core/factories/payloads/response.py b/kmip/core/factories/payloads/response.py index 7cf6a93..b1b8abc 100644 --- a/kmip/core/factories/payloads/response.py +++ b/kmip/core/factories/payloads/response.py @@ -28,6 +28,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import mac class ResponsePayloadFactory(PayloadFactory): @@ -70,3 +71,6 @@ class ResponsePayloadFactory(PayloadFactory): def _create_revoke_payload(self): return revoke.RevokeResponsePayload() + + def _create_mac_payload(self): + return mac.MACResponsePayload() diff --git a/kmip/core/messages/payloads/mac.py b/kmip/core/messages/payloads/mac.py new file mode 100644 index 0000000..424b325 --- /dev/null +++ b/kmip/core/messages/payloads/mac.py @@ -0,0 +1,207 @@ +# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kmip.core import attributes +from kmip.core import enums +from kmip.core import exceptions +from kmip.core.enums import Tags + +from kmip.core.objects import Data, MACData + +from kmip.core.primitives import Struct + +from kmip.core.utils import BytearrayStream + + +# 4.33 +class MACRequestPayload(Struct): + + def __init__(self, + unique_identifier=None, + cryptographic_parameters=None, + data=None): + + super(MACRequestPayload, self).__init__( + tag=enums.Tags.REQUEST_PAYLOAD) + + self._unique_identifier = None + self._cryptographic_parameters = None + self._data = None + + self.unique_identifier = unique_identifier + self.cryptographic_parameters = cryptographic_parameters + self.data = data + + @property + def unique_identifier(self): + return self._unique_identifier + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, attributes.UniqueIdentifier): + self._unique_identifier = value + else: + raise TypeError("unique identifier must be UniqueIdentifier type") + + @property + def cryptographic_parameters(self): + return self._cryptographic_parameters + + @cryptographic_parameters.setter + def cryptographic_parameters(self, value): + if value is None: + self._cryptographic_parameters = None + elif isinstance(value, attributes.CryptographicParameters): + self._cryptographic_parameters = value + else: + raise TypeError("cryptographic parameters must " + "be CryptographicParameters type") + + @property + def data(self): + return self._data + + @data.setter + def data(self, value): + if value is None: + self._data = None + elif isinstance(value, Data): + self._data = value + else: + raise TypeError("data must be Data type") + + def read(self, istream): + super(MACRequestPayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream): + self.unique_identifier = attributes.UniqueIdentifier() + self.unique_identifier.read(tstream) + + if self.is_tag_next(Tags.CRYPTOGRAPHIC_PARAMETERS, tstream): + self.cryptographic_parameters = \ + attributes.CryptographicParameters() + self.cryptographic_parameters.read(tstream) + + if self.is_tag_next(Tags.DATA, tstream): + self.data = Data() + self.data.read(tstream) + else: + raise exceptions.InvalidKmipEncoding( + "expected mac request data not found" + ) + + self.is_oversized(tstream) + + def write(self, ostream): + tstream = BytearrayStream() + + if self._unique_identifier is not None: + self._unique_identifier.write(tstream) + if self._cryptographic_parameters is not None: + self._cryptographic_parameters.write(tstream) + if self._data is not None: + self.data.write(tstream) + else: + raise exceptions.InvalidField( + "The mac request data is required" + ) + + self.length = tstream.length() + super(MACRequestPayload, self).write(ostream) + ostream.write(tstream.buffer) + + +class MACResponsePayload(Struct): + + def __init__(self, + unique_identifier=None, + mac_data=None): + super(MACResponsePayload, self).__init__( + tag=enums.Tags.RESPONSE_PAYLOAD) + + self._unique_identifier = None + self._mac_data = None + + self.unique_identifier = unique_identifier + self.mac_data = mac_data + + @property + def unique_identifier(self): + return self._unique_identifier + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, attributes.UniqueIdentifier): + self._unique_identifier = value + else: + raise TypeError("unique identifier must be UniqueIdentifier type") + + @property + def mac_data(self): + return self._mac_data + + @mac_data.setter + def mac_data(self, value): + if value is None: + self._mac_data = None + elif isinstance(value, MACData): + self._mac_data = value + else: + raise TypeError("mac_data must be MACData type") + + def read(self, istream): + super(MACResponsePayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream): + self._unique_identifier = attributes.UniqueIdentifier() + self._unique_identifier.read(tstream) + else: + raise exceptions.InvalidKmipEncoding( + "expected mac response unique identifier not found" + ) + + if self.is_tag_next(Tags.MAC_DATA, tstream): + self._mac_data = MACData() + self._mac_data.read(tstream) + else: + raise exceptions.InvalidKmipEncoding( + "expected mac response mac data not found" + ) + + self.is_oversized(tstream) + + def write(self, ostream): + tstream = BytearrayStream() + + if self._unique_identifier is not None: + self._unique_identifier.write(tstream) + else: + raise exceptions.InvalidField( + "The mac response unique identifier is required" + ) + if self._mac_data is not None: + self._mac_data.write(tstream) + else: + raise exceptions.InvalidField( + "The mac response mac data is required" + ) + self.length = tstream.length() + super(MACResponsePayload, self).write(ostream) + ostream.write(tstream.buffer) diff --git a/kmip/core/objects.py b/kmip/core/objects.py index f7117fb..83613c4 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -1216,6 +1216,20 @@ class ExtensionInformation(Struct): extension_type=extension_type) +# 2.1.10 +class Data(ByteString): + + def __init__(self, value=None): + super(Data, self).__init__(value, Tags.DATA) + + +# 2.1.13 +class MACData(ByteString): + + def __init__(self, value=None): + super(MACData, self).__init__(value, Tags.MAC_DATA) + + # 3.31, 9.1.3.2.19 class RevocationReasonCode(Enumeration): diff --git a/kmip/tests/unit/core/attributes/test_attributes.py b/kmip/tests/unit/core/attributes/test_attributes.py index 737ffc1..52fcf0e 100644 --- a/kmip/tests/unit/core/attributes/test_attributes.py +++ b/kmip/tests/unit/core/attributes/test_attributes.py @@ -32,6 +32,8 @@ from kmip.core.enums import BlockCipherMode from kmip.core.enums import CertificateTypeEnum from kmip.core.enums import HashingAlgorithm as HashingAlgorithmEnum from kmip.core.enums import KeyRoleType +from kmip.core.enums import DigitalSignatureAlgorithm +from kmip.core.enums import CryptographicAlgorithm from kmip.core.enums import PaddingMethod from kmip.core.enums import NameType @@ -483,6 +485,7 @@ class TestApplicationData(TestCase): class TestCryptographicParameters(TestCase): + """ A test suite for the CryptographicParameters class """ @@ -498,25 +501,33 @@ class TestCryptographicParameters(TestCase): {'block_cipher_mode': BlockCipherMode.CBC, 'padding_method': PaddingMethod.PKCS5, 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, - 'key_role_type': KeyRoleType.BDK}) + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) self.cp_none = self.factory.create_attribute_value( AttributeType.CRYPTOGRAPHIC_PARAMETERS, {}) # Symmetric key object with Cryptographic Parameters - # Byte stream edited to add Key Role Type parameter + # Byte stream edited to add: + # Key Role Type parameter + # Digital Signature Algorithm parameter + # Cryptographic Algorithm parameter # Based on the KMIP Spec 1.1 Test Cases document # 11.1 page 255 on the pdf version self.key_req_with_crypt_params = BytearrayStream(( - b'\x42\x00\x2B\x01\x00\x00\x00\x40' + b'\x42\x00\x2B\x01\x00\x00\x00\x60' b'\x42\x00\x11\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' b'\x42\x00\x5F\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' b'\x42\x00\x38\x05\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00' b'\x42\x00\x83\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xAE\x05\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x00' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0B\x00\x00\x00\x00' )) def teardown(self): - super(TestDigestValue, self).tearDown() + super(TestCryptographicParameters, self).tearDown() def test_write_crypto_params(self): ostream = BytearrayStream() @@ -546,6 +557,17 @@ class TestCryptographicParameters(TestCase): self.assertEqual(HashingAlgorithmEnum.SHA_1.value, self.cp.hashing_algorithm.value.value) + self.assertEqual(Tags.DIGITAL_SIGNATURE_ALGORITHM.value, + self.cp.digital_signature_algorithm.tag.value) + self.assertEqual(DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION.value, + self.cp.digital_signature_algorithm.value.value) + + self.assertEqual(Tags.CRYPTOGRAPHIC_ALGORITHM.value, + self.cp.cryptographic_algorithm.tag.value) + self.assertEqual(CryptographicAlgorithm.HMAC_SHA512.value, + self.cp.cryptographic_algorithm.value.value) + def test_bad_cipher_mode(self): self.cp.block_cipher_mode = self.bad_enum_code cp_valid = self.factory.create_attribute_value( @@ -553,7 +575,10 @@ class TestCryptographicParameters(TestCase): {'block_cipher_mode': BlockCipherMode.CBC, 'padding_method': PaddingMethod.PKCS5, 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, - 'key_role_type': KeyRoleType.BDK}) + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) self.assertFalse(self.cp == cp_valid) self.assertRaises(TypeError, self.cp.validate) @@ -564,7 +589,10 @@ class TestCryptographicParameters(TestCase): {'block_cipher_mode': BlockCipherMode.CBC, 'padding_method': PaddingMethod.PKCS5, 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, - 'key_role_type': KeyRoleType.BDK}) + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) self.assertFalse(self.cp == cp_valid) self.assertRaises(TypeError, self.cp.validate) @@ -575,7 +603,10 @@ class TestCryptographicParameters(TestCase): {'block_cipher_mode': BlockCipherMode.CBC, 'padding_method': PaddingMethod.PKCS5, 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, - 'key_role_type': KeyRoleType.BDK}) + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) self.assertFalse(self.cp == cp_valid) self.assertRaises(TypeError, self.cp.validate) @@ -586,6 +617,37 @@ class TestCryptographicParameters(TestCase): {'block_cipher_mode': BlockCipherMode.CBC, 'padding_method': PaddingMethod.PKCS5, 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, - 'key_role_type': KeyRoleType.BDK}) + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) + self.assertFalse(self.cp == cp_valid) + self.assertRaises(TypeError, self.cp.validate) + + def test_bad_digital_signature_algorithm(self): + self.cp.digital_signature_algorithm = self.bad_enum_code + cp_valid = self.factory.create_attribute_value( + AttributeType.CRYPTOGRAPHIC_PARAMETERS, + {'block_cipher_mode': BlockCipherMode.CBC, + 'padding_method': PaddingMethod.PKCS5, + 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) + self.assertFalse(self.cp == cp_valid) + self.assertRaises(TypeError, self.cp.validate) + + def test_bad_cryptographic_algorithm(self): + self.cp.cryptographic_algorithm = self.bad_enum_code + cp_valid = self.factory.create_attribute_value( + AttributeType.CRYPTOGRAPHIC_PARAMETERS, + {'block_cipher_mode': BlockCipherMode.CBC, + 'padding_method': PaddingMethod.PKCS5, + 'hashing_algorithm': HashingAlgorithmEnum.SHA_1, + 'key_role_type': KeyRoleType.BDK, + 'digital_signature_algorithm': + DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + 'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512}) self.assertFalse(self.cp == cp_valid) self.assertRaises(TypeError, self.cp.validate) diff --git a/kmip/tests/unit/core/factories/payloads/test_request.py b/kmip/tests/unit/core/factories/payloads/test_request.py index dac1bce..1b16474 100644 --- a/kmip/tests/unit/core/factories/payloads/test_request.py +++ b/kmip/tests/unit/core/factories/payloads/test_request.py @@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import mac class TestRequestPayloadFactory(testtools.TestCase): @@ -228,7 +229,8 @@ class TestRequestPayloadFactory(testtools.TestCase): ) def test_create_mac_payload(self): - self._test_not_implemented(self.factory.create, enums.Operation.MAC) + payload = self.factory.create(enums.Operation.MAC) + self._test_payload_type(payload, mac.MACRequestPayload) def test_create_mac_verify_payload(self): self._test_not_implemented( diff --git a/kmip/tests/unit/core/factories/payloads/test_response.py b/kmip/tests/unit/core/factories/payloads/test_response.py index 26101fc..95043b1 100644 --- a/kmip/tests/unit/core/factories/payloads/test_response.py +++ b/kmip/tests/unit/core/factories/payloads/test_response.py @@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import mac class TestResponsePayloadFactory(testtools.TestCase): @@ -226,7 +227,11 @@ class TestResponsePayloadFactory(testtools.TestCase): ) def test_create_mac_payload(self): - self._test_not_implemented(self.factory.create, enums.Operation.MAC) + payload = self.factory.create(enums.Operation.MAC) + self._test_payload_type( + payload, + mac.MACResponsePayload + ) def test_create_mac_verify_payload(self): self._test_not_implemented( diff --git a/kmip/tests/unit/core/factories/test_attribute_values.py b/kmip/tests/unit/core/factories/test_attribute_values.py index 65a0c77..68005d6 100644 --- a/kmip/tests/unit/core/factories/test_attribute_values.py +++ b/kmip/tests/unit/core/factories/test_attribute_values.py @@ -89,7 +89,11 @@ class TestAttributeValueFactory(testtools.TestCase): 'block_cipher_mode': enums.BlockCipherMode.NIST_KEY_WRAP, 'padding_method': enums.PaddingMethod.ANSI_X9_23, 'key_role_type': enums.KeyRoleType.KEK, - 'hashing_algorithm': enums.HashingAlgorithm.SHA_512} + 'hashing_algorithm': enums.HashingAlgorithm.SHA_512, + 'digital_signature_algorithm': + enums.DigitalSignatureAlgorithm.ECDSA_WITH_SHA512, + 'cryptographic_algorithm': + enums.CryptographicAlgorithm.HMAC_SHA512} params = self.factory.create_attribute_value( enums.AttributeType.CRYPTOGRAPHIC_PARAMETERS, value) @@ -110,6 +114,14 @@ class TestAttributeValueFactory(testtools.TestCase): self.assertEqual( attributes.HashingAlgorithm(enums.HashingAlgorithm.SHA_512), params.hashing_algorithm) + self.assertEqual( + attributes.CryptographicParameters.DigitalSignatureAlgorithm( + enums.DigitalSignatureAlgorithm.ECDSA_WITH_SHA512), + params.digital_signature_algorithm) + self.assertEqual( + attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.HMAC_SHA512), + params.cryptographic_algorithm) def test_create_cryptographic_domain_parameters(self): """ diff --git a/kmip/tests/unit/core/messages/payloads/test_mac.py b/kmip/tests/unit/core/messages/payloads/test_mac.py new file mode 100644 index 0000000..f1f631a --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_mac.py @@ -0,0 +1,298 @@ +# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import TestCase + +from kmip.core import attributes +from kmip.core import objects +from kmip.core import utils +from kmip.core import enums +from kmip.core import exceptions + +from kmip.core.messages.payloads import mac + + +class TestMACRequestPayload(TestCase): + + def setUp(self): + super(TestMACRequestPayload, self).setUp() + + self.unique_identifier = attributes.UniqueIdentifier(value='1') + self.cryptographic_parameters = \ + attributes.CryptographicParameters( + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.HMAC_SHA512) + ) + self.data = objects.Data( + value=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B' + b'\x0C\x0D\x0E\x0F') + ) + + self.encoding_full = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x40\x42\x00\x94\x07\x00\x00\x00\x01' + b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00' + b'\x42\x00\xc2\x08\x00\x00\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f')) + self.encoding_no_data = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x28\x42\x00\x94\x07\x00\x00\x00\x01' + b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00' + )) + + def tearDown(self): + super(TestMACRequestPayload, self).tearDown() + + def test_init_with_none(self): + mac.MACRequestPayload() + + def test_init_valid(self): + """ + Test that the payload can be properly constructed and the attributes + cab be properly set and retrieved. + """ + payload = mac.MACRequestPayload( + self.unique_identifier, + self.cryptographic_parameters, + self.data) + self.assertEqual(payload.unique_identifier, self.unique_identifier) + self.assertEqual(payload.cryptographic_parameters, + self.cryptographic_parameters) + self.assertEqual(payload.data, self.data) + + def test_init_with_invalid_unique_identifier(self): + kwargs = {'unique_identifier': 'invalid', + 'cryptographic_parameters': None, + 'data': None} + self.assertRaisesRegexp( + TypeError, "unique identifier must be UniqueIdentifier type", + mac.MACRequestPayload, **kwargs) + + def test_init_with_invalid_cryptographic_parameters(self): + kwargs = {'unique_identifier': None, + 'cryptographic_parameters': 'invalid', + 'data': None} + self.assertRaisesRegexp( + TypeError, + "cryptographic parameters must be CryptographicParameters type", + mac.MACRequestPayload, **kwargs) + + def test_init_with_invalid_data(self): + kwargs = {'unique_identifier': None, + 'cryptographic_parameters': None, + 'data': 'invalid'} + self.assertRaises( + TypeError, "data must be Data type", + mac.MACRequestPayload, **kwargs) + + def test_read_valid(self): + stream = self.encoding_full + payload = mac.MACRequestPayload() + payload.read(stream) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual(self.cryptographic_parameters, + payload.cryptographic_parameters) + self.assertEqual(self.data, payload.data) + + def test_read_no_data(self): + """ + Test that an InvalidKmipEncoding error gets raised when attempting to + read a mac request encoding with no data. + """ + payload = mac.MACRequestPayload() + args = (self.encoding_no_data,) + self.assertRaisesRegexp( + exceptions.InvalidKmipEncoding, + "expected mac request data not found", + payload.read, + *args + ) + + def test_write_valid(self): + expected = self.encoding_full + + stream = utils.BytearrayStream() + payload = mac.MACRequestPayload( + self.unique_identifier, + self.cryptographic_parameters, + self.data) + payload.write(stream) + + self.assertEqual(expected, stream) + + def test_write_with_no_data(self): + """ + Test that an InvalidField error gets raised when attempting to + write a mac request with no data. + """ + stream = utils.BytearrayStream() + payload = mac.MACRequestPayload( + self.unique_identifier, + self.cryptographic_parameters, + None) + args = (stream,) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The mac request data is required", + payload.write, + *args + ) + + +class TestMACResponsePayload(TestCase): + + def setUp(self): + super(TestMACResponsePayload, self).setUp() + + self.unique_identifier = attributes.UniqueIdentifier(value='1') + self.mac_data = objects.MACData(value=( + b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f' + b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b' + b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23' + b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c' + ) + ) + + self.encoding_full = utils.BytearrayStream(( + b'\x42\x00\x7c\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x01' + b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\xc6\x08\x00\x00\x00\x40' + b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f' + b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b' + b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23' + b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c' + )) + self.encoding_no_unique_identifier = utils.BytearrayStream(( + b'\x42\x00\x7c\x01\x00\x00\x00\x48\x42\x00\xc6\x08\x00\x00\x00\x40' + b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f' + b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b' + b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23' + b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c' + )) + self.encoding_no_mac_data = utils.BytearrayStream(( + b'\x42\x00\x7c\x01\x00\x00\x00\x10\x42\x00\x94\x07\x00\x00\x00\x01' + b'\x31\x00\x00\x00\x00\x00\x00\x00' + )) + + def tearDown(self): + super(TestMACResponsePayload, self).tearDown() + + def test_init_with_none(self): + mac.MACResponsePayload() + + def test_init_valid(self): + """ + Test that the payload can be properly constructed and the attributes + can be properly set and retrieved. + """ + payload = mac.MACResponsePayload( + self.unique_identifier, + self.mac_data) + self.assertEqual(payload.unique_identifier, self.unique_identifier) + self.assertEqual(payload.mac_data, self.mac_data) + + def test_init_with_invalid_unique_identifier(self): + kwargs = {'unique_identifier': 'invalid', + 'mac_data': None} + self.assertRaisesRegexp( + TypeError, "unique identifier must be UniqueIdentifier type", + mac.MACResponsePayload, **kwargs) + + def test_init_with_invalid_mac_data(self): + kwargs = {'unique_identifier': None, + 'mac_data': 'invalid'} + self.assertRaises( + TypeError, "data must be MACData type", + mac.MACResponsePayload, **kwargs) + + def test_read_valid(self): + stream = self.encoding_full + payload = mac.MACResponsePayload() + payload.read(stream) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual(self.mac_data, payload.mac_data) + + def test_read_no_unique_identifier(self): + """ + Test that an InvalidKmipEncoding error gets raised when attempting to + read a mac response encoding with no unique identifier. + """ + payload = mac.MACResponsePayload() + args = (self.encoding_no_unique_identifier,) + self.assertRaisesRegexp( + exceptions.InvalidKmipEncoding, + "expected mac response unique identifier not found", + payload.read, + *args + ) + + def test_read_no_mac_data(self): + """ + Test that an InvalidKmipEncoding error gets raised when attempting to + read a mac response encoding with no mac data. + """ + payload = mac.MACResponsePayload() + args = (self.encoding_no_mac_data,) + self.assertRaisesRegexp( + exceptions.InvalidKmipEncoding, + "expected mac response mac data not found", + payload.read, + *args + ) + + def test_write_valid(self): + expected = self.encoding_full + + stream = utils.BytearrayStream() + payload = mac.MACResponsePayload( + self.unique_identifier, + self.mac_data) + payload.write(stream) + + self.assertEqual(expected, stream) + + def test_write_with_no_unique_identifier(self): + """ + Test that an InvalidField error gets raised when attempting to + write a mac response with no unique identifier. + """ + stream = utils.BytearrayStream() + payload = mac.MACResponsePayload( + None, + self.mac_data) + args = (stream,) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The mac response unique identifier is required", + payload.write, + *args + ) + + def test_write_with_no_data(self): + """ + Test that an InvalidField error gets raised when attempting to + write a mac response with no mac data. + """ + stream = utils.BytearrayStream() + payload = mac.MACResponsePayload( + self.unique_identifier, + None) + args = (stream,) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The mac response mac data is required", + payload.write, + *args + ) diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index 9dd6697..519b440 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -14,6 +14,7 @@ # under the License. from testtools import TestCase +import binascii from kmip.core.factories.keys import KeyFactory from kmip.core.factories.secrets import SecretFactory @@ -47,6 +48,7 @@ from kmip.core.messages.payloads import get from kmip.core.messages.payloads import register from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import destroy +from kmip.core.messages.payloads import mac from kmip.core.misc import KeyFormatType from kmip.core.primitives import TextString @@ -155,6 +157,18 @@ class TestRequestMessage(TestCase): b'\x42\x00\x0b\x01\x00\x00\x00\x20\x42\x00\x55\x07\x00\x00\x00\x04' b'\x4b\x65\x79\x31\x00\x00\x00\x00\x42\x00\x54\x05\x00\x00\x00\x04' b'\x00\x00\x00\x01\x00\x00\x00\x00') + self.mac = ( + b'\x42\x00\x78\x01\x00\x00\x00\xa0\x42\x00\x77\x01\x00\x00\x00\x38' + b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x0d\x02\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x58' + b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x23\x00\x00\x00\x00' + b'\x42\x00\x79\x01\x00\x00\x00\x40\x42\x00\x94\x07\x00\x00\x00\x01' + b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00' + b'\x42\x00\xc2\x08\x00\x00\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f') def tearDown(self): super(TestRequestMessage, self).tearDown() @@ -966,6 +980,164 @@ class TestRequestMessage(TestCase): 'Key1', attribute_value.name_value.value)) + def test_mac_request_read(self): + self.stream = BytearrayStream(self.mac) + + request_message = messages.RequestMessage() + request_message.read(self.stream) + + request_header = request_message.request_header + msg = "Bad request header type: expected {0}, received{0}" + self.assertIsInstance(request_header, messages.RequestHeader, + msg.format(messages.RequestHeader, + type(request_header))) + + protocol_version = request_header.protocol_version + msg = "Bad protocol version type: expected {0}, received {1}" + self.assertIsInstance(protocol_version, contents.ProtocolVersion, + msg.format(contents.ProtocolVersion, + type(protocol_version))) + + protocol_version_major = protocol_version.protocol_version_major + msg = "Bad protocol version major type: expected {0}, received {1}" + exp_type = contents.ProtocolVersion.ProtocolVersionMajor + rcv_type = type(protocol_version_major) + self.assertIsInstance(protocol_version_major, exp_type, + msg.format(exp_type, rcv_type)) + msg = "Bad protocol version major value: expected {0}, received {1}" + self.assertEqual(1, protocol_version_major.value, + msg.format(1, protocol_version_major.value)) + + protocol_version_minor = protocol_version.protocol_version_minor + msg = "Bad protocol version minor type: expected {0}, received {1}" + exp_type = contents.ProtocolVersion.ProtocolVersionMinor + rcv_type = type(protocol_version_minor) + self.assertIsInstance(protocol_version_minor, exp_type, + msg.format(exp_type, rcv_type)) + msg = "Bad protocol version minor value: expected {0}, received {1}" + self.assertEqual(2, protocol_version_minor.value, + msg.format(2, protocol_version_minor.value)) + + batch_count = request_header.batch_count + msg = "Bad batch count type: expected {0}, received {1}" + self.assertIsInstance(batch_count, contents.BatchCount, + msg.format(contents.BatchCount, + type(batch_count))) + msg = "Bad batch count value: expected {0}, received {1}" + self.assertEqual(1, batch_count.value, + msg.format(1, batch_count.value)) + + batch_items = request_message.batch_items + msg = "Bad batch items type: expected {0}, received {1}" + self.assertIsInstance(batch_items, list, + msg.format(list, type(batch_items))) + self.assertEquals(1, len(batch_items), + self.msg.format('batch items', 'length', + 1, len(batch_items))) + + batch_item = batch_items[0] + msg = "Bad batch item type: expected {0}, received {1}" + self.assertIsInstance(batch_item, messages.RequestBatchItem, + msg.format(messages.RequestBatchItem, + type(batch_item))) + + operation = batch_item.operation + msg = "Bad operation type: expected {0}, received {1}" + self.assertIsInstance(operation, contents.Operation, + msg.format(contents.Operation, + type(operation))) + msg = "Bad operation value: expected {0}, received {1}" + self.assertEqual(enums.Operation.MAC, operation.value, + msg.format(enums.Operation.MAC, + operation.value)) + + request_payload = batch_item.request_payload + msg = "Bad request payload type: expected {0}, received {1}" + self.assertIsInstance(request_payload, + mac.MACRequestPayload, + msg.format(mac.MACRequestPayload, + type(request_payload))) + + unique_identifier = request_payload.unique_identifier + msg = "Bad unique identifier type: expected {0}, received {1}" + self.assertIsInstance(unique_identifier, attr.UniqueIdentifier, + msg.format(attr.UniqueIdentifier, + type(unique_identifier))) + msg = "Bad unique identifier value: expected {0}, received {1}" + self.assertEqual('1', unique_identifier.value, + msg.format('1', unique_identifier.value)) + + parameters_attribute = request_payload.cryptographic_parameters + msg = "Bad cryptographic parameters type: expected {0}, received {1}" + self.assertIsInstance(parameters_attribute, + attr.CryptographicParameters, + msg.format(attr.CryptographicParameters, + type(parameters_attribute))) + + cryptographic_algorithm = parameters_attribute.cryptographic_algorithm + msg = "Bad cryptographic algorithm type: expected {0}, received {1}" + self.assertIsInstance(cryptographic_algorithm, + attr.CryptographicAlgorithm, + msg.format(attr.CryptographicAlgorithm, + type(cryptographic_algorithm))) + msg = "Bad cryptographic algorithm value: expected {0}, received {1}" + self.assertEquals(cryptographic_algorithm.value, + enums.CryptographicAlgorithm.HMAC_SHA512, + msg.format(cryptographic_algorithm.value, + enums.CryptographicAlgorithm.HMAC_SHA512)) + + data = request_payload.data + msg = "Bad data type: expected {0}, received {1}" + self.assertIsInstance(data, objects.Data, msg.format(objects.Data, + type(data))) + exp_value = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B' + b'\x0C\x0D\x0E\x0F') + msg = "Bad data value: expected {0}, received {1}" + self.assertEqual( + exp_value, data.value, + msg.format(binascii.hexlify(exp_value), + binascii.hexlify(data.value)) + ) + + def test_mac_request_write(self): + prot_ver = contents.ProtocolVersion.create(1, 2) + + batch_count = contents.BatchCount(1) + req_header = messages.RequestHeader(protocol_version=prot_ver, + batch_count=batch_count) + + operation = contents.Operation(enums.Operation.MAC) + + uuid = attr.UniqueIdentifier('1') + data = objects.Data( + value=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B' + b'\x0C\x0D\x0E\x0F') + ) + parameters_attribute = attr.CryptographicParameters( + cryptographic_algorithm=attr. + CryptographicAlgorithm(enums.CryptographicAlgorithm.HMAC_SHA512) + ) + request_payload = mac.MACRequestPayload( + unique_identifier=uuid, + cryptographic_parameters=parameters_attribute, + data=data + ) + batch_item = messages.RequestBatchItem(operation=operation, + request_payload=request_payload) + request_message = messages.RequestMessage(request_header=req_header, + batch_items=[batch_item]) + request_message.write(self.stream) + + result = self.stream.read() + len_exp = len(self.mac) + len_rcv = len(result) + self.assertEqual(len_exp, len_rcv, + self.msg.format('request message', 'write', + len_exp, len_rcv)) + + msg = "Bad request message write: encoding mismatch" + self.assertEqual(self.mac, result, msg) + class TestResponseMessage(TestCase): @@ -1050,6 +1222,22 @@ class TestResponseMessage(TestCase): b'\x34\x39\x61\x31\x63\x61\x38\x38\x2d\x36\x62\x65\x61\x2d\x34\x66' b'\x62\x32\x2d\x62\x34\x35\x30\x2d\x37\x65\x35\x38\x38\x30\x32\x63' b'\x33\x30\x33\x38\x00\x00\x00\x00') + self.mac = ( + b'\x42\x00\x7b\x01\x00\x00\x00\xd8\x42\x00\x7a\x01\x00\x00\x00\x48' + b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x92\x09\x00\x00\x00\x08' + b'\x00\x00\x00\x00\x58\x8a\x3f\x23\x42\x00\x0d\x02\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x80' + b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x23\x00\x00\x00\x00' + b'\x42\x00\x7f\x05\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\x7c\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x01' + b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\xc6\x08\x00\x00\x00\x40' + b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f' + b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b' + b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23' + b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c' + ) self.invalid_message_response = ( b'\x42\x00\x7b\x01\x00\x00\x00\xb0\x42\x00\x7a\x01\x00\x00\x00\x48' b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04' @@ -1775,6 +1963,175 @@ class TestResponseMessage(TestCase): msg = "Bad response message write: encoding mismatch" self.assertEqual(self.locate, result, msg) + def test_mac_response_read(self): + self.stream = BytearrayStream(self.mac) + + response_message = messages.ResponseMessage() + response_message.read(self.stream) + + response_header = response_message.response_header + self.assertIsInstance(response_header, messages.ResponseHeader, + self.msg.format('response header', 'type', + messages.ResponseHeader, + type(response_header))) + protocol_version = response_header.protocol_version + self.assertIsInstance(protocol_version, contents.ProtocolVersion, + self.msg.format('response header', 'value', + contents.ProtocolVersion, + type(protocol_version))) + + protocol_version_major = protocol_version.protocol_version_major + exp_type = contents.ProtocolVersion.ProtocolVersionMajor + rcv_type = type(protocol_version_major) + self.assertIsInstance(protocol_version_major, exp_type, + self.msg.format('protocol version major', 'type', + exp_type, rcv_type)) + self.assertEqual(1, protocol_version_major.value, + self.msg.format('protocol version major', 'value', + 1, protocol_version_major.value)) + + protocol_version_minor = protocol_version.protocol_version_minor + exp_type = contents.ProtocolVersion.ProtocolVersionMinor + rcv_type = type(protocol_version_minor) + self.assertIsInstance(protocol_version_minor, exp_type, + self.msg.format('protocol version minor', 'type', + exp_type, rcv_type)) + self.assertEqual(2, protocol_version_minor.value, + self.msg.format('protocol version minor', 'value', + 2, protocol_version_minor.value)) + + time_stamp = response_header.time_stamp + value = 0x588a3f23 + self.assertIsInstance(time_stamp, contents.TimeStamp, + self.msg.format('time stamp', 'value', + contents.TimeStamp, + type(time_stamp))) + self.assertEqual(time_stamp.value, value, + self.msg.format('time stamp', 'value', + time_stamp.value, value)) + + batch_count = response_header.batch_count + self.assertIsInstance(batch_count, contents.BatchCount, + self.msg.format('batch count', 'type', + contents.BatchCount, + type(batch_count))) + self.assertEqual(1, batch_count.value, + self.msg.format('batch count', 'value', 1, + batch_count.value)) + + batch_items = response_message.batch_items + self.assertIsInstance(batch_items, list, + self.msg.format('batch items', 'type', + list, type(batch_items))) + + for batch_item in batch_items: + self.assertIsInstance(batch_item, messages.ResponseBatchItem, + self.msg.format('batch item', 'type', + messages.ResponseBatchItem, + type(batch_item))) + + operation = batch_item.operation + self.assertIsInstance(operation, contents.Operation, + self.msg.format('operation', 'type', + contents.Operation, + type(operation))) + self.assertEqual(enums.Operation.MAC, operation.value, + self.msg.format('operation', 'value', + enums.Operation.MAC, + operation.value)) + + result_status = batch_item.result_status + self.assertIsInstance(result_status, contents.ResultStatus, + self.msg.format('result status', 'type', + contents.ResultStatus, + type(result_status))) + self.assertEqual(enums.ResultStatus.SUCCESS, result_status.value, + self.msg.format('result status', 'value', + enums.ResultStatus.SUCCESS, + result_status.value)) + + response_payload = batch_item.response_payload + exp_type = mac.MACResponsePayload + rcv_type = type(response_payload) + self.assertIsInstance(response_payload, exp_type, + self.msg.format('response payload', 'type', + exp_type, rcv_type)) + + unique_identifier = response_payload.unique_identifier + value = '1' + self.assertIsInstance(unique_identifier, attr.UniqueIdentifier, + self.msg.format('unique identifier', 'type', + attr.UniqueIdentifier, + type(unique_identifier))) + self.assertEqual(value, unique_identifier.value, + self.msg.format('unique identifier', 'value', + unique_identifier.value, value)) + + mac_data = response_payload.mac_data + value = \ + (b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb' + b'\x32\x9f' + b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2' + b'\x2a\x5b' + b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe' + b'\xc8\x23' + b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51' + b'\xff\x7c') + self.assertIsInstance(mac_data, objects.MACData, + self.msg.format('secret', 'type', + objects.MACData, + type(mac_data))) + self.assertEqual(value, mac_data.value, + self.msg.format('mac data', 'value', + binascii.hexlify(mac_data.value), + binascii.hexlify(value))) + + def test_mac_response_write(self): + prot_ver = contents.ProtocolVersion.create(1, 2) + + # Fri Apr 27 10:12:23 CEST 2012 + time_stamp = contents.TimeStamp(0x588a3f23) + + batch_count = contents.BatchCount(1) + response_header = messages.ResponseHeader(protocol_version=prot_ver, + time_stamp=time_stamp, + batch_count=batch_count) + operation = contents.Operation(enums.Operation.MAC) + result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS) + + uuid = '1' + uniq_id = attr.UniqueIdentifier(uuid) + + value = \ + (b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb' + b'\x32\x9f' + b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2' + b'\x2a\x5b' + b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe' + b'\xc8\x23' + b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51' + b'\xff\x7c') + mac_data = objects.MACData(value) + + resp_pl = mac.MACResponsePayload(unique_identifier=uniq_id, + mac_data=mac_data) + batch_item = messages.ResponseBatchItem(operation=operation, + result_status=result_status, + response_payload=resp_pl) + rm = messages.ResponseMessage(response_header=response_header, + batch_items=[batch_item]) + rm.write(self.stream) + + result = self.stream.read() + len_exp = len(self.mac) + len_rcv = len(result) + self.assertEqual(len_exp, len_rcv, + self.msg.format('get response message', 'write', + len_exp, len_rcv)) + + msg = "Bad response message write: encoding mismatch" + self.assertEqual(self.mac, result, msg) + def test_message_invalid_response_write(self): # Batch item of 'INVALID MESSAGE' response # has no 'operation' attribute