Merge pull request #288 from danetrain/feat/add-sign-payloads

Adding request and response payloads for the Sign operation
This commit is contained in:
Peter Hamilton 2017-08-01 15:47:47 -04:00 committed by GitHub
commit 9ddb74a84c
6 changed files with 1182 additions and 2 deletions

View File

@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
@ -86,3 +87,6 @@ class RequestPayloadFactory(PayloadFactory):
def _create_decrypt_payload(self): def _create_decrypt_payload(self):
return decrypt.DecryptRequestPayload() return decrypt.DecryptRequestPayload()
def _create_sign_payload(self):
return sign.SignRequestPayload()

View File

@ -32,6 +32,7 @@ from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import sign
class ResponsePayloadFactory(PayloadFactory): class ResponsePayloadFactory(PayloadFactory):
@ -86,3 +87,6 @@ class ResponsePayloadFactory(PayloadFactory):
def _create_decrypt_payload(self): def _create_decrypt_payload(self):
return decrypt.DecryptResponsePayload() return decrypt.DecryptResponsePayload()
def _create_sign_payload(self):
return sign.SignResponsePayload()

View File

@ -0,0 +1,386 @@
# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from kmip.core import attributes
from kmip.core import enums
from kmip.core import primitives
from kmip.core import utils
class SignRequestPayload(primitives.Struct):
"""
A request payload for the Sign operation.
Attributes:
unique_identifier: The unique ID of the managed object to be used for
signing some data.
cryptographic_parameters: A collection of settings relevant for the
signature operation.
data: The data to be signed in the form of a binary string.
"""
def __init__(self,
unique_identifier=None,
cryptographic_parameters=None,
data=None):
"""
Construct a Sign request payload struct.
Args:
unique_identifier (string): The ID of the managed object (e.g. a
public key) to be used for encryption. Optional, defaults to
None. If not included, the ID placeholder will be used.
cryptographic_parameters (CryptographicParameters): A
CryptographicParameters struct containing the settings for
the signature algorithm. Optional, defaults to None. If not
included, the CryptographicParameters associated with the
managed object will be used instead.
data (bytes): The data to be signed, in binary form.
"""
super(SignRequestPayload, self).__init__(
enums.Tags.REQUEST_PAYLOAD
)
self._unique_identifier = None
self._cryptographic_parameters = None
self._data = None
self.unique_identifier = unique_identifier
self.cryptographic_parameters = cryptographic_parameters
self.data = data
@property
def unique_identifier(self):
if self._unique_identifier:
return self._unique_identifier.value
else:
return None
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._unique_identifier = None
elif isinstance(value, six.string_types):
self._unique_identifier = primitives.TextString(
value=value,
tag=enums.Tags.UNIQUE_IDENTIFIER
)
else:
raise TypeError("unique identifier must be a string")
@property
def cryptographic_parameters(self):
return self._cryptographic_parameters
@cryptographic_parameters.setter
def cryptographic_parameters(self, value):
if value is None:
self._cryptographic_parameters = None
elif isinstance(value, attributes.CryptographicParameters):
self._cryptographic_parameters = value
else:
raise TypeError(
"cryptographic parameters must be a CryptographicParameters "
"struct"
)
@property
def data(self):
if self._data:
return self._data.value
else:
return None
@data.setter
def data(self, value):
if value is None:
self._data is None
elif isinstance(value, six.binary_type):
self._data = primitives.ByteString(
value=value,
tag=enums.Tags.DATA
)
else:
raise TypeError("data must be bytes")
def read(self, input_stream):
"""
Read the data encoding the Sign request payload and decode it
into its parts
Args:
input_stream (stream): A data stream containing encoded object
data, supporting a read method.
Raises:
ValueError: Raised if the data attribute is missing from the
encoded payload.
"""
super(SignRequestPayload, self).read(input_stream)
local_stream = utils.BytearrayStream(input_stream.read(self.length))
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream):
self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER
)
self._unique_identifier.read(local_stream)
if self.is_tag_next(
enums.Tags.CRYPTOGRAPHIC_PARAMETERS,
local_stream
):
self._cryptographic_parameters = \
attributes.CryptographicParameters()
self._cryptographic_parameters.read(local_stream)
if self.is_tag_next(enums.Tags.DATA, local_stream):
self._data = primitives.ByteString(tag=enums.Tags.DATA)
self._data.read(local_stream)
else:
raise ValueError(
"invalid payload missing the data attribute"
)
def write(self, output_stream):
"""
Write the data encoding the Sign request payload to a stream.
Args:
output_stream (stream): A data stream in which to encode object
data, supporting a write method; usually a BytearrayStream
object.
Raises:
ValueError: Raised if the data attribute is not defined.
"""
local_stream = utils.BytearrayStream()
if self._unique_identifier:
self._unique_identifier.write(local_stream)
if self._cryptographic_parameters:
self._cryptographic_parameters.write(local_stream)
if self._data:
self._data.write(local_stream)
else:
raise ValueError("invalid payload missing the data attribute")
self.length = local_stream.length()
super(SignRequestPayload, self).write(output_stream)
output_stream.write(local_stream.buffer)
def __eq__(self, other):
if isinstance(other, SignRequestPayload):
if self.unique_identifier != other.unique_identifier:
return False
elif self.cryptographic_parameters !=\
other.cryptographic_parameters:
return False
elif self.data != other.data:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, SignRequestPayload):
return not (self == other)
else:
return NotImplemented
def __repr__(self):
args = ", ".join([
"unique_identifier='{0}'".format(self.unique_identifier),
"cryptographic_parameters={0}".format(
repr(self.cryptographic_parameters)
),
"data={0}".format(self.data)
])
return "SignRequestPayload({0})".format(args)
def __str__(self):
return str({
'unique_identifier': self.unique_identifier,
'cryptographic_parameters': self.cryptographic_parameters,
'data': self.data
})
class SignResponsePayload(primitives.Struct):
"""
A response payload for the Sign operation.
Attributes:
unique_identifier: The unique ID of the managed object used to sign
the data.
signature_data: The signature data as a byte string.
"""
def __init__(self,
unique_identifier=None,
signature_data=None):
super(SignResponsePayload, self).__init__(
enums.Tags.RESPONSE_PAYLOAD
)
self._unique_identifier = None
self._signature_data = None
self.unique_identifier = unique_identifier
self.signature_data = signature_data
@property
def unique_identifier(self):
if self._unique_identifier:
return self._unique_identifier.value
else:
return None
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._unique_identifier = None
elif isinstance(value, six.string_types):
self._unique_identifier = primitives.TextString(
value=value,
tag=enums.Tags.UNIQUE_IDENTIFIER
)
else:
raise TypeError("unique identifier must be a string")
@property
def signature_data(self):
if self._signature_data:
return self._signature_data.value
else:
return None
@signature_data.setter
def signature_data(self, value):
if value is None:
self._signature_data = None
elif isinstance(value, six.binary_type):
self._signature_data = primitives.ByteString(
value=value,
tag=enums.Tags.SIGNATURE_DATA
)
else:
raise TypeError("signature data must be bytes")
def read(self, input_stream):
"""
Read the data encoding the Sign response payload and decode it.
Args:
input_stream (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
Raises:
ValueError: Raised if the unique_identifier or signature attributes
are missing from the encoded payload.
"""
super(SignResponsePayload, self).read(input_stream)
local_stream = utils.BytearrayStream(input_stream.read(self.length))
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream):
self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER
)
self._unique_identifier.read(local_stream)
else:
raise ValueError(
"invalid payload missing the unique identifier attribute"
)
if self.is_tag_next(enums.Tags.SIGNATURE_DATA, local_stream):
self._signature_data = primitives.ByteString(
tag=enums.Tags.SIGNATURE_DATA
)
self._signature_data.read(local_stream)
else:
raise ValueError(
"invalid payload missing the signature data attribute"
)
def write(self, output_stream):
"""
Write the data encoding the Sign response to a stream.
Args:
output_stream (stream): A data stream in which to encode object
data, supporting a write method; usually a BytearrayStream
object.
Raises:
ValueError: Raised if the unique_identifier or signature
attributes are not defined.
"""
local_stream = utils.BytearrayStream()
if self._unique_identifier:
self._unique_identifier.write(local_stream)
else:
raise ValueError(
"invalid payload missing the unique identifier attribute"
)
if self._signature_data:
self._signature_data.write(local_stream)
else:
raise ValueError(
"invalid payload missing the signature attribute"
)
self.length = local_stream.length()
super(SignResponsePayload, self).write(output_stream)
output_stream.write(local_stream.buffer)
def __eq__(self, other):
if isinstance(other, SignResponsePayload):
if self.unique_identifier != other.unique_identifier:
return False
elif self.signature_data != other.signature_data:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, SignResponsePayload):
return not (self == other)
else:
return NotImplemented
def __repr__(self):
args = ", ".join([
"unique_identifier='{0}'".format(self.unique_identifier),
"signature_data={0}".format(self.signature_data)
])
return "SignResponsePayload({0})".format(args)
def __str__(self):
return str({
'unique_identifier': self.unique_identifier,
'signature_data': self.signature_data
})

