mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #255 from vbnmmnbv/payload
Add necessary objects/attributes and payloads for MAC operation
This commit is contained in:
commit
84fe76e15d
|
@ -263,6 +263,7 @@ class CryptographicParameters(Struct):
|
||||||
enums.BlockCipherMode, value, Tags.BLOCK_CIPHER_MODE)
|
enums.BlockCipherMode, value, Tags.BLOCK_CIPHER_MODE)
|
||||||
|
|
||||||
class PaddingMethod(Enumeration):
|
class PaddingMethod(Enumeration):
|
||||||
|
|
||||||
def __init__(self, value=None):
|
def __init__(self, value=None):
|
||||||
super(CryptographicParameters.PaddingMethod, self).__init__(
|
super(CryptographicParameters.PaddingMethod, self).__init__(
|
||||||
enums.PaddingMethod, value, Tags.PADDING_METHOD)
|
enums.PaddingMethod, value, Tags.PADDING_METHOD)
|
||||||
|
@ -273,17 +274,29 @@ class CryptographicParameters(Struct):
|
||||||
super(CryptographicParameters.KeyRoleType, self).__init__(
|
super(CryptographicParameters.KeyRoleType, self).__init__(
|
||||||
enums.KeyRoleType, value, Tags.KEY_ROLE_TYPE)
|
enums.KeyRoleType, value, Tags.KEY_ROLE_TYPE)
|
||||||
|
|
||||||
|
class DigitalSignatureAlgorithm(Enumeration):
|
||||||
|
|
||||||
|
def __init__(self, value=None):
|
||||||
|
super(CryptographicParameters.DigitalSignatureAlgorithm,
|
||||||
|
self).__init__(enums.DigitalSignatureAlgorithm,
|
||||||
|
value, Tags.DIGITAL_SIGNATURE_ALGORITHM)
|
||||||
|
|
||||||
|
# TODO: Need to implement other fields of CryptographicParameters (3.6)
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
block_cipher_mode=None,
|
block_cipher_mode=None,
|
||||||
padding_method=None,
|
padding_method=None,
|
||||||
hashing_algorithm=None,
|
hashing_algorithm=None,
|
||||||
key_role_type=None):
|
key_role_type=None,
|
||||||
|
digital_signature_algorithm=None,
|
||||||
|
cryptographic_algorithm=None):
|
||||||
super(CryptographicParameters, self).__init__(
|
super(CryptographicParameters, self).__init__(
|
||||||
tag=Tags.CRYPTOGRAPHIC_PARAMETERS)
|
tag=Tags.CRYPTOGRAPHIC_PARAMETERS)
|
||||||
self.block_cipher_mode = block_cipher_mode
|
self.block_cipher_mode = block_cipher_mode
|
||||||
self.padding_method = padding_method
|
self.padding_method = padding_method
|
||||||
self.hashing_algorithm = hashing_algorithm
|
self.hashing_algorithm = hashing_algorithm
|
||||||
self.key_role_type = key_role_type
|
self.key_role_type = key_role_type
|
||||||
|
self.digital_signature_algorithm = digital_signature_algorithm
|
||||||
|
self.cryptographic_algorithm = cryptographic_algorithm
|
||||||
|
|
||||||
def read(self, istream):
|
def read(self, istream):
|
||||||
super(CryptographicParameters, self).read(istream)
|
super(CryptographicParameters, self).read(istream)
|
||||||
|
@ -305,6 +318,15 @@ class CryptographicParameters(Struct):
|
||||||
self.key_role_type = CryptographicParameters.KeyRoleType()
|
self.key_role_type = CryptographicParameters.KeyRoleType()
|
||||||
self.key_role_type.read(tstream)
|
self.key_role_type.read(tstream)
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.DIGITAL_SIGNATURE_ALGORITHM, tstream):
|
||||||
|
self.digital_signature_algorithm = \
|
||||||
|
CryptographicParameters.DigitalSignatureAlgorithm()
|
||||||
|
self.digital_signature_algorithm.read(tstream)
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.CRYPTOGRAPHIC_ALGORITHM, tstream):
|
||||||
|
self.cryptographic_algorithm = CryptographicAlgorithm()
|
||||||
|
self.cryptographic_algorithm.read(tstream)
|
||||||
|
|
||||||
self.is_oversized(tstream)
|
self.is_oversized(tstream)
|
||||||
self.validate()
|
self.validate()
|
||||||
|
|
||||||
|
@ -320,6 +342,10 @@ class CryptographicParameters(Struct):
|
||||||
self.hashing_algorithm.write(tstream)
|
self.hashing_algorithm.write(tstream)
|
||||||
if self.key_role_type is not None:
|
if self.key_role_type is not None:
|
||||||
self.key_role_type.write(tstream)
|
self.key_role_type.write(tstream)
|
||||||
|
if self.digital_signature_algorithm is not None:
|
||||||
|
self.digital_signature_algorithm.write(tstream)
|
||||||
|
if self.cryptographic_algorithm is not None:
|
||||||
|
self.cryptographic_algorithm.write(tstream)
|
||||||
|
|
||||||
# Write the length and value of the request payload
|
# Write the length and value of the request payload
|
||||||
self.length = tstream.length()
|
self.length = tstream.length()
|
||||||
|
@ -354,6 +380,23 @@ class CryptographicParameters(Struct):
|
||||||
msg += "; expected {0}, received {1}".format(
|
msg += "; expected {0}, received {1}".format(
|
||||||
self.KeyRoleType, self.key_role_type)
|
self.KeyRoleType, self.key_role_type)
|
||||||
raise TypeError(msg)
|
raise TypeError(msg)
|
||||||
|
if self.digital_signature_algorithm is not None:
|
||||||
|
if not isinstance(
|
||||||
|
self.digital_signature_algorithm,
|
||||||
|
CryptographicParameters.DigitalSignatureAlgorithm
|
||||||
|
):
|
||||||
|
msg = "Invalid digital signature algorithm"
|
||||||
|
msg += "; expected {0}, received {1}".format(
|
||||||
|
CryptographicParameters.DigitalSignatureAlgorithm,
|
||||||
|
self.digital_signature_algorithm)
|
||||||
|
raise TypeError(msg)
|
||||||
|
if self.cryptographic_algorithm is not None:
|
||||||
|
if not isinstance(self.cryptographic_algorithm,
|
||||||
|
CryptographicAlgorithm):
|
||||||
|
msg = "Invalid cryptograhic algorithm"
|
||||||
|
msg += "; expected {0}, received {1}".format(
|
||||||
|
CryptographicAlgorithm, self.cryptographic_algorithm)
|
||||||
|
raise TypeError(msg)
|
||||||
|
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
if isinstance(other, CryptographicParameters):
|
if isinstance(other, CryptographicParameters):
|
||||||
|
@ -363,6 +406,11 @@ class CryptographicParameters(Struct):
|
||||||
return False
|
return False
|
||||||
elif self.hashing_algorithm != other.hashing_algorithm:
|
elif self.hashing_algorithm != other.hashing_algorithm:
|
||||||
return False
|
return False
|
||||||
|
elif self.digital_signature_algorithm \
|
||||||
|
!= other.digital_signature_algorithm:
|
||||||
|
return False
|
||||||
|
elif self.cryptographic_algorithm != other.cryptographic_algorithm:
|
||||||
|
return False
|
||||||
elif self.padding_method != other.padding_method:
|
elif self.padding_method != other.padding_method:
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -140,6 +140,9 @@ class AttributeValueFactory(object):
|
||||||
padding_method = None
|
padding_method = None
|
||||||
hashing_algorithm = None
|
hashing_algorithm = None
|
||||||
key_role_type = None
|
key_role_type = None
|
||||||
|
digital_signature_algorithm = None
|
||||||
|
cryptographic_algorithm = None
|
||||||
|
# TODO: Need to implement other fields of CryptographicParameters (3.6)
|
||||||
|
|
||||||
if params is not None:
|
if params is not None:
|
||||||
|
|
||||||
|
@ -162,11 +165,23 @@ class AttributeValueFactory(object):
|
||||||
hashing_algorithm = attributes.HashingAlgorithm(
|
hashing_algorithm = attributes.HashingAlgorithm(
|
||||||
params.get("hashing_algorithm"))
|
params.get("hashing_algorithm"))
|
||||||
|
|
||||||
|
if 'digital_signature_algorithm' in params:
|
||||||
|
digital_signature_algorithm = \
|
||||||
|
attributes.CryptographicParameters. \
|
||||||
|
DigitalSignatureAlgorithm(
|
||||||
|
params.get("digital_signature_algorithm"))
|
||||||
|
|
||||||
|
if 'cryptographic_algorithm' in params:
|
||||||
|
cryptographic_algorithm = attributes.CryptographicAlgorithm(
|
||||||
|
params.get("cryptographic_algorithm"))
|
||||||
|
|
||||||
return attributes.CryptographicParameters(
|
return attributes.CryptographicParameters(
|
||||||
block_cipher_mode=bcm,
|
block_cipher_mode=bcm,
|
||||||
padding_method=padding_method,
|
padding_method=padding_method,
|
||||||
hashing_algorithm=hashing_algorithm,
|
hashing_algorithm=hashing_algorithm,
|
||||||
key_role_type=key_role_type)
|
key_role_type=key_role_type,
|
||||||
|
digital_signature_algorithm=digital_signature_algorithm,
|
||||||
|
cryptographic_algorithm=cryptographic_algorithm)
|
||||||
|
|
||||||
def _create_cryptographic_usage_mask(self, flags):
|
def _create_cryptographic_usage_mask(self, flags):
|
||||||
mask = None
|
mask = None
|
||||||
|
|
|
@ -28,6 +28,7 @@ from kmip.core.messages.payloads import query
|
||||||
from kmip.core.messages.payloads import rekey_key_pair
|
from kmip.core.messages.payloads import rekey_key_pair
|
||||||
from kmip.core.messages.payloads import register
|
from kmip.core.messages.payloads import register
|
||||||
from kmip.core.messages.payloads import revoke
|
from kmip.core.messages.payloads import revoke
|
||||||
|
from kmip.core.messages.payloads import mac
|
||||||
|
|
||||||
|
|
||||||
class RequestPayloadFactory(PayloadFactory):
|
class RequestPayloadFactory(PayloadFactory):
|
||||||
|
@ -70,3 +71,6 @@ class RequestPayloadFactory(PayloadFactory):
|
||||||
|
|
||||||
def _create_revoke_payload(self):
|
def _create_revoke_payload(self):
|
||||||
return revoke.RevokeRequestPayload()
|
return revoke.RevokeRequestPayload()
|
||||||
|
|
||||||
|
def _create_mac_payload(self):
|
||||||
|
return mac.MACRequestPayload()
|
||||||
|
|
|
@ -28,6 +28,7 @@ from kmip.core.messages.payloads import query
|
||||||
from kmip.core.messages.payloads import rekey_key_pair
|
from kmip.core.messages.payloads import rekey_key_pair
|
||||||
from kmip.core.messages.payloads import register
|
from kmip.core.messages.payloads import register
|
||||||
from kmip.core.messages.payloads import revoke
|
from kmip.core.messages.payloads import revoke
|
||||||
|
from kmip.core.messages.payloads import mac
|
||||||
|
|
||||||
|
|
||||||
class ResponsePayloadFactory(PayloadFactory):
|
class ResponsePayloadFactory(PayloadFactory):
|
||||||
|
@ -70,3 +71,6 @@ class ResponsePayloadFactory(PayloadFactory):
|
||||||
|
|
||||||
def _create_revoke_payload(self):
|
def _create_revoke_payload(self):
|
||||||
return revoke.RevokeResponsePayload()
|
return revoke.RevokeResponsePayload()
|
||||||
|
|
||||||
|
def _create_mac_payload(self):
|
||||||
|
return mac.MACResponsePayload()
|
||||||
|
|
|
@ -0,0 +1,207 @@
|
||||||
|
# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from kmip.core import attributes
|
||||||
|
from kmip.core import enums
|
||||||
|
from kmip.core import exceptions
|
||||||
|
from kmip.core.enums import Tags
|
||||||
|
|
||||||
|
from kmip.core.objects import Data, MACData
|
||||||
|
|
||||||
|
from kmip.core.primitives import Struct
|
||||||
|
|
||||||
|
from kmip.core.utils import BytearrayStream
|
||||||
|
|
||||||
|
|
||||||
|
# 4.33
|
||||||
|
class MACRequestPayload(Struct):
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
unique_identifier=None,
|
||||||
|
cryptographic_parameters=None,
|
||||||
|
data=None):
|
||||||
|
|
||||||
|
super(MACRequestPayload, self).__init__(
|
||||||
|
tag=enums.Tags.REQUEST_PAYLOAD)
|
||||||
|
|
||||||
|
self._unique_identifier = None
|
||||||
|
self._cryptographic_parameters = None
|
||||||
|
self._data = None
|
||||||
|
|
||||||
|
self.unique_identifier = unique_identifier
|
||||||
|
self.cryptographic_parameters = cryptographic_parameters
|
||||||
|
self.data = data
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unique_identifier(self):
|
||||||
|
return self._unique_identifier
|
||||||
|
|
||||||
|
@unique_identifier.setter
|
||||||
|
def unique_identifier(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._unique_identifier = None
|
||||||
|
elif isinstance(value, attributes.UniqueIdentifier):
|
||||||
|
self._unique_identifier = value
|
||||||
|
else:
|
||||||
|
raise TypeError("unique identifier must be UniqueIdentifier type")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cryptographic_parameters(self):
|
||||||
|
return self._cryptographic_parameters
|
||||||
|
|
||||||
|
@cryptographic_parameters.setter
|
||||||
|
def cryptographic_parameters(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._cryptographic_parameters = None
|
||||||
|
elif isinstance(value, attributes.CryptographicParameters):
|
||||||
|
self._cryptographic_parameters = value
|
||||||
|
else:
|
||||||
|
raise TypeError("cryptographic parameters must "
|
||||||
|
"be CryptographicParameters type")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data(self):
|
||||||
|
return self._data
|
||||||
|
|
||||||
|
@data.setter
|
||||||
|
def data(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._data = None
|
||||||
|
elif isinstance(value, Data):
|
||||||
|
self._data = value
|
||||||
|
else:
|
||||||
|
raise TypeError("data must be Data type")
|
||||||
|
|
||||||
|
def read(self, istream):
|
||||||
|
super(MACRequestPayload, self).read(istream)
|
||||||
|
tstream = BytearrayStream(istream.read(self.length))
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream):
|
||||||
|
self.unique_identifier = attributes.UniqueIdentifier()
|
||||||
|
self.unique_identifier.read(tstream)
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.CRYPTOGRAPHIC_PARAMETERS, tstream):
|
||||||
|
self.cryptographic_parameters = \
|
||||||
|
attributes.CryptographicParameters()
|
||||||
|
self.cryptographic_parameters.read(tstream)
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.DATA, tstream):
|
||||||
|
self.data = Data()
|
||||||
|
self.data.read(tstream)
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidKmipEncoding(
|
||||||
|
"expected mac request data not found"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.is_oversized(tstream)
|
||||||
|
|
||||||
|
def write(self, ostream):
|
||||||
|
tstream = BytearrayStream()
|
||||||
|
|
||||||
|
if self._unique_identifier is not None:
|
||||||
|
self._unique_identifier.write(tstream)
|
||||||
|
if self._cryptographic_parameters is not None:
|
||||||
|
self._cryptographic_parameters.write(tstream)
|
||||||
|
if self._data is not None:
|
||||||
|
self.data.write(tstream)
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The mac request data is required"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.length = tstream.length()
|
||||||
|
super(MACRequestPayload, self).write(ostream)
|
||||||
|
ostream.write(tstream.buffer)
|
||||||
|
|
||||||
|
|
||||||
|
class MACResponsePayload(Struct):
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
unique_identifier=None,
|
||||||
|
mac_data=None):
|
||||||
|
super(MACResponsePayload, self).__init__(
|
||||||
|
tag=enums.Tags.RESPONSE_PAYLOAD)
|
||||||
|
|
||||||
|
self._unique_identifier = None
|
||||||
|
self._mac_data = None
|
||||||
|
|
||||||
|
self.unique_identifier = unique_identifier
|
||||||
|
self.mac_data = mac_data
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unique_identifier(self):
|
||||||
|
return self._unique_identifier
|
||||||
|
|
||||||
|
@unique_identifier.setter
|
||||||
|
def unique_identifier(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._unique_identifier = None
|
||||||
|
elif isinstance(value, attributes.UniqueIdentifier):
|
||||||
|
self._unique_identifier = value
|
||||||
|
else:
|
||||||
|
raise TypeError("unique identifier must be UniqueIdentifier type")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mac_data(self):
|
||||||
|
return self._mac_data
|
||||||
|
|
||||||
|
@mac_data.setter
|
||||||
|
def mac_data(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._mac_data = None
|
||||||
|
elif isinstance(value, MACData):
|
||||||
|
self._mac_data = value
|
||||||
|
else:
|
||||||
|
raise TypeError("mac_data must be MACData type")
|
||||||
|
|
||||||
|
def read(self, istream):
|
||||||
|
super(MACResponsePayload, self).read(istream)
|
||||||
|
tstream = BytearrayStream(istream.read(self.length))
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream):
|
||||||
|
self._unique_identifier = attributes.UniqueIdentifier()
|
||||||
|
self._unique_identifier.read(tstream)
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidKmipEncoding(
|
||||||
|
"expected mac response unique identifier not found"
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.is_tag_next(Tags.MAC_DATA, tstream):
|
||||||
|
self._mac_data = MACData()
|
||||||
|
self._mac_data.read(tstream)
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidKmipEncoding(
|
||||||
|
"expected mac response mac data not found"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.is_oversized(tstream)
|
||||||
|
|
||||||
|
def write(self, ostream):
|
||||||
|
tstream = BytearrayStream()
|
||||||
|
|
||||||
|
if self._unique_identifier is not None:
|
||||||
|
self._unique_identifier.write(tstream)
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The mac response unique identifier is required"
|
||||||
|
)
|
||||||
|
if self._mac_data is not None:
|
||||||
|
self._mac_data.write(tstream)
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The mac response mac data is required"
|
||||||
|
)
|
||||||
|
self.length = tstream.length()
|
||||||
|
super(MACResponsePayload, self).write(ostream)
|
||||||
|
ostream.write(tstream.buffer)
|
|
@ -1216,6 +1216,20 @@ class ExtensionInformation(Struct):
|
||||||
extension_type=extension_type)
|
extension_type=extension_type)
|
||||||
|
|
||||||
|
|
||||||
|
# 2.1.10
|
||||||
|
class Data(ByteString):
|
||||||
|
|
||||||
|
def __init__(self, value=None):
|
||||||
|
super(Data, self).__init__(value, Tags.DATA)
|
||||||
|
|
||||||
|
|
||||||
|
# 2.1.13
|
||||||
|
class MACData(ByteString):
|
||||||
|
|
||||||
|
def __init__(self, value=None):
|
||||||
|
super(MACData, self).__init__(value, Tags.MAC_DATA)
|
||||||
|
|
||||||
|
|
||||||
# 3.31, 9.1.3.2.19
|
# 3.31, 9.1.3.2.19
|
||||||
class RevocationReasonCode(Enumeration):
|
class RevocationReasonCode(Enumeration):
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,8 @@ from kmip.core.enums import BlockCipherMode
|
||||||
from kmip.core.enums import CertificateTypeEnum
|
from kmip.core.enums import CertificateTypeEnum
|
||||||
from kmip.core.enums import HashingAlgorithm as HashingAlgorithmEnum
|
from kmip.core.enums import HashingAlgorithm as HashingAlgorithmEnum
|
||||||
from kmip.core.enums import KeyRoleType
|
from kmip.core.enums import KeyRoleType
|
||||||
|
from kmip.core.enums import DigitalSignatureAlgorithm
|
||||||
|
from kmip.core.enums import CryptographicAlgorithm
|
||||||
from kmip.core.enums import PaddingMethod
|
from kmip.core.enums import PaddingMethod
|
||||||
from kmip.core.enums import NameType
|
from kmip.core.enums import NameType
|
||||||
|
|
||||||
|
@ -483,6 +485,7 @@ class TestApplicationData(TestCase):
|
||||||
|
|
||||||
|
|
||||||
class TestCryptographicParameters(TestCase):
|
class TestCryptographicParameters(TestCase):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
A test suite for the CryptographicParameters class
|
A test suite for the CryptographicParameters class
|
||||||
"""
|
"""
|
||||||
|
@ -498,25 +501,33 @@ class TestCryptographicParameters(TestCase):
|
||||||
{'block_cipher_mode': BlockCipherMode.CBC,
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
'padding_method': PaddingMethod.PKCS5,
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
'key_role_type': KeyRoleType.BDK})
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
|
|
||||||
self.cp_none = self.factory.create_attribute_value(
|
self.cp_none = self.factory.create_attribute_value(
|
||||||
AttributeType.CRYPTOGRAPHIC_PARAMETERS, {})
|
AttributeType.CRYPTOGRAPHIC_PARAMETERS, {})
|
||||||
|
|
||||||
# Symmetric key object with Cryptographic Parameters
|
# Symmetric key object with Cryptographic Parameters
|
||||||
# Byte stream edited to add Key Role Type parameter
|
# Byte stream edited to add:
|
||||||
|
# Key Role Type parameter
|
||||||
|
# Digital Signature Algorithm parameter
|
||||||
|
# Cryptographic Algorithm parameter
|
||||||
# Based on the KMIP Spec 1.1 Test Cases document
|
# Based on the KMIP Spec 1.1 Test Cases document
|
||||||
# 11.1 page 255 on the pdf version
|
# 11.1 page 255 on the pdf version
|
||||||
self.key_req_with_crypt_params = BytearrayStream((
|
self.key_req_with_crypt_params = BytearrayStream((
|
||||||
b'\x42\x00\x2B\x01\x00\x00\x00\x40'
|
b'\x42\x00\x2B\x01\x00\x00\x00\x60'
|
||||||
b'\x42\x00\x11\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
|
b'\x42\x00\x11\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||||
b'\x42\x00\x5F\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
|
b'\x42\x00\x5F\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
|
||||||
b'\x42\x00\x38\x05\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00'
|
b'\x42\x00\x38\x05\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00'
|
||||||
b'\x42\x00\x83\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
|
b'\x42\x00\x83\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\xAE\x05\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0B\x00\x00\x00\x00'
|
||||||
))
|
))
|
||||||
|
|
||||||
def teardown(self):
|
def teardown(self):
|
||||||
super(TestDigestValue, self).tearDown()
|
super(TestCryptographicParameters, self).tearDown()
|
||||||
|
|
||||||
def test_write_crypto_params(self):
|
def test_write_crypto_params(self):
|
||||||
ostream = BytearrayStream()
|
ostream = BytearrayStream()
|
||||||
|
@ -546,6 +557,17 @@ class TestCryptographicParameters(TestCase):
|
||||||
self.assertEqual(HashingAlgorithmEnum.SHA_1.value,
|
self.assertEqual(HashingAlgorithmEnum.SHA_1.value,
|
||||||
self.cp.hashing_algorithm.value.value)
|
self.cp.hashing_algorithm.value.value)
|
||||||
|
|
||||||
|
self.assertEqual(Tags.DIGITAL_SIGNATURE_ALGORITHM.value,
|
||||||
|
self.cp.digital_signature_algorithm.tag.value)
|
||||||
|
self.assertEqual(DigitalSignatureAlgorithm.
|
||||||
|
SHA256_WITH_RSA_ENCRYPTION.value,
|
||||||
|
self.cp.digital_signature_algorithm.value.value)
|
||||||
|
|
||||||
|
self.assertEqual(Tags.CRYPTOGRAPHIC_ALGORITHM.value,
|
||||||
|
self.cp.cryptographic_algorithm.tag.value)
|
||||||
|
self.assertEqual(CryptographicAlgorithm.HMAC_SHA512.value,
|
||||||
|
self.cp.cryptographic_algorithm.value.value)
|
||||||
|
|
||||||
def test_bad_cipher_mode(self):
|
def test_bad_cipher_mode(self):
|
||||||
self.cp.block_cipher_mode = self.bad_enum_code
|
self.cp.block_cipher_mode = self.bad_enum_code
|
||||||
cp_valid = self.factory.create_attribute_value(
|
cp_valid = self.factory.create_attribute_value(
|
||||||
|
@ -553,7 +575,10 @@ class TestCryptographicParameters(TestCase):
|
||||||
{'block_cipher_mode': BlockCipherMode.CBC,
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
'padding_method': PaddingMethod.PKCS5,
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
'key_role_type': KeyRoleType.BDK})
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
self.assertFalse(self.cp == cp_valid)
|
self.assertFalse(self.cp == cp_valid)
|
||||||
self.assertRaises(TypeError, self.cp.validate)
|
self.assertRaises(TypeError, self.cp.validate)
|
||||||
|
|
||||||
|
@ -564,7 +589,10 @@ class TestCryptographicParameters(TestCase):
|
||||||
{'block_cipher_mode': BlockCipherMode.CBC,
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
'padding_method': PaddingMethod.PKCS5,
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
'key_role_type': KeyRoleType.BDK})
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
self.assertFalse(self.cp == cp_valid)
|
self.assertFalse(self.cp == cp_valid)
|
||||||
self.assertRaises(TypeError, self.cp.validate)
|
self.assertRaises(TypeError, self.cp.validate)
|
||||||
|
|
||||||
|
@ -575,7 +603,10 @@ class TestCryptographicParameters(TestCase):
|
||||||
{'block_cipher_mode': BlockCipherMode.CBC,
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
'padding_method': PaddingMethod.PKCS5,
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
'key_role_type': KeyRoleType.BDK})
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
self.assertFalse(self.cp == cp_valid)
|
self.assertFalse(self.cp == cp_valid)
|
||||||
self.assertRaises(TypeError, self.cp.validate)
|
self.assertRaises(TypeError, self.cp.validate)
|
||||||
|
|
||||||
|
@ -586,6 +617,37 @@ class TestCryptographicParameters(TestCase):
|
||||||
{'block_cipher_mode': BlockCipherMode.CBC,
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
'padding_method': PaddingMethod.PKCS5,
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
'key_role_type': KeyRoleType.BDK})
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
|
self.assertFalse(self.cp == cp_valid)
|
||||||
|
self.assertRaises(TypeError, self.cp.validate)
|
||||||
|
|
||||||
|
def test_bad_digital_signature_algorithm(self):
|
||||||
|
self.cp.digital_signature_algorithm = self.bad_enum_code
|
||||||
|
cp_valid = self.factory.create_attribute_value(
|
||||||
|
AttributeType.CRYPTOGRAPHIC_PARAMETERS,
|
||||||
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
|
self.assertFalse(self.cp == cp_valid)
|
||||||
|
self.assertRaises(TypeError, self.cp.validate)
|
||||||
|
|
||||||
|
def test_bad_cryptographic_algorithm(self):
|
||||||
|
self.cp.cryptographic_algorithm = self.bad_enum_code
|
||||||
|
cp_valid = self.factory.create_attribute_value(
|
||||||
|
AttributeType.CRYPTOGRAPHIC_PARAMETERS,
|
||||||
|
{'block_cipher_mode': BlockCipherMode.CBC,
|
||||||
|
'padding_method': PaddingMethod.PKCS5,
|
||||||
|
'hashing_algorithm': HashingAlgorithmEnum.SHA_1,
|
||||||
|
'key_role_type': KeyRoleType.BDK,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION,
|
||||||
|
'cryptographic_algorithm': CryptographicAlgorithm.HMAC_SHA512})
|
||||||
self.assertFalse(self.cp == cp_valid)
|
self.assertFalse(self.cp == cp_valid)
|
||||||
self.assertRaises(TypeError, self.cp.validate)
|
self.assertRaises(TypeError, self.cp.validate)
|
||||||
|
|
|
@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query
|
||||||
from kmip.core.messages.payloads import rekey_key_pair
|
from kmip.core.messages.payloads import rekey_key_pair
|
||||||
from kmip.core.messages.payloads import register
|
from kmip.core.messages.payloads import register
|
||||||
from kmip.core.messages.payloads import revoke
|
from kmip.core.messages.payloads import revoke
|
||||||
|
from kmip.core.messages.payloads import mac
|
||||||
|
|
||||||
|
|
||||||
class TestRequestPayloadFactory(testtools.TestCase):
|
class TestRequestPayloadFactory(testtools.TestCase):
|
||||||
|
@ -228,7 +229,8 @@ class TestRequestPayloadFactory(testtools.TestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_create_mac_payload(self):
|
def test_create_mac_payload(self):
|
||||||
self._test_not_implemented(self.factory.create, enums.Operation.MAC)
|
payload = self.factory.create(enums.Operation.MAC)
|
||||||
|
self._test_payload_type(payload, mac.MACRequestPayload)
|
||||||
|
|
||||||
def test_create_mac_verify_payload(self):
|
def test_create_mac_verify_payload(self):
|
||||||
self._test_not_implemented(
|
self._test_not_implemented(
|
||||||
|
|
|
@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query
|
||||||
from kmip.core.messages.payloads import rekey_key_pair
|
from kmip.core.messages.payloads import rekey_key_pair
|
||||||
from kmip.core.messages.payloads import register
|
from kmip.core.messages.payloads import register
|
||||||
from kmip.core.messages.payloads import revoke
|
from kmip.core.messages.payloads import revoke
|
||||||
|
from kmip.core.messages.payloads import mac
|
||||||
|
|
||||||
|
|
||||||
class TestResponsePayloadFactory(testtools.TestCase):
|
class TestResponsePayloadFactory(testtools.TestCase):
|
||||||
|
@ -226,7 +227,11 @@ class TestResponsePayloadFactory(testtools.TestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_create_mac_payload(self):
|
def test_create_mac_payload(self):
|
||||||
self._test_not_implemented(self.factory.create, enums.Operation.MAC)
|
payload = self.factory.create(enums.Operation.MAC)
|
||||||
|
self._test_payload_type(
|
||||||
|
payload,
|
||||||
|
mac.MACResponsePayload
|
||||||
|
)
|
||||||
|
|
||||||
def test_create_mac_verify_payload(self):
|
def test_create_mac_verify_payload(self):
|
||||||
self._test_not_implemented(
|
self._test_not_implemented(
|
||||||
|
|
|
@ -89,7 +89,11 @@ class TestAttributeValueFactory(testtools.TestCase):
|
||||||
'block_cipher_mode': enums.BlockCipherMode.NIST_KEY_WRAP,
|
'block_cipher_mode': enums.BlockCipherMode.NIST_KEY_WRAP,
|
||||||
'padding_method': enums.PaddingMethod.ANSI_X9_23,
|
'padding_method': enums.PaddingMethod.ANSI_X9_23,
|
||||||
'key_role_type': enums.KeyRoleType.KEK,
|
'key_role_type': enums.KeyRoleType.KEK,
|
||||||
'hashing_algorithm': enums.HashingAlgorithm.SHA_512}
|
'hashing_algorithm': enums.HashingAlgorithm.SHA_512,
|
||||||
|
'digital_signature_algorithm':
|
||||||
|
enums.DigitalSignatureAlgorithm.ECDSA_WITH_SHA512,
|
||||||
|
'cryptographic_algorithm':
|
||||||
|
enums.CryptographicAlgorithm.HMAC_SHA512}
|
||||||
params = self.factory.create_attribute_value(
|
params = self.factory.create_attribute_value(
|
||||||
enums.AttributeType.CRYPTOGRAPHIC_PARAMETERS, value)
|
enums.AttributeType.CRYPTOGRAPHIC_PARAMETERS, value)
|
||||||
|
|
||||||
|
@ -110,6 +114,14 @@ class TestAttributeValueFactory(testtools.TestCase):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
attributes.HashingAlgorithm(enums.HashingAlgorithm.SHA_512),
|
attributes.HashingAlgorithm(enums.HashingAlgorithm.SHA_512),
|
||||||
params.hashing_algorithm)
|
params.hashing_algorithm)
|
||||||
|
self.assertEqual(
|
||||||
|
attributes.CryptographicParameters.DigitalSignatureAlgorithm(
|
||||||
|
enums.DigitalSignatureAlgorithm.ECDSA_WITH_SHA512),
|
||||||
|
params.digital_signature_algorithm)
|
||||||
|
self.assertEqual(
|
||||||
|
attributes.CryptographicAlgorithm(
|
||||||
|
enums.CryptographicAlgorithm.HMAC_SHA512),
|
||||||
|
params.cryptographic_algorithm)
|
||||||
|
|
||||||
def test_create_cryptographic_domain_parameters(self):
|
def test_create_cryptographic_domain_parameters(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -0,0 +1,298 @@
|
||||||
|
# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from testtools import TestCase
|
||||||
|
|
||||||
|
from kmip.core import attributes
|
||||||
|
from kmip.core import objects
|
||||||
|
from kmip.core import utils
|
||||||
|
from kmip.core import enums
|
||||||
|
from kmip.core import exceptions
|
||||||
|
|
||||||
|
from kmip.core.messages.payloads import mac
|
||||||
|
|
||||||
|
|
||||||
|
class TestMACRequestPayload(TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestMACRequestPayload, self).setUp()
|
||||||
|
|
||||||
|
self.unique_identifier = attributes.UniqueIdentifier(value='1')
|
||||||
|
self.cryptographic_parameters = \
|
||||||
|
attributes.CryptographicParameters(
|
||||||
|
cryptographic_algorithm=attributes.CryptographicAlgorithm(
|
||||||
|
enums.CryptographicAlgorithm.HMAC_SHA512)
|
||||||
|
)
|
||||||
|
self.data = objects.Data(
|
||||||
|
value=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
|
||||||
|
b'\x0C\x0D\x0E\x0F')
|
||||||
|
)
|
||||||
|
|
||||||
|
self.encoding_full = utils.BytearrayStream((
|
||||||
|
b'\x42\x00\x79\x01\x00\x00\x00\x40\x42\x00\x94\x07\x00\x00\x00\x01'
|
||||||
|
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10'
|
||||||
|
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\xc2\x08\x00\x00\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||||
|
b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'))
|
||||||
|
self.encoding_no_data = utils.BytearrayStream((
|
||||||
|
b'\x42\x00\x79\x01\x00\x00\x00\x28\x42\x00\x94\x07\x00\x00\x00\x01'
|
||||||
|
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10'
|
||||||
|
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00'
|
||||||
|
))
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(TestMACRequestPayload, self).tearDown()
|
||||||
|
|
||||||
|
def test_init_with_none(self):
|
||||||
|
mac.MACRequestPayload()
|
||||||
|
|
||||||
|
def test_init_valid(self):
|
||||||
|
"""
|
||||||
|
Test that the payload can be properly constructed and the attributes
|
||||||
|
cab be properly set and retrieved.
|
||||||
|
"""
|
||||||
|
payload = mac.MACRequestPayload(
|
||||||
|
self.unique_identifier,
|
||||||
|
self.cryptographic_parameters,
|
||||||
|
self.data)
|
||||||
|
self.assertEqual(payload.unique_identifier, self.unique_identifier)
|
||||||
|
self.assertEqual(payload.cryptographic_parameters,
|
||||||
|
self.cryptographic_parameters)
|
||||||
|
self.assertEqual(payload.data, self.data)
|
||||||
|
|
||||||
|
def test_init_with_invalid_unique_identifier(self):
|
||||||
|
kwargs = {'unique_identifier': 'invalid',
|
||||||
|
'cryptographic_parameters': None,
|
||||||
|
'data': None}
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError, "unique identifier must be UniqueIdentifier type",
|
||||||
|
mac.MACRequestPayload, **kwargs)
|
||||||
|
|
||||||
|
def test_init_with_invalid_cryptographic_parameters(self):
|
||||||
|
kwargs = {'unique_identifier': None,
|
||||||
|
'cryptographic_parameters': 'invalid',
|
||||||
|
'data': None}
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"cryptographic parameters must be CryptographicParameters type",
|
||||||
|
mac.MACRequestPayload, **kwargs)
|
||||||
|
|
||||||
|
def test_init_with_invalid_data(self):
|
||||||
|
kwargs = {'unique_identifier': None,
|
||||||
|
'cryptographic_parameters': None,
|
||||||
|
'data': 'invalid'}
|
||||||
|
self.assertRaises(
|
||||||
|
TypeError, "data must be Data type",
|
||||||
|
mac.MACRequestPayload, **kwargs)
|
||||||
|
|
||||||
|
def test_read_valid(self):
|
||||||
|
stream = self.encoding_full
|
||||||
|
payload = mac.MACRequestPayload()
|
||||||
|
payload.read(stream)
|
||||||
|
|
||||||
|
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||||
|
self.assertEqual(self.cryptographic_parameters,
|
||||||
|
payload.cryptographic_parameters)
|
||||||
|
self.assertEqual(self.data, payload.data)
|
||||||
|
|
||||||
|
def test_read_no_data(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidKmipEncoding error gets raised when attempting to
|
||||||
|
read a mac request encoding with no data.
|
||||||
|
"""
|
||||||
|
payload = mac.MACRequestPayload()
|
||||||
|
args = (self.encoding_no_data,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidKmipEncoding,
|
||||||
|
"expected mac request data not found",
|
||||||
|
payload.read,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_write_valid(self):
|
||||||
|
expected = self.encoding_full
|
||||||
|
|
||||||
|
stream = utils.BytearrayStream()
|
||||||
|
payload = mac.MACRequestPayload(
|
||||||
|
self.unique_identifier,
|
||||||
|
self.cryptographic_parameters,
|
||||||
|
self.data)
|
||||||
|
payload.write(stream)
|
||||||
|
|
||||||
|
self.assertEqual(expected, stream)
|
||||||
|
|
||||||
|
def test_write_with_no_data(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidField error gets raised when attempting to
|
||||||
|
write a mac request with no data.
|
||||||
|
"""
|
||||||
|
stream = utils.BytearrayStream()
|
||||||
|
payload = mac.MACRequestPayload(
|
||||||
|
self.unique_identifier,
|
||||||
|
self.cryptographic_parameters,
|
||||||
|
None)
|
||||||
|
args = (stream,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
"The mac request data is required",
|
||||||
|
payload.write,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMACResponsePayload(TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestMACResponsePayload, self).setUp()
|
||||||
|
|
||||||
|
self.unique_identifier = attributes.UniqueIdentifier(value='1')
|
||||||
|
self.mac_data = objects.MACData(value=(
|
||||||
|
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
|
||||||
|
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
|
||||||
|
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
|
||||||
|
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.encoding_full = utils.BytearrayStream((
|
||||||
|
b'\x42\x00\x7c\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x01'
|
||||||
|
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\xc6\x08\x00\x00\x00\x40'
|
||||||
|
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
|
||||||
|
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
|
||||||
|
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
|
||||||
|
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
|
||||||
|
))
|
||||||
|
self.encoding_no_unique_identifier = utils.BytearrayStream((
|
||||||
|
b'\x42\x00\x7c\x01\x00\x00\x00\x48\x42\x00\xc6\x08\x00\x00\x00\x40'
|
||||||
|
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
|
||||||
|
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
|
||||||
|
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
|
||||||
|
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
|
||||||
|
))
|
||||||
|
self.encoding_no_mac_data = utils.BytearrayStream((
|
||||||
|
b'\x42\x00\x7c\x01\x00\x00\x00\x10\x42\x00\x94\x07\x00\x00\x00\x01'
|
||||||
|
b'\x31\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
))
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(TestMACResponsePayload, self).tearDown()
|
||||||
|
|
||||||
|
def test_init_with_none(self):
|
||||||
|
mac.MACResponsePayload()
|
||||||
|
|
||||||
|
def test_init_valid(self):
|
||||||
|
"""
|
||||||
|
Test that the payload can be properly constructed and the attributes
|
||||||
|
can be properly set and retrieved.
|
||||||
|
"""
|
||||||
|
payload = mac.MACResponsePayload(
|
||||||
|
self.unique_identifier,
|
||||||
|
self.mac_data)
|
||||||
|
self.assertEqual(payload.unique_identifier, self.unique_identifier)
|
||||||
|
self.assertEqual(payload.mac_data, self.mac_data)
|
||||||
|
|
||||||
|
def test_init_with_invalid_unique_identifier(self):
|
||||||
|
kwargs = {'unique_identifier': 'invalid',
|
||||||
|
'mac_data': None}
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError, "unique identifier must be UniqueIdentifier type",
|
||||||
|
mac.MACResponsePayload, **kwargs)
|
||||||
|
|
||||||
|
def test_init_with_invalid_mac_data(self):
|
||||||
|
kwargs = {'unique_identifier': None,
|
||||||
|
'mac_data': 'invalid'}
|
||||||
|
self.assertRaises(
|
||||||
|
TypeError, "data must be MACData type",
|
||||||
|
mac.MACResponsePayload, **kwargs)
|
||||||
|
|
||||||
|
def test_read_valid(self):
|
||||||
|
stream = self.encoding_full
|
||||||
|
payload = mac.MACResponsePayload()
|
||||||
|
payload.read(stream)
|
||||||
|
|
||||||
|
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||||
|
self.assertEqual(self.mac_data, payload.mac_data)
|
||||||
|
|
||||||
|
def test_read_no_unique_identifier(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidKmipEncoding error gets raised when attempting to
|
||||||
|
read a mac response encoding with no unique identifier.
|
||||||
|
"""
|
||||||
|
payload = mac.MACResponsePayload()
|
||||||
|
args = (self.encoding_no_unique_identifier,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidKmipEncoding,
|
||||||
|
"expected mac response unique identifier not found",
|
||||||
|
payload.read,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_read_no_mac_data(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidKmipEncoding error gets raised when attempting to
|
||||||
|
read a mac response encoding with no mac data.
|
||||||
|
"""
|
||||||
|
payload = mac.MACResponsePayload()
|
||||||
|
args = (self.encoding_no_mac_data,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidKmipEncoding,
|
||||||
|
"expected mac response mac data not found",
|
||||||
|
payload.read,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_write_valid(self):
|
||||||
|
expected = self.encoding_full
|
||||||
|
|
||||||
|
stream = utils.BytearrayStream()
|
||||||
|
payload = mac.MACResponsePayload(
|
||||||
|
self.unique_identifier,
|
||||||
|
self.mac_data)
|
||||||
|
payload.write(stream)
|
||||||
|
|
||||||
|
self.assertEqual(expected, stream)
|
||||||
|
|
||||||
|
def test_write_with_no_unique_identifier(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidField error gets raised when attempting to
|
||||||
|
write a mac response with no unique identifier.
|
||||||
|
"""
|
||||||
|
stream = utils.BytearrayStream()
|
||||||
|
payload = mac.MACResponsePayload(
|
||||||
|
None,
|
||||||
|
self.mac_data)
|
||||||
|
args = (stream,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
"The mac response unique identifier is required",
|
||||||
|
payload.write,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_write_with_no_data(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidField error gets raised when attempting to
|
||||||
|
write a mac response with no mac data.
|
||||||
|
"""
|
||||||
|
stream = utils.BytearrayStream()
|
||||||
|
payload = mac.MACResponsePayload(
|
||||||
|
self.unique_identifier,
|
||||||
|
None)
|
||||||
|
args = (stream,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
"The mac response mac data is required",
|
||||||
|
payload.write,
|
||||||
|
*args
|
||||||
|
)
|
|
@ -14,6 +14,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from testtools import TestCase
|
from testtools import TestCase
|
||||||
|
import binascii
|
||||||
|
|
||||||
from kmip.core.factories.keys import KeyFactory
|
from kmip.core.factories.keys import KeyFactory
|
||||||
from kmip.core.factories.secrets import SecretFactory
|
from kmip.core.factories.secrets import SecretFactory
|
||||||
|
@ -47,6 +48,7 @@ from kmip.core.messages.payloads import get
|
||||||
from kmip.core.messages.payloads import register
|
from kmip.core.messages.payloads import register
|
||||||
from kmip.core.messages.payloads import locate
|
from kmip.core.messages.payloads import locate
|
||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
|
from kmip.core.messages.payloads import mac
|
||||||
|
|
||||||
from kmip.core.misc import KeyFormatType
|
from kmip.core.misc import KeyFormatType
|
||||||
from kmip.core.primitives import TextString
|
from kmip.core.primitives import TextString
|
||||||
|
@ -155,6 +157,18 @@ class TestRequestMessage(TestCase):
|
||||||
b'\x42\x00\x0b\x01\x00\x00\x00\x20\x42\x00\x55\x07\x00\x00\x00\x04'
|
b'\x42\x00\x0b\x01\x00\x00\x00\x20\x42\x00\x55\x07\x00\x00\x00\x04'
|
||||||
b'\x4b\x65\x79\x31\x00\x00\x00\x00\x42\x00\x54\x05\x00\x00\x00\x04'
|
b'\x4b\x65\x79\x31\x00\x00\x00\x00\x42\x00\x54\x05\x00\x00\x00\x04'
|
||||||
b'\x00\x00\x00\x01\x00\x00\x00\x00')
|
b'\x00\x00\x00\x01\x00\x00\x00\x00')
|
||||||
|
self.mac = (
|
||||||
|
b'\x42\x00\x78\x01\x00\x00\x00\xa0\x42\x00\x77\x01\x00\x00\x00\x38'
|
||||||
|
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
|
||||||
|
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04'
|
||||||
|
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x0d\x02\x00\x00\x00\x04'
|
||||||
|
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x58'
|
||||||
|
b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x23\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\x79\x01\x00\x00\x00\x40\x42\x00\x94\x07\x00\x00\x00\x01'
|
||||||
|
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\x2b\x01\x00\x00\x00\x10'
|
||||||
|
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x0b\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\xc2\x08\x00\x00\x00\x10\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||||
|
b'\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f')
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(TestRequestMessage, self).tearDown()
|
super(TestRequestMessage, self).tearDown()
|
||||||
|
@ -966,6 +980,164 @@ class TestRequestMessage(TestCase):
|
||||||
'Key1',
|
'Key1',
|
||||||
attribute_value.name_value.value))
|
attribute_value.name_value.value))
|
||||||
|
|
||||||
|
def test_mac_request_read(self):
|
||||||
|
self.stream = BytearrayStream(self.mac)
|
||||||
|
|
||||||
|
request_message = messages.RequestMessage()
|
||||||
|
request_message.read(self.stream)
|
||||||
|
|
||||||
|
request_header = request_message.request_header
|
||||||
|
msg = "Bad request header type: expected {0}, received{0}"
|
||||||
|
self.assertIsInstance(request_header, messages.RequestHeader,
|
||||||
|
msg.format(messages.RequestHeader,
|
||||||
|
type(request_header)))
|
||||||
|
|
||||||
|
protocol_version = request_header.protocol_version
|
||||||
|
msg = "Bad protocol version type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(protocol_version, contents.ProtocolVersion,
|
||||||
|
msg.format(contents.ProtocolVersion,
|
||||||
|
type(protocol_version)))
|
||||||
|
|
||||||
|
protocol_version_major = protocol_version.protocol_version_major
|
||||||
|
msg = "Bad protocol version major type: expected {0}, received {1}"
|
||||||
|
exp_type = contents.ProtocolVersion.ProtocolVersionMajor
|
||||||
|
rcv_type = type(protocol_version_major)
|
||||||
|
self.assertIsInstance(protocol_version_major, exp_type,
|
||||||
|
msg.format(exp_type, rcv_type))
|
||||||
|
msg = "Bad protocol version major value: expected {0}, received {1}"
|
||||||
|
self.assertEqual(1, protocol_version_major.value,
|
||||||
|
msg.format(1, protocol_version_major.value))
|
||||||
|
|
||||||
|
protocol_version_minor = protocol_version.protocol_version_minor
|
||||||
|
msg = "Bad protocol version minor type: expected {0}, received {1}"
|
||||||
|
exp_type = contents.ProtocolVersion.ProtocolVersionMinor
|
||||||
|
rcv_type = type(protocol_version_minor)
|
||||||
|
self.assertIsInstance(protocol_version_minor, exp_type,
|
||||||
|
msg.format(exp_type, rcv_type))
|
||||||
|
msg = "Bad protocol version minor value: expected {0}, received {1}"
|
||||||
|
self.assertEqual(2, protocol_version_minor.value,
|
||||||
|
msg.format(2, protocol_version_minor.value))
|
||||||
|
|
||||||
|
batch_count = request_header.batch_count
|
||||||
|
msg = "Bad batch count type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(batch_count, contents.BatchCount,
|
||||||
|
msg.format(contents.BatchCount,
|
||||||
|
type(batch_count)))
|
||||||
|
msg = "Bad batch count value: expected {0}, received {1}"
|
||||||
|
self.assertEqual(1, batch_count.value,
|
||||||
|
msg.format(1, batch_count.value))
|
||||||
|
|
||||||
|
batch_items = request_message.batch_items
|
||||||
|
msg = "Bad batch items type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(batch_items, list,
|
||||||
|
msg.format(list, type(batch_items)))
|
||||||
|
self.assertEquals(1, len(batch_items),
|
||||||
|
self.msg.format('batch items', 'length',
|
||||||
|
1, len(batch_items)))
|
||||||
|
|
||||||
|
batch_item = batch_items[0]
|
||||||
|
msg = "Bad batch item type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(batch_item, messages.RequestBatchItem,
|
||||||
|
msg.format(messages.RequestBatchItem,
|
||||||
|
type(batch_item)))
|
||||||
|
|
||||||
|
operation = batch_item.operation
|
||||||
|
msg = "Bad operation type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(operation, contents.Operation,
|
||||||
|
msg.format(contents.Operation,
|
||||||
|
type(operation)))
|
||||||
|
msg = "Bad operation value: expected {0}, received {1}"
|
||||||
|
self.assertEqual(enums.Operation.MAC, operation.value,
|
||||||
|
msg.format(enums.Operation.MAC,
|
||||||
|
operation.value))
|
||||||
|
|
||||||
|
request_payload = batch_item.request_payload
|
||||||
|
msg = "Bad request payload type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(request_payload,
|
||||||
|
mac.MACRequestPayload,
|
||||||
|
msg.format(mac.MACRequestPayload,
|
||||||
|
type(request_payload)))
|
||||||
|
|
||||||
|
unique_identifier = request_payload.unique_identifier
|
||||||
|
msg = "Bad unique identifier type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
|
||||||
|
msg.format(attr.UniqueIdentifier,
|
||||||
|
type(unique_identifier)))
|
||||||
|
msg = "Bad unique identifier value: expected {0}, received {1}"
|
||||||
|
self.assertEqual('1', unique_identifier.value,
|
||||||
|
msg.format('1', unique_identifier.value))
|
||||||
|
|
||||||
|
parameters_attribute = request_payload.cryptographic_parameters
|
||||||
|
msg = "Bad cryptographic parameters type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(parameters_attribute,
|
||||||
|
attr.CryptographicParameters,
|
||||||
|
msg.format(attr.CryptographicParameters,
|
||||||
|
type(parameters_attribute)))
|
||||||
|
|
||||||
|
cryptographic_algorithm = parameters_attribute.cryptographic_algorithm
|
||||||
|
msg = "Bad cryptographic algorithm type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(cryptographic_algorithm,
|
||||||
|
attr.CryptographicAlgorithm,
|
||||||
|
msg.format(attr.CryptographicAlgorithm,
|
||||||
|
type(cryptographic_algorithm)))
|
||||||
|
msg = "Bad cryptographic algorithm value: expected {0}, received {1}"
|
||||||
|
self.assertEquals(cryptographic_algorithm.value,
|
||||||
|
enums.CryptographicAlgorithm.HMAC_SHA512,
|
||||||
|
msg.format(cryptographic_algorithm.value,
|
||||||
|
enums.CryptographicAlgorithm.HMAC_SHA512))
|
||||||
|
|
||||||
|
data = request_payload.data
|
||||||
|
msg = "Bad data type: expected {0}, received {1}"
|
||||||
|
self.assertIsInstance(data, objects.Data, msg.format(objects.Data,
|
||||||
|
type(data)))
|
||||||
|
exp_value = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
|
||||||
|
b'\x0C\x0D\x0E\x0F')
|
||||||
|
msg = "Bad data value: expected {0}, received {1}"
|
||||||
|
self.assertEqual(
|
||||||
|
exp_value, data.value,
|
||||||
|
msg.format(binascii.hexlify(exp_value),
|
||||||
|
binascii.hexlify(data.value))
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_mac_request_write(self):
|
||||||
|
prot_ver = contents.ProtocolVersion.create(1, 2)
|
||||||
|
|
||||||
|
batch_count = contents.BatchCount(1)
|
||||||
|
req_header = messages.RequestHeader(protocol_version=prot_ver,
|
||||||
|
batch_count=batch_count)
|
||||||
|
|
||||||
|
operation = contents.Operation(enums.Operation.MAC)
|
||||||
|
|
||||||
|
uuid = attr.UniqueIdentifier('1')
|
||||||
|
data = objects.Data(
|
||||||
|
value=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
|
||||||
|
b'\x0C\x0D\x0E\x0F')
|
||||||
|
)
|
||||||
|
parameters_attribute = attr.CryptographicParameters(
|
||||||
|
cryptographic_algorithm=attr.
|
||||||
|
CryptographicAlgorithm(enums.CryptographicAlgorithm.HMAC_SHA512)
|
||||||
|
)
|
||||||
|
request_payload = mac.MACRequestPayload(
|
||||||
|
unique_identifier=uuid,
|
||||||
|
cryptographic_parameters=parameters_attribute,
|
||||||
|
data=data
|
||||||
|
)
|
||||||
|
batch_item = messages.RequestBatchItem(operation=operation,
|
||||||
|
request_payload=request_payload)
|
||||||
|
request_message = messages.RequestMessage(request_header=req_header,
|
||||||
|
batch_items=[batch_item])
|
||||||
|
request_message.write(self.stream)
|
||||||
|
|
||||||
|
result = self.stream.read()
|
||||||
|
len_exp = len(self.mac)
|
||||||
|
len_rcv = len(result)
|
||||||
|
self.assertEqual(len_exp, len_rcv,
|
||||||
|
self.msg.format('request message', 'write',
|
||||||
|
len_exp, len_rcv))
|
||||||
|
|
||||||
|
msg = "Bad request message write: encoding mismatch"
|
||||||
|
self.assertEqual(self.mac, result, msg)
|
||||||
|
|
||||||
|
|
||||||
class TestResponseMessage(TestCase):
|
class TestResponseMessage(TestCase):
|
||||||
|
|
||||||
|
@ -1050,6 +1222,22 @@ class TestResponseMessage(TestCase):
|
||||||
b'\x34\x39\x61\x31\x63\x61\x38\x38\x2d\x36\x62\x65\x61\x2d\x34\x66'
|
b'\x34\x39\x61\x31\x63\x61\x38\x38\x2d\x36\x62\x65\x61\x2d\x34\x66'
|
||||||
b'\x62\x32\x2d\x62\x34\x35\x30\x2d\x37\x65\x35\x38\x38\x30\x32\x63'
|
b'\x62\x32\x2d\x62\x34\x35\x30\x2d\x37\x65\x35\x38\x38\x30\x32\x63'
|
||||||
b'\x33\x30\x33\x38\x00\x00\x00\x00')
|
b'\x33\x30\x33\x38\x00\x00\x00\x00')
|
||||||
|
self.mac = (
|
||||||
|
b'\x42\x00\x7b\x01\x00\x00\x00\xd8\x42\x00\x7a\x01\x00\x00\x00\x48'
|
||||||
|
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
|
||||||
|
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04'
|
||||||
|
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x92\x09\x00\x00\x00\x08'
|
||||||
|
b'\x00\x00\x00\x00\x58\x8a\x3f\x23\x42\x00\x0d\x02\x00\x00\x00\x04'
|
||||||
|
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x80'
|
||||||
|
b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x23\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\x7f\x05\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x42\x00\x7c\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x01'
|
||||||
|
b'\x31\x00\x00\x00\x00\x00\x00\x00\x42\x00\xc6\x08\x00\x00\x00\x40'
|
||||||
|
b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb\x32\x9f'
|
||||||
|
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2\x2a\x5b'
|
||||||
|
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe\xc8\x23'
|
||||||
|
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51\xff\x7c'
|
||||||
|
)
|
||||||
self.invalid_message_response = (
|
self.invalid_message_response = (
|
||||||
b'\x42\x00\x7b\x01\x00\x00\x00\xb0\x42\x00\x7a\x01\x00\x00\x00\x48'
|
b'\x42\x00\x7b\x01\x00\x00\x00\xb0\x42\x00\x7a\x01\x00\x00\x00\x48'
|
||||||
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
|
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
|
||||||
|
@ -1775,6 +1963,175 @@ class TestResponseMessage(TestCase):
|
||||||
msg = "Bad response message write: encoding mismatch"
|
msg = "Bad response message write: encoding mismatch"
|
||||||
self.assertEqual(self.locate, result, msg)
|
self.assertEqual(self.locate, result, msg)
|
||||||
|
|
||||||
|
def test_mac_response_read(self):
|
||||||
|
self.stream = BytearrayStream(self.mac)
|
||||||
|
|
||||||
|
response_message = messages.ResponseMessage()
|
||||||
|
response_message.read(self.stream)
|
||||||
|
|
||||||
|
response_header = response_message.response_header
|
||||||
|
self.assertIsInstance(response_header, messages.ResponseHeader,
|
||||||
|
self.msg.format('response header', 'type',
|
||||||
|
messages.ResponseHeader,
|
||||||
|
type(response_header)))
|
||||||
|
protocol_version = response_header.protocol_version
|
||||||
|
self.assertIsInstance(protocol_version, contents.ProtocolVersion,
|
||||||
|
self.msg.format('response header', 'value',
|
||||||
|
contents.ProtocolVersion,
|
||||||
|
type(protocol_version)))
|
||||||
|
|
||||||
|
protocol_version_major = protocol_version.protocol_version_major
|
||||||
|
exp_type = contents.ProtocolVersion.ProtocolVersionMajor
|
||||||
|
rcv_type = type(protocol_version_major)
|
||||||
|
self.assertIsInstance(protocol_version_major, exp_type,
|
||||||
|
self.msg.format('protocol version major', 'type',
|
||||||
|
exp_type, rcv_type))
|
||||||
|
self.assertEqual(1, protocol_version_major.value,
|
||||||
|
self.msg.format('protocol version major', 'value',
|
||||||
|
1, protocol_version_major.value))
|
||||||
|
|
||||||
|
protocol_version_minor = protocol_version.protocol_version_minor
|
||||||
|
exp_type = contents.ProtocolVersion.ProtocolVersionMinor
|
||||||
|
rcv_type = type(protocol_version_minor)
|
||||||
|
self.assertIsInstance(protocol_version_minor, exp_type,
|
||||||
|
self.msg.format('protocol version minor', 'type',
|
||||||
|
exp_type, rcv_type))
|
||||||
|
self.assertEqual(2, protocol_version_minor.value,
|
||||||
|
self.msg.format('protocol version minor', 'value',
|
||||||
|
2, protocol_version_minor.value))
|
||||||
|
|
||||||
|
time_stamp = response_header.time_stamp
|
||||||
|
value = 0x588a3f23
|
||||||
|
self.assertIsInstance(time_stamp, contents.TimeStamp,
|
||||||
|
self.msg.format('time stamp', 'value',
|
||||||
|
contents.TimeStamp,
|
||||||
|
type(time_stamp)))
|
||||||
|
self.assertEqual(time_stamp.value, value,
|
||||||
|
self.msg.format('time stamp', 'value',
|
||||||
|
time_stamp.value, value))
|
||||||
|
|
||||||
|
batch_count = response_header.batch_count
|
||||||
|
self.assertIsInstance(batch_count, contents.BatchCount,
|
||||||
|
self.msg.format('batch count', 'type',
|
||||||
|
contents.BatchCount,
|
||||||
|
type(batch_count)))
|
||||||
|
self.assertEqual(1, batch_count.value,
|
||||||
|
self.msg.format('batch count', 'value', 1,
|
||||||
|
batch_count.value))
|
||||||
|
|
||||||
|
batch_items = response_message.batch_items
|
||||||
|
self.assertIsInstance(batch_items, list,
|
||||||
|
self.msg.format('batch items', 'type',
|
||||||
|
list, type(batch_items)))
|
||||||
|
|
||||||
|
for batch_item in batch_items:
|
||||||
|
self.assertIsInstance(batch_item, messages.ResponseBatchItem,
|
||||||
|
self.msg.format('batch item', 'type',
|
||||||
|
messages.ResponseBatchItem,
|
||||||
|
type(batch_item)))
|
||||||
|
|
||||||
|
operation = batch_item.operation
|
||||||
|
self.assertIsInstance(operation, contents.Operation,
|
||||||
|
self.msg.format('operation', 'type',
|
||||||
|
contents.Operation,
|
||||||
|
type(operation)))
|
||||||
|
self.assertEqual(enums.Operation.MAC, operation.value,
|
||||||
|
self.msg.format('operation', 'value',
|
||||||
|
enums.Operation.MAC,
|
||||||
|
operation.value))
|
||||||
|
|
||||||
|
result_status = batch_item.result_status
|
||||||
|
self.assertIsInstance(result_status, contents.ResultStatus,
|
||||||
|
self.msg.format('result status', 'type',
|
||||||
|
contents.ResultStatus,
|
||||||
|
type(result_status)))
|
||||||
|
self.assertEqual(enums.ResultStatus.SUCCESS, result_status.value,
|
||||||
|
self.msg.format('result status', 'value',
|
||||||
|
enums.ResultStatus.SUCCESS,
|
||||||
|
result_status.value))
|
||||||
|
|
||||||
|
response_payload = batch_item.response_payload
|
||||||
|
exp_type = mac.MACResponsePayload
|
||||||
|
rcv_type = type(response_payload)
|
||||||
|
self.assertIsInstance(response_payload, exp_type,
|
||||||
|
self.msg.format('response payload', 'type',
|
||||||
|
exp_type, rcv_type))
|
||||||
|
|
||||||
|
unique_identifier = response_payload.unique_identifier
|
||||||
|
value = '1'
|
||||||
|
self.assertIsInstance(unique_identifier, attr.UniqueIdentifier,
|
||||||
|
self.msg.format('unique identifier', 'type',
|
||||||
|
attr.UniqueIdentifier,
|
||||||
|
type(unique_identifier)))
|
||||||
|
self.assertEqual(value, unique_identifier.value,
|
||||||
|
self.msg.format('unique identifier', 'value',
|
||||||
|
unique_identifier.value, value))
|
||||||
|
|
||||||
|
mac_data = response_payload.mac_data
|
||||||
|
value = \
|
||||||
|
(b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb'
|
||||||
|
b'\x32\x9f'
|
||||||
|
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2'
|
||||||
|
b'\x2a\x5b'
|
||||||
|
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe'
|
||||||
|
b'\xc8\x23'
|
||||||
|
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51'
|
||||||
|
b'\xff\x7c')
|
||||||
|
self.assertIsInstance(mac_data, objects.MACData,
|
||||||
|
self.msg.format('secret', 'type',
|
||||||
|
objects.MACData,
|
||||||
|
type(mac_data)))
|
||||||
|
self.assertEqual(value, mac_data.value,
|
||||||
|
self.msg.format('mac data', 'value',
|
||||||
|
binascii.hexlify(mac_data.value),
|
||||||
|
binascii.hexlify(value)))
|
||||||
|
|
||||||
|
def test_mac_response_write(self):
|
||||||
|
prot_ver = contents.ProtocolVersion.create(1, 2)
|
||||||
|
|
||||||
|
# Fri Apr 27 10:12:23 CEST 2012
|
||||||
|
time_stamp = contents.TimeStamp(0x588a3f23)
|
||||||
|
|
||||||
|
batch_count = contents.BatchCount(1)
|
||||||
|
response_header = messages.ResponseHeader(protocol_version=prot_ver,
|
||||||
|
time_stamp=time_stamp,
|
||||||
|
batch_count=batch_count)
|
||||||
|
operation = contents.Operation(enums.Operation.MAC)
|
||||||
|
result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS)
|
||||||
|
|
||||||
|
uuid = '1'
|
||||||
|
uniq_id = attr.UniqueIdentifier(uuid)
|
||||||
|
|
||||||
|
value = \
|
||||||
|
(b'\x99\x8b\x55\x59\x90\x9b\x85\x87\x5b\x90\x63\x13\x12\xbb'
|
||||||
|
b'\x32\x9f'
|
||||||
|
b'\x6a\xc4\xed\x97\x6e\xac\x99\xe5\x21\x53\xc4\x19\x28\xf2'
|
||||||
|
b'\x2a\x5b'
|
||||||
|
b'\xef\x79\xa4\xbe\x05\x3b\x31\x49\x19\xe0\x75\x23\xb9\xbe'
|
||||||
|
b'\xc8\x23'
|
||||||
|
b'\x35\x60\x7e\x49\xba\xa9\x7e\xe0\x9e\x6b\x3d\x55\xf4\x51'
|
||||||
|
b'\xff\x7c')
|
||||||
|
mac_data = objects.MACData(value)
|
||||||
|
|
||||||
|
resp_pl = mac.MACResponsePayload(unique_identifier=uniq_id,
|
||||||
|
mac_data=mac_data)
|
||||||
|
batch_item = messages.ResponseBatchItem(operation=operation,
|
||||||
|
result_status=result_status,
|
||||||
|
response_payload=resp_pl)
|
||||||
|
rm = messages.ResponseMessage(response_header=response_header,
|
||||||
|
batch_items=[batch_item])
|
||||||
|
rm.write(self.stream)
|
||||||
|
|
||||||
|
result = self.stream.read()
|
||||||
|
len_exp = len(self.mac)
|
||||||
|
len_rcv = len(result)
|
||||||
|
self.assertEqual(len_exp, len_rcv,
|
||||||
|
self.msg.format('get response message', 'write',
|
||||||
|
len_exp, len_rcv))
|
||||||
|
|
||||||
|
msg = "Bad response message write: encoding mismatch"
|
||||||
|
self.assertEqual(self.mac, result, msg)
|
||||||
|
|
||||||
def test_message_invalid_response_write(self):
|
def test_message_invalid_response_write(self):
|
||||||
# Batch item of 'INVALID MESSAGE' response
|
# Batch item of 'INVALID MESSAGE' response
|
||||||
# has no 'operation' attribute
|
# has no 'operation' attribute
|
||||||
|
|
Loading…
Reference in New Issue