diff --git a/kmip/pie/objects.py b/kmip/pie/objects.py index b6ff775..ff4f0e4 100644 --- a/kmip/pie/objects.py +++ b/kmip/pie/objects.py @@ -833,6 +833,15 @@ class SecretData(CryptographicObject): data_type: The type of the secret value. """ + __tablename__ = 'secret_data_objects' + unique_identifier = Column('uid', Integer, + ForeignKey('crypto_objects.uid'), + primary_key=True) + data_type = Column('data_type', sql.EnumType(enums.SecretDataType)) + __mapper_args__ = { + 'polymorphic_identity': enums.ObjectType.SECRET_DATA + } + def __init__(self, value, data_type, masks=None, name='Secret Data'): """ Create a SecretData object. @@ -921,6 +930,10 @@ class SecretData(CryptographicObject): return NotImplemented +event.listen(SecretData._names, 'append', + sql.attribute_append_factory("name_index"), retval=False) + + class OpaqueObject(ManagedObject): """ The OpaqueObject class of the simplified KMIP object hierarchy. diff --git a/kmip/tests/unit/pie/objects/test_secret_data.py b/kmip/tests/unit/pie/objects/test_secret_data.py index 335a26c..5a12132 100644 --- a/kmip/tests/unit/pie/objects/test_secret_data.py +++ b/kmip/tests/unit/pie/objects/test_secret_data.py @@ -17,7 +17,10 @@ import binascii import testtools from kmip.core import enums -from kmip.pie import objects +from kmip.pie.objects import ManagedObject, SecretData +from kmip.pie import sqltypes +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker class TestSecretData(testtools.TestCase): @@ -33,6 +36,8 @@ class TestSecretData(testtools.TestCase): b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x64') self.bytes_b = ( b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x65') + self.engine = create_engine('sqlite:///:memory:', echo=True) + sqltypes.Base.metadata.create_all(self.engine) def tearDown(self): super(TestSecretData, self).tearDown() @@ -41,7 +46,7 @@ class TestSecretData(testtools.TestCase): """ Test that a SecretData object can be instantiated. """ - secret = objects.SecretData( + secret = SecretData( self.bytes_a, enums.SecretDataType.PASSWORD) self.assertEqual(secret.value, self.bytes_a) @@ -53,7 +58,7 @@ class TestSecretData(testtools.TestCase): """ Test that a SecretData object can be instantiated with all arguments. """ - key = objects.SecretData( + key = SecretData( self.bytes_a, enums.SecretDataType.PASSWORD, masks=[enums.CryptographicUsageMask.VERIFY], @@ -70,7 +75,7 @@ class TestSecretData(testtools.TestCase): Test that the object type can be retrieved from the SecretData. """ expected = enums.ObjectType.SECRET_DATA - key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) observed = key.object_type self.assertEqual(expected, observed) @@ -80,7 +85,7 @@ class TestSecretData(testtools.TestCase): construct a SecretData. """ args = (0, enums.SecretDataType.PASSWORD) - self.assertRaises(TypeError, objects.SecretData, *args) + self.assertRaises(TypeError, SecretData, *args) def test_validate_on_invalid_data_type(self): """ @@ -88,7 +93,7 @@ class TestSecretData(testtools.TestCase): construct a SecretData. """ args = (self.bytes_a, 'invalid') - self.assertRaises(TypeError, objects.SecretData, *args) + self.assertRaises(TypeError, SecretData, *args) def test_validate_on_invalid_masks(self): """ @@ -97,7 +102,7 @@ class TestSecretData(testtools.TestCase): """ args = (self.bytes_a, enums.SecretDataType.PASSWORD) kwargs = {'masks': 'invalid'} - self.assertRaises(TypeError, objects.SecretData, *args, **kwargs) + self.assertRaises(TypeError, SecretData, *args, **kwargs) def test_validate_on_invalid_mask(self): """ @@ -106,7 +111,7 @@ class TestSecretData(testtools.TestCase): """ args = (self.bytes_a, enums.SecretDataType.PASSWORD) kwargs = {'masks': ['invalid']} - self.assertRaises(TypeError, objects.SecretData, *args, **kwargs) + self.assertRaises(TypeError, SecretData, *args, **kwargs) def test_validate_on_invalid_name(self): """ @@ -115,13 +120,13 @@ class TestSecretData(testtools.TestCase): """ args = (self.bytes_a, enums.SecretDataType.PASSWORD) kwargs = {'name': 0} - self.assertRaises(TypeError, objects.SecretData, *args, **kwargs) + self.assertRaises(TypeError, SecretData, *args, **kwargs) def test_repr(self): """ Test that repr can be applied to a SecretData. """ - key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) args = "value={0}, data_type={1}".format( binascii.hexlify(self.bytes_a), enums.SecretDataType.PASSWORD) expected = "SecretData({0})".format(args) @@ -132,7 +137,7 @@ class TestSecretData(testtools.TestCase): """ Test that str can be applied to a SecretData. """ - key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) expected = str(binascii.hexlify(self.bytes_a)) observed = str(key) self.assertEqual(expected, observed) @@ -142,8 +147,8 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns True when comparing two SecretData objects with the same data. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) - b = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + b = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) self.assertTrue(a == b) self.assertTrue(b == a) @@ -152,8 +157,8 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns False when comparing two SecretData objects with different data. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) - b = objects.SecretData(self.bytes_b, enums.SecretDataType.PASSWORD) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + b = SecretData(self.bytes_b, enums.SecretDataType.PASSWORD) self.assertFalse(a == b) self.assertFalse(b == a) @@ -162,8 +167,8 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns False when comparing two SecretData objects with different data. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) - b = objects.SecretData(self.bytes_a, enums.SecretDataType.SEED) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + b = SecretData(self.bytes_a, enums.SecretDataType.SEED) self.assertFalse(a == b) self.assertFalse(b == a) @@ -172,7 +177,7 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns False when comparing a SecretData object to a non-SecretData object. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) b = "invalid" self.assertFalse(a == b) self.assertFalse(b == a) @@ -182,8 +187,8 @@ class TestSecretData(testtools.TestCase): Test that the inequality operator returns False when comparing two SecretData objects with the same internal data. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) - b = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + b = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) self.assertFalse(a != b) self.assertFalse(b != a) @@ -192,8 +197,8 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns True when comparing two SecretData objects with different data. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) - b = objects.SecretData(self.bytes_b, enums.SecretDataType.PASSWORD) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + b = SecretData(self.bytes_b, enums.SecretDataType.PASSWORD) self.assertTrue(a != b) self.assertTrue(b != a) @@ -202,8 +207,8 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns True when comparing two SecretData objects with different data. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) - b = objects.SecretData(self.bytes_a, enums.SecretDataType.SEED) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + b = SecretData(self.bytes_a, enums.SecretDataType.SEED) self.assertTrue(a != b) self.assertTrue(b != a) @@ -212,7 +217,277 @@ class TestSecretData(testtools.TestCase): Test that the equality operator returns True when comparing a SecretData object to a non-SecretData object. """ - a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) b = "invalid" self.assertTrue(a != b) self.assertTrue(b != a) + + def test_save(self): + """ + Test that the object can be saved using SQLAlchemy. This will add it to + the database, verify that no exceptions are thrown, and check that its + unique identifier was set. + """ + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + def test_get(self): + """ + Test that the object can be saved and then retrieved using SQLAlchemy. + This adds is to the database and then retrieves it by ID and verifies + some of the attributes. + """ + test_name = 'bowser' + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=test_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEqual(1, len(get_obj.names)) + self.assertEqual([test_name], get_obj.names) + self.assertEqual(self.bytes_a, get_obj.value) + self.assertEqual(enums.ObjectType.SECRET_DATA, get_obj.object_type) + self.assertEqual(enums.SecretDataType.PASSWORD, get_obj.data_type) + + def test_add_multiple_names(self): + """ + Test that multiple names can be added to a managed object. This + verifies a few properties. First this verifies that names can be added + using simple strings. It also verifies that the index for each + subsequent string is set accordingly. Finally this tests that the names + can be saved and retrieved from the database. + """ + expected_names = ['bowser', 'frumpy', 'big fat cat'] + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=expected_names[0]) + obj.names.append(expected_names[1]) + obj.names.append(expected_names[2]) + self.assertEquals(3, obj.name_index) + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_mo_names, obj._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_name(self): + """ + Tests that a name can be removed from the list of names. This will + verify that the list of names is correct. It will verify that updating + this object removes the name from the database. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=names[0]) + obj.names.append(names[1]) + obj.names.append(names[2]) + obj.names.pop(remove_index) + self.assertEquals(3, obj.name_index) + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_names, obj.names) + self.assertEquals(expected_mo_names, obj._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_and_add_name(self): + """ + Tests that names can be removed from the list of names and more added. + This will verify that the list of names is correct. It will verify that + updating this object removes the name from the database. It will verify + that the indices for the removed names are not reused. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=names[0]) + obj.names.append(names[1]) + obj.names.append(names[2]) + obj.names.pop() + obj.names.pop() + obj.names.append('dog') + self.assertEquals(4, obj.name_index) + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + self.assertEquals(expected_names, obj.names) + self.assertEquals(expected_mo_names, obj._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_add_name(self): + """ + Tests that a SecretData already stored in the database can be + updated. This will store a SecretData in the database. It will add a + name to it in one session, and then retrieve it in another session to + verify that it has all of the correct names. + + This test and the subsequent test_udpate_* methods are different than + the name tests above because these are updating objects already stored + in the database. This tests will simulate what happens when the KMIP + client calls an add attribute method. + """ + first_name = 'bowser' + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=first_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + added_name = 'frumpy' + expected_names = [first_name, added_name] + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + update_obj.names.append(added_name) + session.commit() + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_name(self): + """ + Tests that a SecretData already stored in the database can be + updated. This will store a SecretData in the database. It will + remove a name from it in one session, and then retrieve it in another + session to verify that it has all of the correct names. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=names[0]) + obj.names.append(names[1]) + obj.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + update_obj.names.pop(remove_index) + session.commit() + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_and_add_name(self): + """ + Tests that a SecretData already stored in the database can be + updated. This will store a SecretData in the database. It will + remove a name and add another one to it in one session, and then + retrieve it in another session to verify that it has all of the correct + names. This simulates multiple operation being sent for the same + object. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD, + name=names[0]) + obj.names.append(names[1]) + obj.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(obj) + session.commit() + + session = Session() + update_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + update_obj.names.pop() + update_obj.names.pop() + update_obj.names.append('dog') + session.commit() + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + + session = Session() + get_obj = session.query(SecretData).filter( + ManagedObject.unique_identifier == obj.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names)