diff --git a/kmip/pie/objects.py b/kmip/pie/objects.py index f91090f..8bbe780 100644 --- a/kmip/pie/objects.py +++ b/kmip/pie/objects.py @@ -18,7 +18,6 @@ import sqlalchemy from sqlalchemy import Column, event, ForeignKey, Integer, String, VARBINARY from sqlalchemy import Boolean from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.orm import relationship import binascii import six @@ -27,6 +26,28 @@ from kmip.core import enums from kmip.pie import sqltypes as sql +app_specific_info_map = sqlalchemy.Table( + "app_specific_info_map", + sql.Base.metadata, + sqlalchemy.Column( + "managed_object_id", + sqlalchemy.Integer, + sqlalchemy.ForeignKey( + "managed_objects.uid", + ondelete="CASCADE" + ) + ), + sqlalchemy.Column( + "app_specific_info_id", + sqlalchemy.Integer, + sqlalchemy.ForeignKey( + "app_specific_info.id", + ondelete="CASCADE" + ) + ) +) + + class ManagedObject(sql.Base): """ The abstract base class of the simplified KMIP object hierarchy. @@ -51,8 +72,11 @@ class ManagedObject(sql.Base): _class_type = Column('class_type', String(50)) value = Column('value', VARBINARY(1024)) name_index = Column(Integer, default=0) - _names = relationship('ManagedObjectName', back_populates='mo', - cascade='all, delete-orphan') + _names = sqlalchemy.orm.relationship( + "ManagedObjectName", + back_populates="mo", + cascade="all, delete-orphan" + ) names = association_proxy('_names', 'name') operation_policy_name = Column( 'operation_policy_name', @@ -62,6 +86,13 @@ class ManagedObject(sql.Base): initial_date = Column(Integer, default=0) _owner = Column('owner', String(50), default=None) + app_specific_info = sqlalchemy.orm.relationship( + "ApplicationSpecificInformation", + secondary=app_specific_info_map, + back_populates="managed_objects", + passive_deletes=True + ) + __mapper_args__ = { 'polymorphic_identity': 'ManagedObject', 'polymorphic_on': _class_type @@ -1713,3 +1744,100 @@ class OpaqueObject(ManagedObject): event.listen(OpaqueObject._names, 'append', sql.attribute_append_factory("name_index"), retval=False) + + +class ApplicationSpecificInformation(sql.Base): + __tablename__ = "app_specific_info" + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + _application_namespace = sqlalchemy.Column( + "application_namespace", + sqlalchemy.String + ) + _application_data = sqlalchemy.Column( + "application_data", + sqlalchemy.String + ) + managed_objects = sqlalchemy.orm.relationship( + "ManagedObject", + secondary=app_specific_info_map, + back_populates="app_specific_info" + ) + + def __init__(self, + application_namespace=None, + application_data=None): + """ + Create an ApplicationSpecificInformation attribute. + + Args: + application_namespace (str): A string specifying the application + namespace. Required. + application_data (str): A string specifying the application data. + Required. + """ + super(ApplicationSpecificInformation, self).__init__() + + self.application_namespace = application_namespace + self.application_data = application_data + + @property + def application_namespace(self): + return self._application_namespace + + @application_namespace.setter + def application_namespace(self, value): + if (value is None) or (isinstance(value, six.string_types)): + self._application_namespace = value + else: + raise TypeError("The application namespace must be a string.") + + @property + def application_data(self): + return self._application_data + + @application_data.setter + def application_data(self, value): + if (value is None) or (isinstance(value, six.string_types)): + self._application_data = value + else: + raise TypeError("The application data must be a string.") + + def __repr__(self): + application_namespace = "application_namespace={}".format( + self.application_namespace + ) + application_data = "application_data={}".format(self.application_data) + + return "ApplicationSpecificInformation({})".format( + ", ".join( + [ + application_namespace, + application_data + ] + ) + ) + + def __str__(self): + return str( + { + "application_namespace": self.application_namespace, + "application_data": self.application_data + } + ) + + def __eq__(self, other): + if isinstance(other, ApplicationSpecificInformation): + if self.application_namespace != other.application_namespace: + return False + elif self.application_data != other.application_data: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ApplicationSpecificInformation): + return not (self == other) + else: + return NotImplemented diff --git a/kmip/tests/unit/pie/objects/test_application_specific_information.py b/kmip/tests/unit/pie/objects/test_application_specific_information.py new file mode 100644 index 0000000..ea7fb97 --- /dev/null +++ b/kmip/tests/unit/pie/objects/test_application_specific_information.py @@ -0,0 +1,259 @@ +# Copyright (c) 2019 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy +import testtools + +from kmip.pie import objects +from kmip.pie import sqltypes + + +class TestApplicationSpecificInformation(testtools.TestCase): + """ + Test suite for ApplicationSpecificInformation. + """ + + def setUp(self): + super(TestApplicationSpecificInformation, self).setUp() + + def tearDown(self): + super(TestApplicationSpecificInformation, self).tearDown() + + def test_init(self): + """ + Test that an ApplicationSpecificInformation object can be instantiated. + """ + app_specific_info = objects.ApplicationSpecificInformation() + + self.assertIsNone(app_specific_info.application_namespace) + self.assertIsNone(app_specific_info.application_data) + + def test_invalid_application_namespace(self): + """ + Test that a TypeError is raised when an invalid application namespace + value is used to construct an ApplicationSpecificInformation attribute. + """ + kwargs = {"application_namespace": []} + self.assertRaisesRegex( + TypeError, + "The application namespace must be a string.", + objects.ApplicationSpecificInformation, + **kwargs + ) + + args = ( + objects.ApplicationSpecificInformation(), + "application_namespace", + [] + ) + self.assertRaisesRegex( + TypeError, + "The application namespace must be a string.", + setattr, + *args + ) + + def test_invalid_application_data(self): + """ + Test that a TypeError is raised when an invalid application data value + is used to construct an ApplicationSpecificInformation attribute. + """ + kwargs = {"application_data": []} + self.assertRaisesRegex( + TypeError, + "The application data must be a string.", + objects.ApplicationSpecificInformation, + **kwargs + ) + + args = ( + objects.ApplicationSpecificInformation(), + "application_data", + [] + ) + self.assertRaisesRegex( + TypeError, + "The application data must be a string.", + setattr, + *args + ) + + def test_repr(self): + """ + Test that repr can be applied to an ApplicationSpecificInformation + attribute. + """ + app_specific_info = objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + + args = [ + "application_namespace={}".format("ssl"), + "application_data={}".format("www.example.com") + ] + + expected = "ApplicationSpecificInformation({})".format(", ".join(args)) + observed = repr(app_specific_info) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to an ApplicationSpecificInformation + attribute. + """ + app_specific_info = objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + + expected = str( + { + "application_namespace": "ssl", + "application_data": "www.example.com" + } + ) + observed = str(app_specific_info) + + self.assertEqual(expected, observed) + + def test_comparison_on_equal(self): + """ + Test that the equality/inequality operators return True/False when + comparing two ApplicationSpecificInformation attributes with the same + data. + """ + a = objects.ApplicationSpecificInformation() + b = objects.ApplicationSpecificInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + b = objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_comparison_on_different_application_namespaces(self): + """ + Test that the equality/inequality operators return False/True when + comparing two ApplicationSpecificInformation attributes with different + application namespaces. + """ + a = objects.ApplicationSpecificInformation( + application_namespace="a" + ) + b = objects.ApplicationSpecificInformation( + application_namespace="b" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_comparison_on_different_application_data(self): + """ + Test that the equality/inequality operators return False/True when + comparing two ApplicationSpecificInformation attributes with different + application data. + """ + a = objects.ApplicationSpecificInformation( + application_data="a" + ) + b = objects.ApplicationSpecificInformation( + application_data="b" + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_comparison_on_type_mismatch(self): + """ + Test that the equality/inequality operators return False/True when + comparing an ApplicationSpecificInformation attribute to a non + ApplicationSpecificInformation attribute. + """ + a = objects.ApplicationSpecificInformation() + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_save(self): + """ + Test that an ApplicationSpecificInformation attribute can be saved + using SQLAlchemy. This test will add an attribute instance to the + database, verify that no exceptions are thrown, and check that its + ID was set. + """ + app_specific_info = objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + + engine = sqlalchemy.create_engine("sqlite:///:memory:", echo=True) + sqltypes.Base.metadata.create_all(engine) + + session = sqlalchemy.orm.sessionmaker(bind=engine)() + session.add(app_specific_info) + session.commit() + + self.assertIsNotNone(app_specific_info.id) + + def test_get(self): + """ + Test that an ApplicationSpecificInformation attribute can be saved + and then retrieved using SQLAlchemy. This test adds the attribute to + the database and then retrieves it by ID and verifies its values. + """ + app_specific_info = objects.ApplicationSpecificInformation( + application_namespace="ssl", + application_data="www.example.com" + ) + + engine = sqlalchemy.create_engine("sqlite:///:memory:", echo=True) + sqltypes.Base.metadata.create_all(engine) + + session = sqlalchemy.orm.sessionmaker(bind=engine)() + session.add(app_specific_info) + session.commit() + + session = sqlalchemy.orm.sessionmaker(bind=engine)() + retrieved_info = session.query( + objects.ApplicationSpecificInformation + ).filter( + objects.ApplicationSpecificInformation.id == app_specific_info.id + ).one() + session.commit() + + self.assertEqual("ssl", retrieved_info.application_namespace) + self.assertEqual("www.example.com", retrieved_info.application_data)