diff --git a/kmip/core/objects.py b/kmip/core/objects.py index 2f2e25d..42a5343 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -175,6 +175,236 @@ class Attribute(Struct): return NotImplemented +class AttributeReference(primitives.Struct): + """ + A structure containing reference information for an attribute. + + This is intended for use with KMIP 2.0+. + + Attributes: + vendor_identification: A string identifying the vendor associated + with the attribute. + attribute_name: A string containing the attribute name. + """ + + def __init__(self, vendor_identification=None, attribute_name=None): + """ + Construct an AttributeReference structure. + + Args: + vendor_identification (string): A string identifying the vendor + associated with the attribute. Optional, defaults to None. + Required for read/write. + attribute_name (string): A string containing the attribute name. + Optional, defaults to None. Required for read/write. + """ + super(AttributeReference, self).__init__( + tag=enums.Tags.ATTRIBUTE_REFERENCE + ) + + self._vendor_identification = None + self._attribute_name = None + + self.vendor_identification = vendor_identification + self.attribute_name = attribute_name + + @property + def vendor_identification(self): + if self._vendor_identification: + return self._vendor_identification.value + else: + return None + + @vendor_identification.setter + def vendor_identification(self, value): + if value is None: + self._vendor_identification = None + elif isinstance(value, six.string_types): + self._vendor_identification = primitives.TextString( + value, + tag=enums.Tags.VENDOR_IDENTIFICATION + ) + else: + raise TypeError("Vendor identification must be a string.") + + @property + def attribute_name(self): + if self._attribute_name: + return self._attribute_name.value + else: + return None + + @attribute_name.setter + def attribute_name(self, value): + if value is None: + self._attribute_name = None + elif isinstance(value, six.string_types): + self._attribute_name = primitives.TextString( + value, + tag=enums.Tags.ATTRIBUTE_NAME + ) + else: + raise TypeError("Attribute name must be a string.") + + def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0): + """ + Read the data stream and decode the AttributeReference structure into + its parts. + + Args: + input_buffer (stream): A data stream containing encoded object + data, supporting a read method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be decoded. Optional, + defaults to KMIP 2.0. + + Raises: + InvalidKmipEncoding: Raised if the vendor identification or + attribute name is missing from the encoding. + VersionNotSupported: Raised when a KMIP version is provided that + does not support the AttributeReference structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_2_0: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the AttributeReference " + "object.".format( + kmip_version.value + ) + ) + + super(AttributeReference, self).read( + input_buffer, + kmip_version=kmip_version + ) + local_buffer = BytearrayStream(input_buffer.read(self.length)) + + if self.is_tag_next(enums.Tags.VENDOR_IDENTIFICATION, local_buffer): + self._vendor_identification = primitives.TextString( + tag=enums.Tags.VENDOR_IDENTIFICATION + ) + self._vendor_identification.read( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidKmipEncoding( + "The AttributeReference encoding is missing the vendor " + "identification string." + ) + + if self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, local_buffer): + self._attribute_name = primitives.TextString( + tag=enums.Tags.ATTRIBUTE_NAME + ) + self._attribute_name.read( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidKmipEncoding( + "The AttributeReference encoding is missing the attribute " + "name string." + ) + + self.is_oversized(local_buffer) + + def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0): + """ + Write the AttributeReference structure encoding to the data stream. + + Args: + output_buffer (stream): A data stream in which to encode + Attributes structure data, supporting a write method. + kmip_version (enum): A KMIPVersion enumeration defining the KMIP + version with which the object will be encoded. Optional, + defaults to KMIP 2.0. + + Raises: + InvalidField: Raised if the vendor identification or attribute name + fields are not defined. + VersionNotSupported: Raised when a KMIP version is provided that + does not support the AttributeReference structure. + """ + if kmip_version < enums.KMIPVersion.KMIP_2_0: + raise exceptions.VersionNotSupported( + "KMIP {} does not support the AttributeReference " + "object.".format( + kmip_version.value + ) + ) + + local_buffer = BytearrayStream() + + if self._vendor_identification: + self._vendor_identification.write( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidField( + "The AttributeReference is missing the vendor identification " + "field." + ) + + if self._attribute_name: + self._attribute_name.write( + local_buffer, + kmip_version=kmip_version + ) + else: + raise exceptions.InvalidField( + "The AttributeReference is missing the attribute name field." + ) + + self.length = local_buffer.length() + super(AttributeReference, self).write( + output_buffer, + kmip_version=kmip_version + ) + output_buffer.write(local_buffer.buffer) + + def __repr__(self): + v = "vendor_identification={}".format( + '"{}"'.format( + self.vendor_identification + ) if self.vendor_identification else None + ) + a = "attribute_name={}".format( + '"{}"'.format(self.attribute_name) if self.attribute_name else None + ) + values = ", ".join([v, a]) + return "AttributeReference({})".format(values) + + def __str__(self): + v = '"vendor_identification": "{}"'.format( + "{}".format( + self.vendor_identification + ) if self.vendor_identification else None + ) + a = '"attribute_name": "{}"'.format( + "{}".format(self.attribute_name) if self.attribute_name else None + ) + values = ", ".join([v, a]) + return '{' + values + '}' + + def __eq__(self, other): + if isinstance(other, AttributeReference): + if self.vendor_identification != other.vendor_identification: + return False + elif self.attribute_name != other.attribute_name: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, AttributeReference): + return not (self == other) + else: + return NotImplemented + + class Attributes(primitives.Struct): """ A collection of KMIP attributes. diff --git a/kmip/tests/unit/core/objects/test_objects.py b/kmip/tests/unit/core/objects/test_objects.py index 0339662..2cb748c 100644 --- a/kmip/tests/unit/core/objects/test_objects.py +++ b/kmip/tests/unit/core/objects/test_objects.py @@ -136,6 +136,396 @@ class TestAttributeClass(TestCase): self.assertTrue(self.attributeObj_a != self.attributeObj_b) +class TestAttributeReference(testtools.TestCase): + + def setUp(self): + super(TestAttributeReference, self).setUp() + + # This encoding matches the following set of values. + # AttributeReference + # Vendor Identification - Acme Corporation + # Attribute Name - Delivery Date + self.full_encoding = utils.BytearrayStream( + b'\x42\x01\x3B\x01\x00\x00\x00\x30' + b'\x42\x00\x9D\x07\x00\x00\x00\x10' + b'\x41\x63\x6D\x65\x20\x43\x6F\x72\x70\x6F\x72\x61\x74\x69\x6F\x6E' + b'\x42\x00\x0A\x07\x00\x00\x00\x0D' + b'\x44\x65\x6C\x69\x76\x65\x72\x79\x20\x44\x61\x74\x65\x00\x00\x00' + ) + + # This encoding matches the following set of values. + # AttributeReference + # Attribute Name - Delivery Date + self.no_vendor_identification_encoding = utils.BytearrayStream( + b'\x42\x01\x3B\x01\x00\x00\x00\x18' + b'\x42\x00\x0A\x07\x00\x00\x00\x0D' + b'\x44\x65\x6C\x69\x76\x65\x72\x79\x20\x44\x61\x74\x65\x00\x00\x00' + ) + + # This encoding matches the following set of values. + # AttributeReference + # Vendor Identification - Acme Corporation + self.no_attribute_name_encoding = utils.BytearrayStream( + b'\x42\x01\x3B\x01\x00\x00\x00\x18' + b'\x42\x00\x9D\x07\x00\x00\x00\x10' + b'\x41\x63\x6D\x65\x20\x43\x6F\x72\x70\x6F\x72\x61\x74\x69\x6F\x6E' + ) + + def tearDown(self): + super(TestAttributeReference, self).tearDown() + + def test_invalid_vendor_identification(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the vendor identification of an AttributeReference structure. + """ + kwargs = {"vendor_identification": 0} + self.assertRaisesRegex( + TypeError, + "Vendor identification must be a string.", + objects.AttributeReference, + **kwargs + ) + + args = (objects.AttributeReference(), "vendor_identification", 0) + self.assertRaisesRegex( + TypeError, + "Vendor identification must be a string.", + setattr, + *args + ) + + def test_invalid_attribute_name(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the attribute name of an AttributeReference structure. + """ + kwargs = {"attribute_name": 0} + self.assertRaisesRegex( + TypeError, + "Attribute name must be a string.", + objects.AttributeReference, + **kwargs + ) + + args = (objects.AttributeReference(), "attribute_name", 0) + self.assertRaisesRegex( + TypeError, + "Attribute name must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that an AttributeReference structure can be correctly read in + from a data stream. + """ + attribute_reference = objects.AttributeReference() + + self.assertIsNone(attribute_reference.vendor_identification) + self.assertIsNone(attribute_reference.attribute_name) + + attribute_reference.read(self.full_encoding) + + self.assertEqual( + "Acme Corporation", + attribute_reference.vendor_identification + ) + self.assertEqual( + "Delivery Date", + attribute_reference.attribute_name + ) + + def test_read_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the decoding of + an AttributeReference structure when the structure is read for an + unsupported KMIP version. + """ + attribute_reference = objects.AttributeReference() + + args = (self.full_encoding, ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_4} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.4 does not support the AttributeReference object.", + attribute_reference.read, + *args, + **kwargs + ) + + def test_read_missing_vendor_identification(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of an AttributeReference structure when the vendor identification is + missing from the encoding. + """ + attribute_reference = objects.AttributeReference() + + self.assertIsNone(attribute_reference.vendor_identification) + + args = (self.no_vendor_identification_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The AttributeReference encoding is missing the vendor " + "identification string.", + attribute_reference.read, + *args + ) + + def test_read_missing_attribute_name(self): + """ + Test that an InvalidKmipEncoding error is raised during the decoding + of an AttributeReference structure when the attribute name is missing + from the encoding. + """ + attribute_reference = objects.AttributeReference() + + self.assertIsNone(attribute_reference.attribute_name) + + args = (self.no_attribute_name_encoding, ) + self.assertRaisesRegex( + exceptions.InvalidKmipEncoding, + "The AttributeReference encoding is missing the attribute name " + "string.", + attribute_reference.read, + *args + ) + + def test_write(self): + """ + Test that an AttributeReference structure can be written to a data + stream. + """ + attribute_reference = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + + buffer = utils.BytearrayStream() + attribute_reference.write(buffer) + + self.assertEqual(len(self.full_encoding), len(buffer)) + self.assertEqual(str(self.full_encoding), str(buffer)) + + def test_write_unsupported_kmip_version(self): + """ + Test that a VersionNotSupported error is raised during the encoding of + an AttributeReference structure when the structure is written for an + unsupported KMIP version. + """ + attribute_reference = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + + args = (utils.BytearrayStream(), ) + kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_4} + self.assertRaisesRegex( + exceptions.VersionNotSupported, + "KMIP 1.4 does not support the AttributeReference object.", + attribute_reference.write, + *args, + **kwargs + ) + + def test_write_missing_vendor_identification(self): + """ + Test that an InvalidField error is raised during the encoding of an + AttributeReference structure when the structure is missing the vendor + identification field. + """ + attribute_reference = objects.AttributeReference( + attribute_name="Delivery Date" + ) + + args = (utils.BytearrayStream(), ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The AttributeReference is missing the vendor identification " + "field.", + attribute_reference.write, + *args + ) + + def test_write_missing_attribute_name(self): + """ + Test that an InvalidField error is raised during the encoding of an + AttributeReference structure when the structure is missing the + attribute name field. + """ + attribute_reference = objects.AttributeReference( + vendor_identification="Acme Corporation" + ) + + args = (utils.BytearrayStream(), ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The AttributeReference is missing the attribute name field.", + attribute_reference.write, + *args + ) + + def test_repr(self): + """ + Test that repr can be applied to an AttributeReference structure. + """ + attribute_reference = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + v = 'vendor_identification="Acme Corporation"' + a = 'attribute_name="Delivery Date"' + r = "AttributeReference({}, {})".format(v, a) + + self.assertEqual(r, repr(attribute_reference)) + + def test_str(self): + """ + Test that str can be applied to an AttributeReference structure. + """ + attribute_reference = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + v = '"vendor_identification": "Acme Corporation"' + a = '"attribute_name": "Delivery Date"' + r = "{" + "{}, {}".format(v, a) + "}" + + self.assertEqual(r, str(attribute_reference)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + AttributeReference structures with the same data. + """ + a = objects.AttributeReference() + b = objects.AttributeReference() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + b = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_vendor_identification(self): + """ + Test that the equality operator returns False when comparing two + AttributeReference structures with different vendor identification + fields. + """ + a = objects.AttributeReference( + vendor_identification="Acme Corporation 1" + ) + b = objects.AttributeReference( + vendor_identification="Acme Corporation 2" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_attribute_name(self): + """ + Test that the equality operator returns False when comparing two + AttributeReference structures with different attribute name fields. + """ + a = objects.AttributeReference( + attribute_name="Attribute 1" + ) + b = objects.AttributeReference( + attribute_name="Attribute 2" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + AttributeReference structures with different types. + """ + a = objects.AttributeReference() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + AttributeReference structures with the same data. + """ + a = objects.AttributeReference() + b = objects.AttributeReference() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + b = objects.AttributeReference( + vendor_identification="Acme Corporation", + attribute_name="Delivery Date" + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_vendor_identification(self): + """ + Test that the inequality operator returns True when comparing two + AttributeReference structures with different vendor identification + fields. + """ + a = objects.AttributeReference( + vendor_identification="Acme Corporation 1" + ) + b = objects.AttributeReference( + vendor_identification="Acme Corporation 2" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_attribute_name(self): + """ + Test that the inequality operator returns True when comparing two + AttributeReference structures with different attribute name fields. + """ + a = objects.AttributeReference( + attribute_name="Attribute 1" + ) + b = objects.AttributeReference( + attribute_name="Attribute 2" + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + AttributeReference structures with different types. + """ + a = objects.AttributeReference() + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) + + class TestAttributes(TestCase): def setUp(self):