View File

@ -34,6 +34,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
@ -217,7 +218,8 @@ class TestRequestPayloadFactory(testtools.TestCase):
self._test_payload_type(payload, decrypt.DecryptRequestPayload) self._test_payload_type(payload, decrypt.DecryptRequestPayload)
def test_create_sign_payload(self): def test_create_sign_payload(self):
self._test_not_implemented(self.factory.create, enums.Operation.SIGN) payload = self.factory.create(enums.Operation.SIGN)
self._test_payload_type(payload, sign.SignRequestPayload)
def test_create_signature_verify_payload(self): def test_create_signature_verify_payload(self):
self._test_not_implemented( self._test_not_implemented(

View File

@ -34,6 +34,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
@ -215,7 +216,8 @@ class TestResponsePayloadFactory(testtools.TestCase):
self._test_payload_type(payload, decrypt.DecryptResponsePayload) self._test_payload_type(payload, decrypt.DecryptResponsePayload)
def test_create_sign_payload(self): def test_create_sign_payload(self):
self._test_not_implemented(self.factory.create, enums.Operation.SIGN) payload = self.factory.create(enums.Operation.SIGN)
self._test_payload_type(payload, sign.SignResponsePayload)
def test_create_signature_verify_payload(self): def test_create_signature_verify_payload(self):
self._test_not_implemented( self._test_not_implemented(

View File

@ -0,0 +1,782 @@
# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from kmip.core import attributes
from kmip.core import enums
from kmip.core import utils
from kmip.core.messages.payloads import sign
class TestSignRequestPayload(testtools.TestCase):
"""
Test suite for the Sign request payload.
"""
def setUp(self):
super(TestSignRequestPayload, self).setUp()
# Encoding obtained in part from KMIP 1.4 testing document,
# partially cobbled together by hand from other test cases
# in this code base.
#
# This encoding matches the following set of values:
# Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959
# Cryptographic Parameters
# Cryptographic Algorithm - ECDSA
# Data - 01020304050607080910111213141516
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x00\x60'
b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
b'\x42\x00\x2B\x01\x00\x00\x00\x10'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x06\x00\x00\x00\x00'
b'\x42\x00\xC2\x08\x00\x00\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
self.minimum_encoding = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x00\x18'
b'\x42\x00\xC2\x08\x00\x00\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
self.empty_encoding = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x00\x00'
)
def tearDown(self):
super(TestSignRequestPayload, self).tearDown()
def test_init(self):
"""
Test that a Sign request payload can be constructed with no arguments.
"""
payload = sign.SignRequestPayload()
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
def test_init_with_args(self):
"""
Test that a Sign request payload can be constructed with valid values.
"""
payload = sign.SignRequestPayload(
unique_identifier='00000000-1111-2222-3333-444444444444',
cryptographic_parameters=attributes.CryptographicParameters(),
data=b'\x01\x02\x03'
)
self.assertEqual(
'00000000-1111-2222-3333-444444444444',
payload.unique_identifier
)
self.assertEqual(
attributes.CryptographicParameters(),
payload.cryptographic_parameters
)
self.assertEqual(b'\x01\x02\x03', payload.data)
def test_invalid_unique_identifier(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the unique identifier of a Sign request payload.
"""
payload = sign.SignRequestPayload()
args = (payload, 'unique_identifier', 0)
self.assertRaisesRegexp(
TypeError,
"unique identifier must be a string",
setattr,
*args
)
def test_invalid_cryptographic_parameters(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the cryptographic parameters of a Sign request payload.
"""
payload = sign.SignRequestPayload()
args = (payload, 'cryptographic_parameters', b'\x01\x02\x03')
self.assertRaisesRegexp(
TypeError,
"cryptographic parameters must be a CryptographicParameters "
"struct",
setattr,
*args
)
def test_invalid_data(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the data of a Sign request payload.
"""
payload = sign.SignRequestPayload()
args = (payload, 'data', 0)
self.assertRaisesRegexp(
TypeError,
"data must be bytes",
setattr,
*args
)
def test_read(self):
"""
Test that a Sign request payload can be read from a data stream.
"""
payload = sign.SignRequestPayload()
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
payload.read(self.full_encoding)
self.assertEqual(
'b4faee10-aa2a-4446-8ad4-0881f3422959',
payload.unique_identifier
)
self.assertIsNotNone(payload.cryptographic_parameters)
self.assertEqual(
enums.CryptographicAlgorithm.ECDSA,
payload.cryptographic_parameters.cryptographic_algorithm
)
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16',
payload.data
)
def test_read_partial(self):
"""
Test that a Sign request payload can be read from a partial data
stream containing the minimum required attributes.
"""
payload = sign.SignRequestPayload()
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
payload.read(self.minimum_encoding)
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16',
payload.data
)
def test_read_invalid(self):
"""
Test that a ValueError gets raised when a required Sign request
payload attribute is missing from the payload encoding.
"""
payload = sign.SignRequestPayload()
args = (self.empty_encoding, )
self.assertRaisesRegexp(
ValueError,
"invalid payload missing the data attribute",
payload.read,
*args
)
def test_write(self):
"""
Test that a Sign request payload can be written to a data stream.
"""
payload = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
),
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
stream = utils.BytearrayStream()
payload.write(stream)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_partial(self):
"""
Test that a partially defined Sign request payload can be written
to a data stream.
"""
payload = sign.SignRequestPayload(
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
stream = utils.BytearrayStream()
payload.write(stream)
self.assertEqual(len(self.minimum_encoding), len(stream))
self.assertEqual(str(self.minimum_encoding), str(stream))
def test_write_invalid(self):
"""
Test that a ValueError gets raised when a required Sign request
payload attribute is missing when encoding the payload.
"""
payload = sign.SignRequestPayload()
stream = utils.BytearrayStream()
args = (stream, )
self.assertRaisesRegexp(
ValueError,
"invalid payload missing the data attribute",
payload.write,
*args
)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
Sign request payloads with the same data.
"""
a = sign.SignRequestPayload()
b = sign.SignRequestPayload()
self.assertTrue(a == b)
self.assertTrue(b == a)
a = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
),
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
b = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
),
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_unique_identifier(self):
"""
Test that the equality operator returns False when comparing two
Sign request payloads with different unique identifiers.
"""
a = sign.SignRequestPayload(
unique_identifier='a'
)
b = sign.SignRequestPayload(
unique_identifier='b'
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_cryptographic_parameters(self):
"""
Test that the equality operator returns False when comparing two
Sign request payloads with cryptographic parameters.
"""
a = sign.SignRequestPayload(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.MD5
)
)
b = sign.SignRequestPayload(
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
)
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_data(self):
"""
Test that the equality operator returns False when comparing two
Sign request payloads with different data.
"""
a = sign.SignRequestPayload(data=b'\x01')
b = sign.SignRequestPayload(data=b'\xFF')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
Sign request payloads with different types.
"""
a = sign.SignRequestPayload()
b = 'invalid'
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing two
Sign request payloads with the same data.
"""
a = sign.SignRequestPayload()
b = sign.SignRequestPayload()
self.assertFalse(a != b)
self.assertFalse(b != a)
a = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
),
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
b = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
),
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_unique_identifier(self):
"""
Test that the inequality operator returns True when comparing two
Sign request payloads with different unique identifiers.
"""
a = sign.SignRequestPayload(
unique_identifier='a'
)
b = sign.SignRequestPayload(
unique_identifier='b'
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_cryptographic_parameters(self):
"""
Test that the equality operator returns False when comparing two
Sign request payloads with cryptographic parameters.
"""
a = sign.SignRequestPayload(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.MD5
)
)
b = sign.SignRequestPayload(
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
)
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_data(self):
"""
Test that the inequality operator returns True when comparing two
Sign request payloads with different data.
"""
a = sign.SignRequestPayload(data=b'\x01')
b = sign.SignRequestPayload(data=b'\xFF')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
Sign request payloads with different types.
"""
a = sign.SignRequestPayload()
b = 'invalid'
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_repr(self):
"""
Test that repr can be applied to a Sign request payload.
"""
payload = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
),
data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
)
expected = (
"SignRequestPayload("
"unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', "
"cryptographic_parameters=CryptographicParameters("
"block_cipher_mode=None, padding_method=None, "
"hashing_algorithm=None, key_role_type=None, "
"digital_signature_algorithm=None, "
"cryptographic_algorithm=CryptographicAlgorithm.ECDSA, "
"random_iv=None, iv_length=None, tag_length=None, "
"fixed_field_length=None, invocation_field_length=None, "
"counter_length=None, initial_counter_value=None), "
"data=" + str(b'\x01\x02\x03\x04\x05\x06\x07\x08') + ")"
)
observed = repr(payload)
self.assertEqual(expected, observed)
def test_str(self):
"""
Test that str can be applied to a Sign request payload.
"""
crypto_params = attributes.CryptographicParameters(
cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA
)
payload = sign.SignRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=crypto_params,
data=b'\x01\x02\x03\x04\x05\x06\x07\x08',
)
expected = str({
'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959',
'cryptographic_parameters': crypto_params,
'data': b'\x01\x02\x03\x04\x05\x06\x07\x08'
})
observed = str(payload)
self.assertEqual(expected, observed)
class TestSignResponsePayload(testtools.TestCase):
"""
Test suite for the Sign response payload.
"""
def setUp(self):
super(TestSignResponsePayload, self).setUp()
# Encoding obtained in part from KMIP 1.4 testing document,
# partially cobbled together by hand from other test cases
# in this code base.
#
# This encoding matches the following set of values:
# Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959
# Signature Data - 01020304050607080910111213141516
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\x7C\x01\x00\x00\x00\x48'
b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
b'\x42\x00\xC3\x08\x00\x00\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
self.incomplete_encoding = utils.BytearrayStream(
b'\x42\x00\x7C\x01\x00\x00\x00\x30'
b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
)
self.empty_encoding = utils.BytearrayStream(
b'\x42\x00\x7C\x01\x00\x00\x00\x00'
)
def tearDown(self):
super(TestSignResponsePayload, self).tearDown()
def test_init(self):
"""
Test that a Sign response payload can be constructed with no
arguments.
"""
payload = sign.SignResponsePayload()
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.signature_data)
def test_init_with_args(self):
"""
Test that a Sign response payload can be constructed with valid
values.
"""
payload = sign.SignResponsePayload(
unique_identifier='00000000-1111-2222-3333-444444444444',
signature_data=b'\x01\x02\x03'
)
self.assertEqual(
'00000000-1111-2222-3333-444444444444',
payload.unique_identifier
)
self.assertEqual(b'\x01\x02\x03', payload.signature_data)
def test_invalid_unique_identifier(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the unique identifier of a Sign response payload.
"""
payload = sign.SignResponsePayload()
args = (payload, 'unique_identifier', 0)
self.assertRaisesRegexp(
TypeError,
"unique identifier must be a string",
setattr,
*args
)
def test_invalid_signature_data(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the signature data of a Sign response payload.
"""
payload = sign.SignResponsePayload()
args = (payload, 'signature_data', 0)
self.assertRaisesRegexp(
TypeError,
"signature data must be bytes",
setattr,
*args
)
def test_read(self):
"""
Test that a Sign response payload can be read from a data stream.
"""
payload = sign.SignResponsePayload()
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.signature_data)
payload.read(self.full_encoding)
self.assertEqual(
'b4faee10-aa2a-4446-8ad4-0881f3422959',
payload.unique_identifier
)
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16',
payload.signature_data
)
def test_read_invalid(self):
"""
Test that a ValueError gets raised when required Sign response
payload attributes are missing from the payload encoding.
"""
payload = sign.SignResponsePayload()
args = (self.empty_encoding, )
self.assertRaisesRegexp(
ValueError,
"invalid payload missing the unique identifier attribute",
payload.read,
*args
)
payload = sign.SignResponsePayload()
args = (self.incomplete_encoding, )
self.assertRaisesRegexp(
ValueError,
"invalid payload missing the signature data attribute",
payload.read,
*args
)
def test_write(self):
"""
Test that a Sign response payload can be written to a data stream.
"""
payload = sign.SignResponsePayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
signature_data=b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
stream = utils.BytearrayStream()
payload.write(stream)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_invalid(self):
"""
Test that a ValueError gets raised when a required Sign response
payload attribute is missing when encoding the payload.
"""
payload = sign.SignResponsePayload()
stream = utils.BytearrayStream()
args = (stream, )
self.assertRaisesRegexp(
ValueError,
"invalid payload missing the unique identifier attribute",
payload.write,
*args
)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
equal sign response payloads
"""
encoding = utils.BytearrayStream(self.full_encoding.buffer)
a = sign.SignResponsePayload()
b = sign.SignResponsePayload()
self.assertTrue(a == b)
self.assertTrue(b == a)
a.read(encoding)
b.read(self.full_encoding)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_unique_identifier(self):
"""
Test that the equality operator returns False when comparing two
sign reponse payloads with different unique_identifier.
"""
a = sign.SignResponsePayload(unique_identifier='a',
signature_data=b'\x01')
b = sign.SignResponsePayload(unique_identifier='b',
signature_data=b'\x01')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_signature_data(self):
"""
Test that the equality operator returns False when comparing two
sign response payloads with different signature_data.
"""
a = sign.SignResponsePayload(unique_identifier='a',
signature_data=b'\x01')
b = sign.SignResponsePayload(unique_identifier='a',
signature_data=b'\x02')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing a sign
response payload to another type.
"""
a = sign.SignResponsePayload()
b = 'invalid'
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns false when comparing two
equal sign response payloads.
"""
encoding = utils.BytearrayStream(self.full_encoding.buffer)
a = sign.SignResponsePayload()
b = sign.SignResponsePayload()
self.assertFalse(a != b)
self.assertFalse(b != a)
a.read(encoding)
b.read(self.full_encoding)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_unique_identifier(self):
"""
Test that the inequality operator returns True when comparing two
sign response payloads with different unique_identifier.
"""
a = sign.SignResponsePayload(unique_identifier='a',
signature_data=b'\x01')
b = sign.SignResponsePayload(unique_identifier='b',
signature_data=b'\x01')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_signature_data(self):
"""
Test that the inequality operator returns True when comparing two
sign response payloads with different signature_data.
"""
a = sign.SignResponsePayload(unique_identifier='a',
signature_data=b'\x01')
b = sign.SignResponsePayload(unique_identifier='a',
signature_data=b'\x02')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing a
sign response payload to a different type.
"""
a = sign.SignResponsePayload()
b = 'invalid'
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_repr(self):
"""
Test that repr can applied to a sign response payload.
"""
payload = sign.SignResponsePayload(
unique_identifier='00000000-1111-2222-3333-444444444444',
signature_data=b'\x01\x02\x03'
)
expected = (
"SignResponsePayload("
"unique_identifier='00000000-1111-2222-3333-444444444444', "
"signature_data="+str(b'\x01\x02\x03') + ")"
)
observed = repr(payload)
self.assertEqual(expected, observed)
def test_str(self):
"""
Test that str can be applied to a sign response payload.
"""
payload = sign.SignResponsePayload(
unique_identifier='00000000-1111-2222-3333-444444444444',
signature_data=b'\x01\x02\x03'
)
expected = str({
'unique_identifier': '00000000-1111-2222-3333-444444444444',
'signature_data': b'\x01\x02\x03'
})
observed = str(payload)
self.assertEqual(expected, observed)