From 089d126b04c6049d491841cb9b5547550e45155b Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 5 Jan 2017 18:43:11 -0500 Subject: [PATCH] Adding server support for the GetAttributeList operation This change adds support for the GetAttributeList operation. The user can specify the ID of a managed object and get back a list containing the names of all attributes currently set on the object. The user can also omit the ID and the server will default to using the ID placeholder for the object ID. New server tests have been added to cover this feature. The GetAttributeList payloads have also been updated for consistency with other payloads, requiring minor updates in other clients and unit tests. --- .../messages/payloads/get_attribute_list.py | 245 +++--- kmip/services/kmip_client.py | 2 +- kmip/services/server/engine.py | 49 ++ .../payloads/test_get_attribute_list.py | 812 ++++++++++++++---- .../tests/unit/services/server/test_engine.py | 196 ++++- kmip/tests/unit/services/test_kmip_client.py | 4 +- 6 files changed, 1027 insertions(+), 281 deletions(-) diff --git a/kmip/core/messages/payloads/get_attribute_list.py b/kmip/core/messages/payloads/get_attribute_list.py index 738e417..4d70acb 100644 --- a/kmip/core/messages/payloads/get_attribute_list.py +++ b/kmip/core/messages/payloads/get_attribute_list.py @@ -16,7 +16,6 @@ import six from kmip.core import enums -from kmip.core import exceptions from kmip.core import primitives from kmip.core import utils @@ -30,23 +29,44 @@ class GetAttributeListRequestPayload(primitives.Struct): See Section 4.13 of the KMIP 1.1 specification for more information. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. + unique_identifier: The unique ID of the managed object with which the + retrieved attributes should be associated. """ - def __init__(self, uid=None): + + def __init__(self, unique_identifier=None): """ Construct a GetAttributeList request payload. Args: - uid (string): The ID of the managed object with which the retrieved - attributes should be associated. Optional, defaults to None. + unique_identifier (string): The ID of the managed object with + which the retrieved attribute names should be associated. + Optional, defaults to None. """ super(GetAttributeListRequestPayload, self).__init__( enums.Tags.REQUEST_PAYLOAD) - self.uid = uid + self._unique_identifier = None - self.validate() + self.unique_identifier = unique_identifier + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return self._unique_identifier + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") def read(self, istream): """ @@ -61,14 +81,14 @@ class GetAttributeListRequestPayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.read(tstream) - self.uid = uid.value + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(tstream) else: - self.uid = None + self._unique_identifier = None self.is_oversized(tstream) - self.validate() def write(self, ostream): """ @@ -81,36 +101,23 @@ class GetAttributeListRequestPayload(primitives.Struct): """ tstream = utils.BytearrayStream() - if self.uid: - uid = primitives.TextString( - value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) self.length = tstream.length() super(GetAttributeListRequestPayload, self).write(ostream) ostream.write(tstream.buffer) - def validate(self): - """ - Error check the attributes of the GetAttributeList request payload. - """ - if self.uid is not None: - if not isinstance(self.uid, six.string_types): - raise TypeError( - "uid must be a string; " - "expected (one of): {0}, observed: {1}".format( - six.string_types, type(self.uid))) - def __repr__(self): - uid = "uid={0}".format(self.uid) + uid = "unique_identifier={0}".format(self.unique_identifier) return "GetAttributeListRequestPayload({0})".format(uid) def __str__(self): - return str({'uid': self.uid}) + return str({'unique_identifier': self.unique_identifier}) def __eq__(self, other): if isinstance(other, GetAttributeListRequestPayload): - if self.uid == other.uid: + if self.unique_identifier == other.unique_identifier: return True else: return False @@ -128,45 +135,100 @@ class GetAttributeListResponsePayload(primitives.Struct): """ A response payload for the GetAttributeList operation. - The payload will contain the ID of the managed object with which the - attributes are associated. It will also contain a list of attribute names + The payload contains the ID of the managed object with which the + attributes are associated, along with a list of attribute names identifying the types of attributes associated with the aforementioned - managed object. See Section 4.13 of the KMIP 1.1 specification for more - information. + managed object. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. - attribute_names: The list of attribute names of the attributes - associated with managed object identified by the uid above. + unique_identifier: The unique ID of the managed object with which the + retrieved attributes should be associated. + attribute_names: A list of strings identifying the names of the + attributes associated with the managed object. """ - def __init__(self, uid=None, attribute_names=None): + + def __init__(self, unique_identifier=None, attribute_names=None): """ Construct a GetAttributeList response payload. Args: - uid (string): The ID of the managed object with which the retrieved - attributes should be associated. Optional, defaults to None. - attribute_names (list): A list of strings identifying the names of - the attributes associated with the managed object. Optional, + unique_identifier (string): The ID of the managed object with + which the retrieved attribute names should be associated. + Optional, defaults to None. + attribute_names: A list of strings identifying the names of the + attributes associated with the managed object. Optional, defaults to None. """ + super(GetAttributeListResponsePayload, self).__init__( - enums.Tags.RESPONSE_PAYLOAD) + enums.Tags.RESPONSE_PAYLOAD + ) - self.uid = uid + self._unique_identifier = None + self._attribute_names = list() - if attribute_names: - self.attribute_names = attribute_names + self.unique_identifier = unique_identifier + self.attribute_names = attribute_names + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value else: - self.attribute_names = list() + return self._unique_identifier - self.validate() + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") + + @property + def attribute_names(self): + if self._attribute_names: + names = list() + for attribute_name in self._attribute_names: + names.append(attribute_name.value) + return names + else: + return self._attribute_names + + @attribute_names.setter + def attribute_names(self, value): + if value is None: + self._attribute_names = list() + elif isinstance(value, list): + names = list() + for i in range(len(value)): + name = value[i] + if not isinstance(name, six.string_types): + raise TypeError( + "attribute_names must be a list of strings; " + "item {0} has type {1}".format(i + 1, type(name)) + ) + if name not in names: + names.append(name) + self._attribute_names = list() + for name in names: + self._attribute_names.append( + primitives.TextString( + value=name, + tag=enums.Tags.ATTRIBUTE_NAME + ) + ) + else: + raise TypeError("attribute_names must be a list of strings") def read(self, istream): """ - Read the data encoding the GetAttributeList response payload and decode - it into its constituent parts. + Read the data encoding the GetAttributeList response payload and + decode it into its constituent parts. Args: istream (stream): A data stream containing encoded object data, @@ -176,22 +238,21 @@ class GetAttributeListResponsePayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.read(tstream) - self.uid = uid.value + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(tstream) else: - raise exceptions.InvalidKmipEncoding( - "expected uid encoding not found") + self._unique_identifier = None names = list() - while(self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream)): + while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream): name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME) name.read(tstream) - names.append(name.value) - self.attribute_names = names + names.append(name) + self._attribute_names = names self.is_oversized(tstream) - self.validate() def write(self, ostream): """ @@ -204,59 +265,39 @@ class GetAttributeListResponsePayload(primitives.Struct): """ tstream = utils.BytearrayStream() - uid = primitives.TextString( - value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) - for name in self.attribute_names: - name = primitives.TextString( - value=name, tag=enums.Tags.ATTRIBUTE_NAME) - name.write(tstream) + for attribute_name in self._attribute_names: + attribute_name.write(tstream) self.length = tstream.length() super(GetAttributeListResponsePayload, self).write(ostream) ostream.write(tstream.buffer) - def validate(self): - """ - Error check the attributes of the GetAttributeList response payload. - """ - if self.uid is not None: - if not isinstance(self.uid, six.string_types): - raise TypeError( - "uid must be a string; " - "expected (one of): {0}, observed: {1}".format( - six.string_types, type(self.uid))) - - if self.attribute_names: - if not isinstance(self.attribute_names, list): - raise TypeError("attribute names must be a list") - for i in range(len(self.attribute_names)): - name = self.attribute_names[i] - if not isinstance(name, six.string_types): - raise TypeError( - "attribute name ({0} of {1}) must be a string".format( - i + 1, len(self.attribute_names))) - def __repr__(self): - uid = "uid={0}".format(self.uid) - names = "attribute_names={0}".format(self.attribute_names) - return "GetAttributeListResponsePayload({0}, {1})".format(uid, names) + unique_identifier = "unique_identifier={0}".format( + self.unique_identifier + ) + attribute_names = "attribute_names={0}".format(self.attribute_names) + return "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) def __str__(self): - return str({'uid': self.uid, 'attribute_names': self.attribute_names}) + return str({ + 'unique_identifier': self.unique_identifier, + 'attribute_names': self.attribute_names + }) def __eq__(self, other): if isinstance(other, GetAttributeListResponsePayload): - if self.uid != other.uid: - return False - elif ((isinstance(self.attribute_names, list) and - isinstance(other.attribute_names, list)) and - len(self.attribute_names) == len(other.attribute_names)): - for name in self.attribute_names: - if name not in other.attribute_names: - return False - return True + if self.unique_identifier == other.unique_identifier: + if set(self.attribute_names) == set(other.attribute_names): + return True + else: + return False else: return False else: diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index d078bfa..911d092 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -591,7 +591,7 @@ class KMIPProxy(KMIP): names = None if payload: - uid = payload.uid + uid = payload.unique_identifier names = payload.attribute_names return GetAttributeListResult( diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 791dae8..3b6b8f7 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -43,6 +43,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attributes +from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -911,6 +912,8 @@ class KmipEngine(object): return self._process_get(payload) elif operation == enums.Operation.GET_ATTRIBUTES: return self._process_get_attributes(payload) + elif operation == enums.Operation.GET_ATTRIBUTE_LIST: + return self._process_get_attribute_list(payload) elif operation == enums.Operation.ACTIVATE: return self._process_activate(payload) elif operation == enums.Operation.DESTROY: @@ -1380,6 +1383,51 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_get_attribute_list(self, payload): + self._logger.info("Processing operation: GetAttributeList") + + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + else: + unique_identifier = self._id_placeholder + + object_type = self._get_object_type(unique_identifier) + + managed_object = self._data_session.query(object_type).filter( + object_type.unique_identifier == unique_identifier + ).one() + + # Determine if the request should be carried out under the object's + # operation policy. If not, feign ignorance of the object. + is_allowed = self._is_allowed_by_operation_policy( + managed_object.operation_policy_name, + self._client_identity, + managed_object._owner, + managed_object._object_type, + enums.Operation.GET_ATTRIBUTES + ) + if not is_allowed: + raise exceptions.ItemNotFound( + "Could not locate object: {0}".format(unique_identifier) + ) + + object_attributes = self._get_attributes_from_managed_object( + managed_object, + list() + ) + attribute_names = list() + + for object_attribute in object_attributes: + attribute_names.append(object_attribute.attribute_name.value) + + response_payload = get_attribute_list.GetAttributeListResponsePayload( + unique_identifier=unique_identifier, + attribute_names=attribute_names + ) + + return response_payload + @_kmip_version_supported('1.0') def _process_activate(self, payload): self._logger.info("Processing operation: Activate") @@ -1502,6 +1550,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.GET_ATTRIBUTES), + contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST), contents.Operation(enums.Operation.ACTIVATE), contents.Operation(enums.Operation.DESTROY), contents.Operation(enums.Operation.QUERY) diff --git a/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py b/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py index 703f343..4d44d5e 100644 --- a/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py +++ b/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py @@ -13,10 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. -import copy import testtools -from kmip.core import exceptions +from kmip.core import enums +from kmip.core import primitives from kmip.core import utils from kmip.core.messages.payloads import get_attribute_list @@ -32,16 +32,17 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): # Encodings taken from Sections 3.1.4 of the KMIP 1.1 testing # documentation. - self.encoding_with_uid = utils.BytearrayStream(( + self.full_encoding = utils.BytearrayStream( b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24' b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34' b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32' - b'\x32\x39\x35\x39\x00\x00\x00\x00')) + b'\x32\x39\x35\x39\x00\x00\x00\x00' + ) + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) - self.encoding_without_uid = utils.BytearrayStream(( - b'\x42\x00\x79\x01\x00\x00\x00\x00')) - - self.uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959' + self.unique_identifier = 'b4faee10-aa2a-4446-8ad4-0881f3422959' def tearDown(self): super(TestGetAttributeListRequestPayload, self).tearDown() @@ -58,17 +59,44 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): Test that a GetAttributeList request payload can be constructed with a valid value. """ - get_attribute_list.GetAttributeListRequestPayload(self.uid) + get_attribute_list.GetAttributeListRequestPayload( + 'test-unique-identifier', + ) - def test_validate_with_invalid_uid(self): + def test_unique_identifier(self): """ - Test that a TypeError exception is raised when an invalid ID is used - to construct a GetAttributeList request payload. + Test that the unique_identifier attribute of a GetAttributeList + request payload can be properly set and retrieved. """ - kwargs = {'uid': 0} + payload = get_attribute_list.GetAttributeListRequestPayload() + + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) + + payload.unique_identifier = 'test-unique-identifier' + + self.assertEqual('test-unique-identifier', payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value='test-unique-identifier', + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + + def test_unique_identifier_with_invalid_value(self): + """ + Test that a TypeError is raised when an invalid ID is used to set + the unique_identifier attribute of a GetAttributeList request payload. + """ + payload = get_attribute_list.GetAttributeListRequestPayload() + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( - TypeError, "uid must be a string", - get_attribute_list.GetAttributeListRequestPayload, **kwargs) + TypeError, + "unique identifier must be a string", + setattr, + *args + ) def test_read(self): """ @@ -76,49 +104,90 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): stream. """ payload = get_attribute_list.GetAttributeListRequestPayload() - payload.read(self.encoding_with_uid) - self.assertEqual(self.uid, payload.uid) - def test_read_with_no_uid(self): + self.assertEqual(None, payload._unique_identifier) + + payload.read(self.full_encoding) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value=self.unique_identifier, + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + + def test_read_with_no_content(self): """ - Test that a GetAttributeList request payload with no ID can be read - from a data stream. + Test that a GetAttributeList response payload with no ID or attribute + names can be read from a data stream. """ payload = get_attribute_list.GetAttributeListRequestPayload() - payload.read(self.encoding_without_uid) - self.assertEqual(None, payload.uid) + + self.assertEqual(None, payload._unique_identifier) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) def test_write(self): """ Test that a GetAttributeList request payload can be written to a data stream. """ - payload = get_attribute_list.GetAttributeListRequestPayload(self.uid) + payload = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_with_uid), len(stream)) - self.assertEqual(self.encoding_with_uid, stream) + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_uid(self): + def test_write_with_no_content(self): """ - Test that a GetAttributeList request payload with no ID can be written - to a data stream. + Test that a GetAttributeList request payload with no ID or attribute + names can be written to a data stream. """ payload = get_attribute_list.GetAttributeListRequestPayload() stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_without_uid), len(stream)) - self.assertEqual(self.encoding_without_uid, stream) + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) def test_repr(self): """ Test that repr can be applied to a GetAttributeList request payload. """ - payload = get_attribute_list.GetAttributeListRequestPayload(self.uid) - args = "uid={0}".format(payload.uid) - expected = "GetAttributeListRequestPayload({0})".format(args) + payload = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + expected = "GetAttributeListRequestPayload({0})".format( + unique_identifier + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_content(self): + """ + Test that repr can be applied to a GetAttributeList request payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListRequestPayload( + None + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + expected = "GetAttributeListRequestPayload({0})".format( + unique_identifier + ) observed = repr(payload) self.assertEqual(expected, observed) @@ -126,8 +195,26 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): """ Test that str can be applied to a GetAttributeList request payload. """ - payload = get_attribute_list.GetAttributeListRequestPayload(self.uid) - expected = str({'uid': payload.uid}) + payload = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + expected = str({ + 'unique_identifier': self.unique_identifier + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_content(self): + """ + Test that str can be applied to a GetAttributeList request payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListRequestPayload( + None + ) + expected = str({ + 'unique_identifier': None + }) observed = str(payload) self.assertEqual(expected, observed) @@ -136,19 +223,27 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): Test that the equality operator returns True when comparing two GetAttributeList request payloads with the same data. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_on_not_equal_unique_identifier(self): """ Test that the equality operator returns False when comparing two - GetAttributeList request payloads with different data. + GetAttributeList request payloads with different IDs. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload('invalid') + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + 'invalid' + ) self.assertFalse(a == b) self.assertFalse(b == a) @@ -159,7 +254,9 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): GetAttributeList request payload to a non-GetAttributeList request payload. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) b = "invalid" self.assertFalse(a == b) @@ -170,19 +267,27 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): Test that the inequality operator returns False when comparing two GetAttributeList request payloads with the same internal data. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two - GetAttributeList request payloads with different data. + GetAttributeList request payloads with different IDs. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload('invalid') + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + 'invalid' + ) self.assertTrue(a != b) self.assertTrue(b != a) @@ -193,7 +298,9 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): GetAttributeList request payload to a non-GetAttributeList request payload. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) b = "invalid" self.assertTrue(a != b) @@ -202,8 +309,7 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): class TestGetAttributeListResponsePayload(testtools.TestCase): """ - Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test - Cases documentation. + Test suite for the GetAttributeList response payload. """ def setUp(self): @@ -211,7 +317,7 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): # Encodings taken from Sections 3.1.4 of the KMIP 1.1 testing # documentation. - self.encoding_with_uid_with_names = utils.BytearrayStream(( + self.full_encoding = utils.BytearrayStream( b'\x42\x00\x7C\x01\x00\x00\x01\x60\x42\x00\x94\x07\x00\x00\x00\x24' b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34' b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32' @@ -234,9 +340,10 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20' b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00' b'\x42\x00\x0A\x07\x00\x00\x00\x10\x4C\x61\x73\x74\x20\x43\x68\x61' - b'\x6E\x67\x65\x20\x44\x61\x74\x65')) - self.encoding_without_uid_with_names = utils.BytearrayStream(( - b'\x42\x00\x7C\x01\x00\x00\x01\x60\x42\x00\x0A\x07\x00\x00\x00\x14' + b'\x6E\x67\x65\x20\x44\x61\x74\x65' + ) + self.encoding_sans_unique_identifier = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x01\x30\x42\x00\x0A\x07\x00\x00\x00\x14' b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65' b'\x6E\x67\x74\x68\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x17' b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C' @@ -255,15 +362,20 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20' b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00' b'\x42\x00\x0A\x07\x00\x00\x00\x10\x4C\x61\x73\x74\x20\x43\x68\x61' - b'\x6E\x67\x65\x20\x44\x61\x74\x65')) - self.encoding_with_uid_without_names = utils.BytearrayStream(( + b'\x6E\x67\x65\x20\x44\x61\x74\x65' + ) + self.encoding_sans_attribute_names = utils.BytearrayStream( b'\x42\x00\x7C\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24' b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34' b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32' - b'\x32\x39\x35\x39\x00\x00\x00\x00')) + b'\x32\x39\x35\x39\x00\x00\x00\x00' + ) + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) - self.uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959' - self.attribute_names = list(( + self.unique_identifier = 'b4faee10-aa2a-4446-8ad4-0881f3422959' + self.attribute_names = [ 'Cryptographic Length', 'Cryptographic Algorithm', 'State', @@ -275,59 +387,167 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): 'Cryptographic Usage Mask', 'Object Type', 'Contact Information', - 'Last Change Date')) + 'Last Change Date' + ] def tearDown(self): super(TestGetAttributeListResponsePayload, self).tearDown() def test_init(self): """ - Test that a GetAttributeList response payload can be constructed. + Test that a GetAttributeList response payload can be constructed with + no arguments. """ get_attribute_list.GetAttributeListResponsePayload() def test_init_with_args(self): """ - Test that a GetAttributeList response payload can be constructed with - valid values. + Test that a GetAttributeList response payload can be constructed with a + valid value. """ get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + 'test-unique-identifier', + ['test-attribute-name-1', 'test-attribute-name-2'] + ) - def test_validate_with_invalid_uid(self): + def test_unique_identifier(self): """ - Test that a TypeError exception is raised when an invalid ID is used - to construct a GetAttributeList response payload. + Test that the unique_identifier attribute of a GetAttributeList + response payload can be properly set and retrieved. """ - kwargs = {'uid': 0, 'attribute_names': self.attribute_names} + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) + + payload.unique_identifier = 'test-unique-identifier' + + self.assertEqual('test-unique-identifier', payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value='test-unique-identifier', + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + + def test_unique_identifier_with_invalid_value(self): + """ + Test that a TypeError is raised when an invalid ID is used to set + the unique_identifier attribute of a GetAttributeList response + payload. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( - TypeError, "uid must be a string", - get_attribute_list.GetAttributeListResponsePayload, **kwargs) + TypeError, + "unique identifier must be a string", + setattr, + *args + ) - def test_validate_with_invalid_attribute_names(self): + def test_attribute_names(self): """ - Test that a TypeError exception is raised when an invalid attribute - name list is used to construct a GetAttributeList response payload. + Test that the attribute_names attribute of a GetAttributeList response + payload can be properly set and retrieved. """ - kwargs = {'uid': self.uid, 'attribute_names': 'invalid'} + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) + + payload.attribute_names = [ + 'test-attribute-name-1', + 'test-attribute-name-2' + ] + + self.assertEqual(2, len(payload.attribute_names)) + self.assertEqual(2, len(payload._attribute_names)) + self.assertIn('test-attribute-name-1', payload.attribute_names) + self.assertIn('test-attribute-name-2', payload.attribute_names) + self.assertIn( + primitives.TextString( + value='test-attribute-name-1', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + self.assertIn( + primitives.TextString( + value='test-attribute-name-2', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + + def test_attribute_names_with_invalid_value(self): + """ + Test that a TypeError is raised when an invalid list is used to set + the attribute_names attribute of a GetAttributeList response payload. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + args = (payload, 'attribute_names', 0) self.assertRaisesRegexp( - TypeError, "attribute names must be a list", - get_attribute_list.GetAttributeListResponsePayload, **kwargs) + TypeError, + "attribute_names must be a list of strings", + setattr, + *args + ) - def test_validate_with_invalid_attribute_name(self): + def test_attribute_names_with_invalid_attribute_name(self): """ - Test that a TypeError exception is raised when an invalid attribute - name is used to construct a GetAttributeList response payload object. + Test that a TypeError is raised when an invalid attribute name is + included in the list used to set the attribute_names attribute of a + GetAttributeList response payload. """ - kwargs = {'uid': self.uid, 'attribute_names': [0]} - self.assertRaises( - TypeError, get_attribute_list.GetAttributeListResponsePayload, - **kwargs) + payload = get_attribute_list.GetAttributeListResponsePayload() + args = ( + payload, + 'attribute_names', + ['test-attribute-name-1', 0] + ) + self.assertRaisesRegexp( + TypeError, + "attribute_names must be a list of strings; " + "item 2 has type {0}".format(type(0)), + setattr, + *args + ) - kwargs = {'uid': self.uid, 'attribute_names': ['', 0, '']} - self.assertRaises( - TypeError, get_attribute_list.GetAttributeListResponsePayload, - **kwargs) + def test_attribute_names_with_duplicates(self): + """ + Test that duplicate attribute names are silently removed when setting + the attribute_names attribute of a GetAttributeList response payload. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) + + payload.attribute_names = [ + 'test-attribute-name-1', + 'test-attribute-name-1', + 'test-attribute-name-2' + ] + + self.assertEqual(2, len(payload.attribute_names)) + self.assertEqual(2, len(payload._attribute_names)) + self.assertIn('test-attribute-name-1', payload.attribute_names) + self.assertIn('test-attribute-name-2', payload.attribute_names) + self.assertIn( + primitives.TextString( + value='test-attribute-name-1', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + self.assertIn( + primitives.TextString( + value='test-attribute-name-2', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) def test_read(self): """ @@ -335,31 +555,99 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): stream. """ payload = get_attribute_list.GetAttributeListResponsePayload() - payload.read(self.encoding_with_uid_with_names) - self.assertEqual(self.uid, payload.uid) - self.assertEqual(self.attribute_names, payload.attribute_names) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) - def test_read_with_no_uid(self): + payload.read(self.full_encoding) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value=self.unique_identifier, + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + self.assertEqual( + set(self.attribute_names), + set(payload.attribute_names) + ) + for attribute_name in self.attribute_names: + self.assertIn( + primitives.TextString( + value=attribute_name, + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + + def test_read_with_no_unique_identifier(self): """ - Test that an InvalidKmipEncoding error gets raised when attempting to - read a GetAttributeList response encoding with no ID data. + Test that a GetAttributeList response payload with no ID can be read + from a data stream. """ payload = get_attribute_list.GetAttributeListResponsePayload() - self.assertRaisesRegexp( - exceptions.InvalidKmipEncoding, "expected uid encoding not found", - payload.read, self.encoding_without_uid_with_names) + + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) + + payload.read(self.encoding_sans_unique_identifier) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual( + set(self.attribute_names), + set(payload.attribute_names) + ) + for attribute_name in self.attribute_names: + self.assertIn( + primitives.TextString( + value=attribute_name, + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) def test_read_with_no_attribute_names(self): """ - Test that a GetAttributeList response payload without attribute name - data can be read from a data stream. + Test that a GetAttributeList response payload with no attribute names + can be read from a data stream. """ payload = get_attribute_list.GetAttributeListResponsePayload() - payload.read(self.encoding_with_uid_without_names) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) + + payload.read(self.encoding_sans_attribute_names) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value=self.unique_identifier, + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) + + def test_read_with_no_content(self): + """ + Test that a GetAttributeList response payload with no ID or attribute + names can be read from a data stream. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) def test_write(self): """ @@ -367,35 +655,147 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): stream. """ payload = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_with_uid_with_names), len(stream)) - self.assertEqual(self.encoding_with_uid_with_names, stream) + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_attribute_names(self): + def test_write_with_no_unique_identifier(self): """ - Test that a GetAttributeList response payload with no attribute name - data can be written to a data stream. + Test that a GetAttributeList response payload with no ID can be + written to a data stream. """ - payload = get_attribute_list.GetAttributeListResponsePayload(self.uid) + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + self.attribute_names + ) stream = utils.BytearrayStream() payload.write(stream) self.assertEqual( - len(self.encoding_with_uid_without_names), len(stream)) - self.assertEqual(self.encoding_with_uid_without_names, stream) + len(self.encoding_sans_unique_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_sans_unique_identifier), + str(stream) + ) + + def test_write_with_no_attribute_names(self): + """ + Test that a GetAttributeList response payload with no attribute names + can be written to a data stream. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + None + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.encoding_sans_attribute_names), len(stream)) + self.assertEqual(str(self.encoding_sans_attribute_names), str(stream)) + + def test_write_with_no_content(self): + """ + Test that a GetAttributeList response payload with no ID or attribute + names can be written to a data stream. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) def test_repr(self): """ Test that repr can be applied to a GetAttributeList response payload. """ payload = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - args = "uid={0}, attribute_names={1}".format( - payload.uid, payload.attribute_names) - expected = "GetAttributeListResponsePayload({0})".format(args) + self.unique_identifier, + self.attribute_names + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_unique_identifier(self): + """ + Test that repr can be applied to a GetAttributeList response payload + with no ID. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + self.attribute_names + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_attribute_names(self): + """ + Test that repr can be applied to a GetAttributeList response payload + with no attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + None + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_content(self): + """ + Test that repr can be applied to a GetAttributeList response payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + None + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) observed = repr(payload) self.assertEqual(expected, observed) @@ -403,9 +803,62 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): """ Test that str can be applied to a GetAttributeList response payload. """ - payload = get_attribute_list.GetAttributeListResponsePayload(self.uid) - expected = str({'uid': payload.uid, - 'attribute_names': payload.attribute_names}) + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + self.attribute_names + ) + expected = str({ + 'unique_identifier': self.unique_identifier, + 'attribute_names': self.attribute_names + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_id(self): + """ + Test that str can be applied to a GetAttributeList response payload + with no ID. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + self.attribute_names + ) + expected = str({ + 'unique_identifier': None, + 'attribute_names': self.attribute_names + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_attribute_names(self): + """ + Test that str can be applied to a GetAttributeList response payload + with no attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + None + ) + expected = str({ + 'unique_identifier': self.unique_identifier, + 'attribute_names': list() + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_content(self): + """ + Test that str can be applied to a GetAttributeList response payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + None + ) + expected = str({ + 'unique_identifier': None, + 'attribute_names': list() + }) observed = str(payload) self.assertEqual(expected, observed) @@ -415,22 +868,49 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): GetAttributeList response payloads with the same data. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_with_mixed_attribute_names(self): """ - Test that the equality operator returns False when comparing two - GetAttributeList response payloads with different data. + Test that the equality operator returns True when comparing two + GetAttributeList response payload with the same attribute_name sets + but with different attribute name orderings. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) + self.attribute_names.reverse() b = get_attribute_list.GetAttributeListResponsePayload( - 'invalid', self.attribute_names) + self.unique_identifier, + self.attribute_names + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + GetAttributeList response payloads with different IDs. + """ + a = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + self.attribute_names + ) + b = get_attribute_list.GetAttributeListResponsePayload( + 'invalid', + self.attribute_names + ) self.assertFalse(a == b) self.assertFalse(b == a) @@ -438,27 +918,16 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): def test_equal_on_not_equal_attribute_names(self): """ Test that the equality operator returns False when comparing two - GetAttributeList response payloads with different data. + GetAttributeList response payloads with different attribute names. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, list()) - - self.assertFalse(a == b) - self.assertFalse(b == a) - - def test_equal_on_not_equal_attribute_name(self): - """ - Test that the equality operator returns False when comparing two - GetAttributeList response payloads with different data. - """ - alt_names = copy.deepcopy(self.attribute_names) - alt_names[0] = 'invalid' - a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, alt_names) + self.unique_identifier, + None + ) self.assertFalse(a == b) self.assertFalse(b == a) @@ -470,8 +939,10 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): payload. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - b = 'invalid' + self.unique_identifier, + self.attribute_names + ) + b = "invalid" self.assertFalse(a == b) self.assertFalse(b == a) @@ -482,62 +953,61 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): two GetAttributeList response payloads with the same internal data. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two - GetAttributeList request payloads with different data. + GetAttributeList response payloads with different IDs. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - 'invalid', self.attribute_names) + 'invalid', + self.attribute_names + ) self.assertTrue(a != b) self.assertTrue(b != a) def test_not_equal_on_not_equal_attribute_names(self): """ - Test that the inequality operator returns False when comparing two - GetAttributeList response payloads with different data. + Test that the inequality operator returns True when comparing two + GetAttributeList response payloads with different attribute names. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, list()) - - self.assertTrue(a != b) - self.assertTrue(b != a) - - def test_not_equal_on_not_equal_attribute_name(self): - """ - Test that the inequality operator returns False when comparing two - GetAttributeList response payloads with different data. - """ - alt_names = copy.deepcopy(self.attribute_names) - alt_names[0] = 'Operation Policy Name' - a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, alt_names) + self.unique_identifier, + None + ) self.assertTrue(a != b) self.assertTrue(b != a) def test_not_equal_on_type_mismatch(self): """ - Test that the inequality operator returns True when comparing a + Test that the equality operator returns True when comparing a GetAttributeList response payload to a non-GetAttributeList response payload. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = "invalid" self.assertTrue(a != b) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 588db34..e29250a 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -44,6 +44,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -883,6 +884,7 @@ class TestKmipEngine(testtools.TestCase): e._process_register = mock.MagicMock() e._process_get = mock.MagicMock() e._process_get_attributes = mock.MagicMock() + e._process_get_attribute_list = mock.MagicMock() e._process_activate = mock.MagicMock() e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() @@ -893,6 +895,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.GET, None) e._process_operation(enums.Operation.GET_ATTRIBUTES, None) + e._process_operation(enums.Operation.GET_ATTRIBUTE_LIST, None) e._process_operation(enums.Operation.ACTIVATE, None) e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) @@ -903,6 +906,7 @@ class TestKmipEngine(testtools.TestCase): e._process_register.assert_called_with(None) e._process_get.assert_called_with(None) e._process_get_attributes.assert_called_with(None) + e._process_get_attribute_list.assert_called_with(None) e._process_activate.assert_called_with(None) e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) @@ -3866,6 +3870,184 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_attribute_list(self): + """ + Test that a GetAttributeList request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + payload = get_attribute_list.GetAttributeListRequestPayload( + unique_identifier='1' + ) + + response_payload = e._process_get_attribute_list(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributeList" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 8, + len(response_payload.attribute_names) + ) + self.assertIn( + "Object Type", + response_payload.attribute_names + ) + self.assertIn( + "Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Algorithm", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Length", + response_payload.attribute_names + ) + self.assertIn( + "Operation Policy Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Usage Mask", + response_payload.attribute_names + ) + self.assertIn( + "State", + response_payload.attribute_names + ) + self.assertIn( + "Unique Identifier", + response_payload.attribute_names + ) + + def test_get_attribute_list_with_no_arguments(self): + """ + Test that a GetAttributeList request with no arguments can be + processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + e._id_placeholder = '1' + + payload = get_attribute_list.GetAttributeListRequestPayload() + + response_payload = e._process_get_attribute_list(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributeList" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 8, + len(response_payload.attribute_names) + ) + self.assertIn( + "Object Type", + response_payload.attribute_names + ) + self.assertIn( + "Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Algorithm", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Length", + response_payload.attribute_names + ) + self.assertIn( + "Operation Policy Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Usage Mask", + response_payload.attribute_names + ) + self.assertIn( + "State", + response_payload.attribute_names + ) + self.assertIn( + "Unique Identifier", + response_payload.attribute_names + ) + + def test_get_attribute_list_not_allowed_by_policy(self): + """ + Test that an unallowed request is handled correctly by + GetAttributeList. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._client_identity = 'test' + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_a._owner = 'admin' + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + payload = get_attribute_list.GetAttributeListRequestPayload( + unique_identifier=id_a + ) + + # Test by specifying the ID of the object whose attributes should + # be retrieved. + args = [payload] + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate object: {0}".format(id_a), + e._process_get_attribute_list, + *args + ) + def test_activate(self): """ Test that an Activate request can be processed correctly. @@ -4180,7 +4362,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(8, len(result.operations)) + self.assertEqual(9, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -4202,17 +4384,21 @@ class TestKmipEngine(testtools.TestCase): result.operations[4].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[5].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[6].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[7].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[8].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -4231,7 +4417,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(9, len(result.operations)) + self.assertEqual(10, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, result.operations[-1].value diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index 1b5e67b..0c515e4 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -421,7 +421,7 @@ class TestKMIPClient(TestCase): self.assertIsInstance( batch_item.request_payload, get_attribute_list.GetAttributeListRequestPayload) - self.assertEqual(uid, batch_item.request_payload.uid) + self.assertEqual(uid, batch_item.request_payload.unique_identifier) def test_process_batch_items(self): batch_item = ResponseBatchItem( @@ -631,7 +631,7 @@ class TestKMIPClient(TestCase): uid = '00000000-1111-2222-3333-444444444444' names = ['Cryptographic Algorithm', 'Cryptographic Length'] payload = get_attribute_list.GetAttributeListResponsePayload( - uid=uid, attribute_names=names) + unique_identifier=uid, attribute_names=names) batch_item = ResponseBatchItem( operation=Operation(OperationEnum.GET_ATTRIBUTE_LIST), response_payload=payload)