diff --git a/kmip/core/messages/payloads/get_attribute_list.py b/kmip/core/messages/payloads/get_attribute_list.py index 738e417..4d70acb 100644 --- a/kmip/core/messages/payloads/get_attribute_list.py +++ b/kmip/core/messages/payloads/get_attribute_list.py @@ -16,7 +16,6 @@ import six from kmip.core import enums -from kmip.core import exceptions from kmip.core import primitives from kmip.core import utils @@ -30,23 +29,44 @@ class GetAttributeListRequestPayload(primitives.Struct): See Section 4.13 of the KMIP 1.1 specification for more information. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. + unique_identifier: The unique ID of the managed object with which the + retrieved attributes should be associated. """ - def __init__(self, uid=None): + + def __init__(self, unique_identifier=None): """ Construct a GetAttributeList request payload. Args: - uid (string): The ID of the managed object with which the retrieved - attributes should be associated. Optional, defaults to None. + unique_identifier (string): The ID of the managed object with + which the retrieved attribute names should be associated. + Optional, defaults to None. """ super(GetAttributeListRequestPayload, self).__init__( enums.Tags.REQUEST_PAYLOAD) - self.uid = uid + self._unique_identifier = None - self.validate() + self.unique_identifier = unique_identifier + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return self._unique_identifier + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") def read(self, istream): """ @@ -61,14 +81,14 @@ class GetAttributeListRequestPayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.read(tstream) - self.uid = uid.value + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(tstream) else: - self.uid = None + self._unique_identifier = None self.is_oversized(tstream) - self.validate() def write(self, ostream): """ @@ -81,36 +101,23 @@ class GetAttributeListRequestPayload(primitives.Struct): """ tstream = utils.BytearrayStream() - if self.uid: - uid = primitives.TextString( - value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) self.length = tstream.length() super(GetAttributeListRequestPayload, self).write(ostream) ostream.write(tstream.buffer) - def validate(self): - """ - Error check the attributes of the GetAttributeList request payload. - """ - if self.uid is not None: - if not isinstance(self.uid, six.string_types): - raise TypeError( - "uid must be a string; " - "expected (one of): {0}, observed: {1}".format( - six.string_types, type(self.uid))) - def __repr__(self): - uid = "uid={0}".format(self.uid) + uid = "unique_identifier={0}".format(self.unique_identifier) return "GetAttributeListRequestPayload({0})".format(uid) def __str__(self): - return str({'uid': self.uid}) + return str({'unique_identifier': self.unique_identifier}) def __eq__(self, other): if isinstance(other, GetAttributeListRequestPayload): - if self.uid == other.uid: + if self.unique_identifier == other.unique_identifier: return True else: return False @@ -128,45 +135,100 @@ class GetAttributeListResponsePayload(primitives.Struct): """ A response payload for the GetAttributeList operation. - The payload will contain the ID of the managed object with which the - attributes are associated. It will also contain a list of attribute names + The payload contains the ID of the managed object with which the + attributes are associated, along with a list of attribute names identifying the types of attributes associated with the aforementioned - managed object. See Section 4.13 of the KMIP 1.1 specification for more - information. + managed object. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. - attribute_names: The list of attribute names of the attributes - associated with managed object identified by the uid above. + unique_identifier: The unique ID of the managed object with which the + retrieved attributes should be associated. + attribute_names: A list of strings identifying the names of the + attributes associated with the managed object. """ - def __init__(self, uid=None, attribute_names=None): + + def __init__(self, unique_identifier=None, attribute_names=None): """ Construct a GetAttributeList response payload. Args: - uid (string): The ID of the managed object with which the retrieved - attributes should be associated. Optional, defaults to None. - attribute_names (list): A list of strings identifying the names of - the attributes associated with the managed object. Optional, + unique_identifier (string): The ID of the managed object with + which the retrieved attribute names should be associated. + Optional, defaults to None. + attribute_names: A list of strings identifying the names of the + attributes associated with the managed object. Optional, defaults to None. """ + super(GetAttributeListResponsePayload, self).__init__( - enums.Tags.RESPONSE_PAYLOAD) + enums.Tags.RESPONSE_PAYLOAD + ) - self.uid = uid + self._unique_identifier = None + self._attribute_names = list() - if attribute_names: - self.attribute_names = attribute_names + self.unique_identifier = unique_identifier + self.attribute_names = attribute_names + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value else: - self.attribute_names = list() + return self._unique_identifier - self.validate() + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") + + @property + def attribute_names(self): + if self._attribute_names: + names = list() + for attribute_name in self._attribute_names: + names.append(attribute_name.value) + return names + else: + return self._attribute_names + + @attribute_names.setter + def attribute_names(self, value): + if value is None: + self._attribute_names = list() + elif isinstance(value, list): + names = list() + for i in range(len(value)): + name = value[i] + if not isinstance(name, six.string_types): + raise TypeError( + "attribute_names must be a list of strings; " + "item {0} has type {1}".format(i + 1, type(name)) + ) + if name not in names: + names.append(name) + self._attribute_names = list() + for name in names: + self._attribute_names.append( + primitives.TextString( + value=name, + tag=enums.Tags.ATTRIBUTE_NAME + ) + ) + else: + raise TypeError("attribute_names must be a list of strings") def read(self, istream): """ - Read the data encoding the GetAttributeList response payload and decode - it into its constituent parts. + Read the data encoding the GetAttributeList response payload and + decode it into its constituent parts. Args: istream (stream): A data stream containing encoded object data, @@ -176,22 +238,21 @@ class GetAttributeListResponsePayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.read(tstream) - self.uid = uid.value + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(tstream) else: - raise exceptions.InvalidKmipEncoding( - "expected uid encoding not found") + self._unique_identifier = None names = list() - while(self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream)): + while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream): name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME) name.read(tstream) - names.append(name.value) - self.attribute_names = names + names.append(name) + self._attribute_names = names self.is_oversized(tstream) - self.validate() def write(self, ostream): """ @@ -204,59 +265,39 @@ class GetAttributeListResponsePayload(primitives.Struct): """ tstream = utils.BytearrayStream() - uid = primitives.TextString( - value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) - for name in self.attribute_names: - name = primitives.TextString( - value=name, tag=enums.Tags.ATTRIBUTE_NAME) - name.write(tstream) + for attribute_name in self._attribute_names: + attribute_name.write(tstream) self.length = tstream.length() super(GetAttributeListResponsePayload, self).write(ostream) ostream.write(tstream.buffer) - def validate(self): - """ - Error check the attributes of the GetAttributeList response payload. - """ - if self.uid is not None: - if not isinstance(self.uid, six.string_types): - raise TypeError( - "uid must be a string; " - "expected (one of): {0}, observed: {1}".format( - six.string_types, type(self.uid))) - - if self.attribute_names: - if not isinstance(self.attribute_names, list): - raise TypeError("attribute names must be a list") - for i in range(len(self.attribute_names)): - name = self.attribute_names[i] - if not isinstance(name, six.string_types): - raise TypeError( - "attribute name ({0} of {1}) must be a string".format( - i + 1, len(self.attribute_names))) - def __repr__(self): - uid = "uid={0}".format(self.uid) - names = "attribute_names={0}".format(self.attribute_names) - return "GetAttributeListResponsePayload({0}, {1})".format(uid, names) + unique_identifier = "unique_identifier={0}".format( + self.unique_identifier + ) + attribute_names = "attribute_names={0}".format(self.attribute_names) + return "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) def __str__(self): - return str({'uid': self.uid, 'attribute_names': self.attribute_names}) + return str({ + 'unique_identifier': self.unique_identifier, + 'attribute_names': self.attribute_names + }) def __eq__(self, other): if isinstance(other, GetAttributeListResponsePayload): - if self.uid != other.uid: - return False - elif ((isinstance(self.attribute_names, list) and - isinstance(other.attribute_names, list)) and - len(self.attribute_names) == len(other.attribute_names)): - for name in self.attribute_names: - if name not in other.attribute_names: - return False - return True + if self.unique_identifier == other.unique_identifier: + if set(self.attribute_names) == set(other.attribute_names): + return True + else: + return False else: return False else: diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index d078bfa..911d092 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -591,7 +591,7 @@ class KMIPProxy(KMIP): names = None if payload: - uid = payload.uid + uid = payload.unique_identifier names = payload.attribute_names return GetAttributeListResult( diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 791dae8..3b6b8f7 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -43,6 +43,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attributes +from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -911,6 +912,8 @@ class KmipEngine(object): return self._process_get(payload) elif operation == enums.Operation.GET_ATTRIBUTES: return self._process_get_attributes(payload) + elif operation == enums.Operation.GET_ATTRIBUTE_LIST: + return self._process_get_attribute_list(payload) elif operation == enums.Operation.ACTIVATE: return self._process_activate(payload) elif operation == enums.Operation.DESTROY: @@ -1380,6 +1383,51 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_get_attribute_list(self, payload): + self._logger.info("Processing operation: GetAttributeList") + + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + else: + unique_identifier = self._id_placeholder + + object_type = self._get_object_type(unique_identifier) + + managed_object = self._data_session.query(object_type).filter( + object_type.unique_identifier == unique_identifier + ).one() + + # Determine if the request should be carried out under the object's + # operation policy. If not, feign ignorance of the object. + is_allowed = self._is_allowed_by_operation_policy( + managed_object.operation_policy_name, + self._client_identity, + managed_object._owner, + managed_object._object_type, + enums.Operation.GET_ATTRIBUTES + ) + if not is_allowed: + raise exceptions.ItemNotFound( + "Could not locate object: {0}".format(unique_identifier) + ) + + object_attributes = self._get_attributes_from_managed_object( + managed_object, + list() + ) + attribute_names = list() + + for object_attribute in object_attributes: + attribute_names.append(object_attribute.attribute_name.value) + + response_payload = get_attribute_list.GetAttributeListResponsePayload( + unique_identifier=unique_identifier, + attribute_names=attribute_names + ) + + return response_payload + @_kmip_version_supported('1.0') def _process_activate(self, payload): self._logger.info("Processing operation: Activate") @@ -1502,6 +1550,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.GET_ATTRIBUTES), + contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST), contents.Operation(enums.Operation.ACTIVATE), contents.Operation(enums.Operation.DESTROY), contents.Operation(enums.Operation.QUERY) diff --git a/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py b/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py index 703f343..4d44d5e 100644 --- a/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py +++ b/kmip/tests/unit/core/messages/payloads/test_get_attribute_list.py @@ -13,10 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. -import copy import testtools -from kmip.core import exceptions +from kmip.core import enums +from kmip.core import primitives from kmip.core import utils from kmip.core.messages.payloads import get_attribute_list @@ -32,16 +32,17 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): # Encodings taken from Sections 3.1.4 of the KMIP 1.1 testing # documentation. - self.encoding_with_uid = utils.BytearrayStream(( + self.full_encoding = utils.BytearrayStream( b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24' b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34' b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32' - b'\x32\x39\x35\x39\x00\x00\x00\x00')) + b'\x32\x39\x35\x39\x00\x00\x00\x00' + ) + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) - self.encoding_without_uid = utils.BytearrayStream(( - b'\x42\x00\x79\x01\x00\x00\x00\x00')) - - self.uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959' + self.unique_identifier = 'b4faee10-aa2a-4446-8ad4-0881f3422959' def tearDown(self): super(TestGetAttributeListRequestPayload, self).tearDown() @@ -58,17 +59,44 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): Test that a GetAttributeList request payload can be constructed with a valid value. """ - get_attribute_list.GetAttributeListRequestPayload(self.uid) + get_attribute_list.GetAttributeListRequestPayload( + 'test-unique-identifier', + ) - def test_validate_with_invalid_uid(self): + def test_unique_identifier(self): """ - Test that a TypeError exception is raised when an invalid ID is used - to construct a GetAttributeList request payload. + Test that the unique_identifier attribute of a GetAttributeList + request payload can be properly set and retrieved. """ - kwargs = {'uid': 0} + payload = get_attribute_list.GetAttributeListRequestPayload() + + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) + + payload.unique_identifier = 'test-unique-identifier' + + self.assertEqual('test-unique-identifier', payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value='test-unique-identifier', + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + + def test_unique_identifier_with_invalid_value(self): + """ + Test that a TypeError is raised when an invalid ID is used to set + the unique_identifier attribute of a GetAttributeList request payload. + """ + payload = get_attribute_list.GetAttributeListRequestPayload() + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( - TypeError, "uid must be a string", - get_attribute_list.GetAttributeListRequestPayload, **kwargs) + TypeError, + "unique identifier must be a string", + setattr, + *args + ) def test_read(self): """ @@ -76,49 +104,90 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): stream. """ payload = get_attribute_list.GetAttributeListRequestPayload() - payload.read(self.encoding_with_uid) - self.assertEqual(self.uid, payload.uid) - def test_read_with_no_uid(self): + self.assertEqual(None, payload._unique_identifier) + + payload.read(self.full_encoding) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value=self.unique_identifier, + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + + def test_read_with_no_content(self): """ - Test that a GetAttributeList request payload with no ID can be read - from a data stream. + Test that a GetAttributeList response payload with no ID or attribute + names can be read from a data stream. """ payload = get_attribute_list.GetAttributeListRequestPayload() - payload.read(self.encoding_without_uid) - self.assertEqual(None, payload.uid) + + self.assertEqual(None, payload._unique_identifier) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) def test_write(self): """ Test that a GetAttributeList request payload can be written to a data stream. """ - payload = get_attribute_list.GetAttributeListRequestPayload(self.uid) + payload = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_with_uid), len(stream)) - self.assertEqual(self.encoding_with_uid, stream) + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_uid(self): + def test_write_with_no_content(self): """ - Test that a GetAttributeList request payload with no ID can be written - to a data stream. + Test that a GetAttributeList request payload with no ID or attribute + names can be written to a data stream. """ payload = get_attribute_list.GetAttributeListRequestPayload() stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_without_uid), len(stream)) - self.assertEqual(self.encoding_without_uid, stream) + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) def test_repr(self): """ Test that repr can be applied to a GetAttributeList request payload. """ - payload = get_attribute_list.GetAttributeListRequestPayload(self.uid) - args = "uid={0}".format(payload.uid) - expected = "GetAttributeListRequestPayload({0})".format(args) + payload = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + expected = "GetAttributeListRequestPayload({0})".format( + unique_identifier + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_content(self): + """ + Test that repr can be applied to a GetAttributeList request payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListRequestPayload( + None + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + expected = "GetAttributeListRequestPayload({0})".format( + unique_identifier + ) observed = repr(payload) self.assertEqual(expected, observed) @@ -126,8 +195,26 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): """ Test that str can be applied to a GetAttributeList request payload. """ - payload = get_attribute_list.GetAttributeListRequestPayload(self.uid) - expected = str({'uid': payload.uid}) + payload = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + expected = str({ + 'unique_identifier': self.unique_identifier + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_content(self): + """ + Test that str can be applied to a GetAttributeList request payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListRequestPayload( + None + ) + expected = str({ + 'unique_identifier': None + }) observed = str(payload) self.assertEqual(expected, observed) @@ -136,19 +223,27 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): Test that the equality operator returns True when comparing two GetAttributeList request payloads with the same data. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_on_not_equal_unique_identifier(self): """ Test that the equality operator returns False when comparing two - GetAttributeList request payloads with different data. + GetAttributeList request payloads with different IDs. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload('invalid') + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + 'invalid' + ) self.assertFalse(a == b) self.assertFalse(b == a) @@ -159,7 +254,9 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): GetAttributeList request payload to a non-GetAttributeList request payload. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) b = "invalid" self.assertFalse(a == b) @@ -170,19 +267,27 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): Test that the inequality operator returns False when comparing two GetAttributeList request payloads with the same internal data. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two - GetAttributeList request payloads with different data. + GetAttributeList request payloads with different IDs. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) - b = get_attribute_list.GetAttributeListRequestPayload('invalid') + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) + b = get_attribute_list.GetAttributeListRequestPayload( + 'invalid' + ) self.assertTrue(a != b) self.assertTrue(b != a) @@ -193,7 +298,9 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): GetAttributeList request payload to a non-GetAttributeList request payload. """ - a = get_attribute_list.GetAttributeListRequestPayload(self.uid) + a = get_attribute_list.GetAttributeListRequestPayload( + self.unique_identifier + ) b = "invalid" self.assertTrue(a != b) @@ -202,8 +309,7 @@ class TestGetAttributeListRequestPayload(testtools.TestCase): class TestGetAttributeListResponsePayload(testtools.TestCase): """ - Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test - Cases documentation. + Test suite for the GetAttributeList response payload. """ def setUp(self): @@ -211,7 +317,7 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): # Encodings taken from Sections 3.1.4 of the KMIP 1.1 testing # documentation. - self.encoding_with_uid_with_names = utils.BytearrayStream(( + self.full_encoding = utils.BytearrayStream( b'\x42\x00\x7C\x01\x00\x00\x01\x60\x42\x00\x94\x07\x00\x00\x00\x24' b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34' b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32' @@ -234,9 +340,10 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20' b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00' b'\x42\x00\x0A\x07\x00\x00\x00\x10\x4C\x61\x73\x74\x20\x43\x68\x61' - b'\x6E\x67\x65\x20\x44\x61\x74\x65')) - self.encoding_without_uid_with_names = utils.BytearrayStream(( - b'\x42\x00\x7C\x01\x00\x00\x01\x60\x42\x00\x0A\x07\x00\x00\x00\x14' + b'\x6E\x67\x65\x20\x44\x61\x74\x65' + ) + self.encoding_sans_unique_identifier = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x01\x30\x42\x00\x0A\x07\x00\x00\x00\x14' b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65' b'\x6E\x67\x74\x68\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x17' b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C' @@ -255,15 +362,20 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20' b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00' b'\x42\x00\x0A\x07\x00\x00\x00\x10\x4C\x61\x73\x74\x20\x43\x68\x61' - b'\x6E\x67\x65\x20\x44\x61\x74\x65')) - self.encoding_with_uid_without_names = utils.BytearrayStream(( + b'\x6E\x67\x65\x20\x44\x61\x74\x65' + ) + self.encoding_sans_attribute_names = utils.BytearrayStream( b'\x42\x00\x7C\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24' b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34' b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32' - b'\x32\x39\x35\x39\x00\x00\x00\x00')) + b'\x32\x39\x35\x39\x00\x00\x00\x00' + ) + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) - self.uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959' - self.attribute_names = list(( + self.unique_identifier = 'b4faee10-aa2a-4446-8ad4-0881f3422959' + self.attribute_names = [ 'Cryptographic Length', 'Cryptographic Algorithm', 'State', @@ -275,59 +387,167 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): 'Cryptographic Usage Mask', 'Object Type', 'Contact Information', - 'Last Change Date')) + 'Last Change Date' + ] def tearDown(self): super(TestGetAttributeListResponsePayload, self).tearDown() def test_init(self): """ - Test that a GetAttributeList response payload can be constructed. + Test that a GetAttributeList response payload can be constructed with + no arguments. """ get_attribute_list.GetAttributeListResponsePayload() def test_init_with_args(self): """ - Test that a GetAttributeList response payload can be constructed with - valid values. + Test that a GetAttributeList response payload can be constructed with a + valid value. """ get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + 'test-unique-identifier', + ['test-attribute-name-1', 'test-attribute-name-2'] + ) - def test_validate_with_invalid_uid(self): + def test_unique_identifier(self): """ - Test that a TypeError exception is raised when an invalid ID is used - to construct a GetAttributeList response payload. + Test that the unique_identifier attribute of a GetAttributeList + response payload can be properly set and retrieved. """ - kwargs = {'uid': 0, 'attribute_names': self.attribute_names} + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) + + payload.unique_identifier = 'test-unique-identifier' + + self.assertEqual('test-unique-identifier', payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value='test-unique-identifier', + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + + def test_unique_identifier_with_invalid_value(self): + """ + Test that a TypeError is raised when an invalid ID is used to set + the unique_identifier attribute of a GetAttributeList response + payload. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( - TypeError, "uid must be a string", - get_attribute_list.GetAttributeListResponsePayload, **kwargs) + TypeError, + "unique identifier must be a string", + setattr, + *args + ) - def test_validate_with_invalid_attribute_names(self): + def test_attribute_names(self): """ - Test that a TypeError exception is raised when an invalid attribute - name list is used to construct a GetAttributeList response payload. + Test that the attribute_names attribute of a GetAttributeList response + payload can be properly set and retrieved. """ - kwargs = {'uid': self.uid, 'attribute_names': 'invalid'} + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) + + payload.attribute_names = [ + 'test-attribute-name-1', + 'test-attribute-name-2' + ] + + self.assertEqual(2, len(payload.attribute_names)) + self.assertEqual(2, len(payload._attribute_names)) + self.assertIn('test-attribute-name-1', payload.attribute_names) + self.assertIn('test-attribute-name-2', payload.attribute_names) + self.assertIn( + primitives.TextString( + value='test-attribute-name-1', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + self.assertIn( + primitives.TextString( + value='test-attribute-name-2', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + + def test_attribute_names_with_invalid_value(self): + """ + Test that a TypeError is raised when an invalid list is used to set + the attribute_names attribute of a GetAttributeList response payload. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + args = (payload, 'attribute_names', 0) self.assertRaisesRegexp( - TypeError, "attribute names must be a list", - get_attribute_list.GetAttributeListResponsePayload, **kwargs) + TypeError, + "attribute_names must be a list of strings", + setattr, + *args + ) - def test_validate_with_invalid_attribute_name(self): + def test_attribute_names_with_invalid_attribute_name(self): """ - Test that a TypeError exception is raised when an invalid attribute - name is used to construct a GetAttributeList response payload object. + Test that a TypeError is raised when an invalid attribute name is + included in the list used to set the attribute_names attribute of a + GetAttributeList response payload. """ - kwargs = {'uid': self.uid, 'attribute_names': [0]} - self.assertRaises( - TypeError, get_attribute_list.GetAttributeListResponsePayload, - **kwargs) + payload = get_attribute_list.GetAttributeListResponsePayload() + args = ( + payload, + 'attribute_names', + ['test-attribute-name-1', 0] + ) + self.assertRaisesRegexp( + TypeError, + "attribute_names must be a list of strings; " + "item 2 has type {0}".format(type(0)), + setattr, + *args + ) - kwargs = {'uid': self.uid, 'attribute_names': ['', 0, '']} - self.assertRaises( - TypeError, get_attribute_list.GetAttributeListResponsePayload, - **kwargs) + def test_attribute_names_with_duplicates(self): + """ + Test that duplicate attribute names are silently removed when setting + the attribute_names attribute of a GetAttributeList response payload. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) + + payload.attribute_names = [ + 'test-attribute-name-1', + 'test-attribute-name-1', + 'test-attribute-name-2' + ] + + self.assertEqual(2, len(payload.attribute_names)) + self.assertEqual(2, len(payload._attribute_names)) + self.assertIn('test-attribute-name-1', payload.attribute_names) + self.assertIn('test-attribute-name-2', payload.attribute_names) + self.assertIn( + primitives.TextString( + value='test-attribute-name-1', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + self.assertIn( + primitives.TextString( + value='test-attribute-name-2', + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) def test_read(self): """ @@ -335,31 +555,99 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): stream. """ payload = get_attribute_list.GetAttributeListResponsePayload() - payload.read(self.encoding_with_uid_with_names) - self.assertEqual(self.uid, payload.uid) - self.assertEqual(self.attribute_names, payload.attribute_names) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) - def test_read_with_no_uid(self): + payload.read(self.full_encoding) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value=self.unique_identifier, + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) + self.assertEqual( + set(self.attribute_names), + set(payload.attribute_names) + ) + for attribute_name in self.attribute_names: + self.assertIn( + primitives.TextString( + value=attribute_name, + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) + + def test_read_with_no_unique_identifier(self): """ - Test that an InvalidKmipEncoding error gets raised when attempting to - read a GetAttributeList response encoding with no ID data. + Test that a GetAttributeList response payload with no ID can be read + from a data stream. """ payload = get_attribute_list.GetAttributeListResponsePayload() - self.assertRaisesRegexp( - exceptions.InvalidKmipEncoding, "expected uid encoding not found", - payload.read, self.encoding_without_uid_with_names) + + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) + + payload.read(self.encoding_sans_unique_identifier) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual( + set(self.attribute_names), + set(payload.attribute_names) + ) + for attribute_name in self.attribute_names: + self.assertIn( + primitives.TextString( + value=attribute_name, + tag=enums.Tags.ATTRIBUTE_NAME + ), + payload._attribute_names + ) def test_read_with_no_attribute_names(self): """ - Test that a GetAttributeList response payload without attribute name - data can be read from a data stream. + Test that a GetAttributeList response payload with no attribute names + can be read from a data stream. """ payload = get_attribute_list.GetAttributeListResponsePayload() - payload.read(self.encoding_with_uid_without_names) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) + + payload.read(self.encoding_sans_attribute_names) + + self.assertEqual(self.unique_identifier, payload.unique_identifier) + self.assertEqual( + primitives.TextString( + value=self.unique_identifier, + tag=enums.Tags.UNIQUE_IDENTIFIER + ), + payload._unique_identifier + ) self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) + + def test_read_with_no_content(self): + """ + Test that a GetAttributeList response payload with no ID or attribute + names can be read from a data stream. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload._attribute_names) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) + self.assertEqual(list(), payload.attribute_names) + self.assertEqual(list(), payload._attribute_names) def test_write(self): """ @@ -367,35 +655,147 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): stream. """ payload = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_with_uid_with_names), len(stream)) - self.assertEqual(self.encoding_with_uid_with_names, stream) + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_attribute_names(self): + def test_write_with_no_unique_identifier(self): """ - Test that a GetAttributeList response payload with no attribute name - data can be written to a data stream. + Test that a GetAttributeList response payload with no ID can be + written to a data stream. """ - payload = get_attribute_list.GetAttributeListResponsePayload(self.uid) + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + self.attribute_names + ) stream = utils.BytearrayStream() payload.write(stream) self.assertEqual( - len(self.encoding_with_uid_without_names), len(stream)) - self.assertEqual(self.encoding_with_uid_without_names, stream) + len(self.encoding_sans_unique_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_sans_unique_identifier), + str(stream) + ) + + def test_write_with_no_attribute_names(self): + """ + Test that a GetAttributeList response payload with no attribute names + can be written to a data stream. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + None + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.encoding_sans_attribute_names), len(stream)) + self.assertEqual(str(self.encoding_sans_attribute_names), str(stream)) + + def test_write_with_no_content(self): + """ + Test that a GetAttributeList response payload with no ID or attribute + names can be written to a data stream. + """ + payload = get_attribute_list.GetAttributeListResponsePayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) def test_repr(self): """ Test that repr can be applied to a GetAttributeList response payload. """ payload = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - args = "uid={0}, attribute_names={1}".format( - payload.uid, payload.attribute_names) - expected = "GetAttributeListResponsePayload({0})".format(args) + self.unique_identifier, + self.attribute_names + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_unique_identifier(self): + """ + Test that repr can be applied to a GetAttributeList response payload + with no ID. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + self.attribute_names + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_attribute_names(self): + """ + Test that repr can be applied to a GetAttributeList response payload + with no attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + None + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_repr_with_no_content(self): + """ + Test that repr can be applied to a GetAttributeList response payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + None + ) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) + attribute_names = "attribute_names={0}".format( + payload.attribute_names + ) + expected = "GetAttributeListResponsePayload({0}, {1})".format( + unique_identifier, + attribute_names + ) observed = repr(payload) self.assertEqual(expected, observed) @@ -403,9 +803,62 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): """ Test that str can be applied to a GetAttributeList response payload. """ - payload = get_attribute_list.GetAttributeListResponsePayload(self.uid) - expected = str({'uid': payload.uid, - 'attribute_names': payload.attribute_names}) + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + self.attribute_names + ) + expected = str({ + 'unique_identifier': self.unique_identifier, + 'attribute_names': self.attribute_names + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_id(self): + """ + Test that str can be applied to a GetAttributeList response payload + with no ID. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + self.attribute_names + ) + expected = str({ + 'unique_identifier': None, + 'attribute_names': self.attribute_names + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_attribute_names(self): + """ + Test that str can be applied to a GetAttributeList response payload + with no attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + None + ) + expected = str({ + 'unique_identifier': self.unique_identifier, + 'attribute_names': list() + }) + observed = str(payload) + self.assertEqual(expected, observed) + + def test_str_with_no_content(self): + """ + Test that str can be applied to a GetAttributeList response payload + with no ID or attribute names. + """ + payload = get_attribute_list.GetAttributeListResponsePayload( + None, + None + ) + expected = str({ + 'unique_identifier': None, + 'attribute_names': list() + }) observed = str(payload) self.assertEqual(expected, observed) @@ -415,22 +868,49 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): GetAttributeList response payloads with the same data. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_with_mixed_attribute_names(self): """ - Test that the equality operator returns False when comparing two - GetAttributeList response payloads with different data. + Test that the equality operator returns True when comparing two + GetAttributeList response payload with the same attribute_name sets + but with different attribute name orderings. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) + self.attribute_names.reverse() b = get_attribute_list.GetAttributeListResponsePayload( - 'invalid', self.attribute_names) + self.unique_identifier, + self.attribute_names + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + GetAttributeList response payloads with different IDs. + """ + a = get_attribute_list.GetAttributeListResponsePayload( + self.unique_identifier, + self.attribute_names + ) + b = get_attribute_list.GetAttributeListResponsePayload( + 'invalid', + self.attribute_names + ) self.assertFalse(a == b) self.assertFalse(b == a) @@ -438,27 +918,16 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): def test_equal_on_not_equal_attribute_names(self): """ Test that the equality operator returns False when comparing two - GetAttributeList response payloads with different data. + GetAttributeList response payloads with different attribute names. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, list()) - - self.assertFalse(a == b) - self.assertFalse(b == a) - - def test_equal_on_not_equal_attribute_name(self): - """ - Test that the equality operator returns False when comparing two - GetAttributeList response payloads with different data. - """ - alt_names = copy.deepcopy(self.attribute_names) - alt_names[0] = 'invalid' - a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, alt_names) + self.unique_identifier, + None + ) self.assertFalse(a == b) self.assertFalse(b == a) @@ -470,8 +939,10 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): payload. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - b = 'invalid' + self.unique_identifier, + self.attribute_names + ) + b = "invalid" self.assertFalse(a == b) self.assertFalse(b == a) @@ -482,62 +953,61 @@ class TestGetAttributeListResponsePayload(testtools.TestCase): two GetAttributeList response payloads with the same internal data. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two - GetAttributeList request payloads with different data. + GetAttributeList response payloads with different IDs. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - 'invalid', self.attribute_names) + 'invalid', + self.attribute_names + ) self.assertTrue(a != b) self.assertTrue(b != a) def test_not_equal_on_not_equal_attribute_names(self): """ - Test that the inequality operator returns False when comparing two - GetAttributeList response payloads with different data. + Test that the inequality operator returns True when comparing two + GetAttributeList response payloads with different attribute names. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, list()) - - self.assertTrue(a != b) - self.assertTrue(b != a) - - def test_not_equal_on_not_equal_attribute_name(self): - """ - Test that the inequality operator returns False when comparing two - GetAttributeList response payloads with different data. - """ - alt_names = copy.deepcopy(self.attribute_names) - alt_names[0] = 'Operation Policy Name' - a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) - b = get_attribute_list.GetAttributeListResponsePayload( - self.uid, alt_names) + self.unique_identifier, + None + ) self.assertTrue(a != b) self.assertTrue(b != a) def test_not_equal_on_type_mismatch(self): """ - Test that the inequality operator returns True when comparing a + Test that the equality operator returns True when comparing a GetAttributeList response payload to a non-GetAttributeList response payload. """ a = get_attribute_list.GetAttributeListResponsePayload( - self.uid, self.attribute_names) + self.unique_identifier, + self.attribute_names + ) b = "invalid" self.assertTrue(a != b) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 588db34..e29250a 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -44,6 +44,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -883,6 +884,7 @@ class TestKmipEngine(testtools.TestCase): e._process_register = mock.MagicMock() e._process_get = mock.MagicMock() e._process_get_attributes = mock.MagicMock() + e._process_get_attribute_list = mock.MagicMock() e._process_activate = mock.MagicMock() e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() @@ -893,6 +895,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.GET, None) e._process_operation(enums.Operation.GET_ATTRIBUTES, None) + e._process_operation(enums.Operation.GET_ATTRIBUTE_LIST, None) e._process_operation(enums.Operation.ACTIVATE, None) e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) @@ -903,6 +906,7 @@ class TestKmipEngine(testtools.TestCase): e._process_register.assert_called_with(None) e._process_get.assert_called_with(None) e._process_get_attributes.assert_called_with(None) + e._process_get_attribute_list.assert_called_with(None) e._process_activate.assert_called_with(None) e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) @@ -3866,6 +3870,184 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_attribute_list(self): + """ + Test that a GetAttributeList request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + payload = get_attribute_list.GetAttributeListRequestPayload( + unique_identifier='1' + ) + + response_payload = e._process_get_attribute_list(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributeList" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 8, + len(response_payload.attribute_names) + ) + self.assertIn( + "Object Type", + response_payload.attribute_names + ) + self.assertIn( + "Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Algorithm", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Length", + response_payload.attribute_names + ) + self.assertIn( + "Operation Policy Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Usage Mask", + response_payload.attribute_names + ) + self.assertIn( + "State", + response_payload.attribute_names + ) + self.assertIn( + "Unique Identifier", + response_payload.attribute_names + ) + + def test_get_attribute_list_with_no_arguments(self): + """ + Test that a GetAttributeList request with no arguments can be + processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + e._id_placeholder = '1' + + payload = get_attribute_list.GetAttributeListRequestPayload() + + response_payload = e._process_get_attribute_list(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributeList" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 8, + len(response_payload.attribute_names) + ) + self.assertIn( + "Object Type", + response_payload.attribute_names + ) + self.assertIn( + "Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Algorithm", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Length", + response_payload.attribute_names + ) + self.assertIn( + "Operation Policy Name", + response_payload.attribute_names + ) + self.assertIn( + "Cryptographic Usage Mask", + response_payload.attribute_names + ) + self.assertIn( + "State", + response_payload.attribute_names + ) + self.assertIn( + "Unique Identifier", + response_payload.attribute_names + ) + + def test_get_attribute_list_not_allowed_by_policy(self): + """ + Test that an unallowed request is handled correctly by + GetAttributeList. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._client_identity = 'test' + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_a._owner = 'admin' + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + payload = get_attribute_list.GetAttributeListRequestPayload( + unique_identifier=id_a + ) + + # Test by specifying the ID of the object whose attributes should + # be retrieved. + args = [payload] + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate object: {0}".format(id_a), + e._process_get_attribute_list, + *args + ) + def test_activate(self): """ Test that an Activate request can be processed correctly. @@ -4180,7 +4362,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(8, len(result.operations)) + self.assertEqual(9, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -4202,17 +4384,21 @@ class TestKmipEngine(testtools.TestCase): result.operations[4].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[5].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[6].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[7].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[8].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -4231,7 +4417,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(9, len(result.operations)) + self.assertEqual(10, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, result.operations[-1].value diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index 1b5e67b..0c515e4 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -421,7 +421,7 @@ class TestKMIPClient(TestCase): self.assertIsInstance( batch_item.request_payload, get_attribute_list.GetAttributeListRequestPayload) - self.assertEqual(uid, batch_item.request_payload.uid) + self.assertEqual(uid, batch_item.request_payload.unique_identifier) def test_process_batch_items(self): batch_item = ResponseBatchItem( @@ -631,7 +631,7 @@ class TestKMIPClient(TestCase): uid = '00000000-1111-2222-3333-444444444444' names = ['Cryptographic Algorithm', 'Cryptographic Length'] payload = get_attribute_list.GetAttributeListResponsePayload( - uid=uid, attribute_names=names) + unique_identifier=uid, attribute_names=names) batch_item = ResponseBatchItem( operation=Operation(OperationEnum.GET_ATTRIBUTE_LIST), response_payload=payload)