diff --git a/kmip/pie/objects.py b/kmip/pie/objects.py index 7567b82..b6ff775 100644 --- a/kmip/pie/objects.py +++ b/kmip/pie/objects.py @@ -388,6 +388,15 @@ class PublicKey(Key): names: The list of string names of the PublicKey. """ + __tablename__ = 'public_keys' + unique_identifier = Column('uid', Integer, + ForeignKey('keys.uid'), + primary_key=True) + + __mapper_args__ = { + 'polymorphic_identity': enums.ObjectType.PUBLIC_KEY + } + def __init__(self, algorithm, length, value, format_type=enums.KeyFormatType.X_509, masks=None, name='Public Key'): @@ -507,6 +516,10 @@ class PublicKey(Key): return NotImplemented +event.listen(PublicKey._names, 'append', + sql.attribute_append_factory("name_index"), retval=False) + + class PrivateKey(Key): """ The PrivateKey class of the simplified KMIP object hierarchy. @@ -526,6 +539,15 @@ class PrivateKey(Key): to 'Private Key'. """ + __tablename__ = 'private_keys' + unique_identifier = Column('uid', Integer, + ForeignKey('keys.uid'), + primary_key=True) + + __mapper_args__ = { + 'polymorphic_identity': enums.ObjectType.PRIVATE_KEY + } + def __init__(self, algorithm, length, value, format_type, masks=None, name='Private Key'): """ @@ -643,6 +665,10 @@ class PrivateKey(Key): return NotImplemented +event.listen(PrivateKey._names, 'append', + sql.attribute_append_factory("name_index"), retval=False) + + class Certificate(CryptographicObject): """ The Certificate class of the simplified KMIP object hierarchy. diff --git a/kmip/tests/unit/pie/objects/test_private_key.py b/kmip/tests/unit/pie/objects/test_private_key.py index d94acfb..63848d0 100644 --- a/kmip/tests/unit/pie/objects/test_private_key.py +++ b/kmip/tests/unit/pie/objects/test_private_key.py @@ -31,7 +31,10 @@ import binascii import testtools from kmip.core import enums -from kmip.pie import objects +from kmip.pie import sqltypes +from kmip.pie.objects import ManagedObject, PrivateKey +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker class TestPrivateKey(testtools.TestCase): @@ -160,6 +163,8 @@ class TestPrivateKey(testtools.TestCase): b'\x17\xE1\xF0\xC9\xB2\x3A\xFF\xA4\xD4\x96\x61\x8D\xBC\x02\x49\x86' b'\xED\x69\x0B\xBB\x7B\x02\x57\x68\xFF\x9D\xF8\xAC\x15\x41\x6F\x48' b'\x9F\x81\x29\xC3\x23\x41\xA8\xB4\x4F') + self.engine = create_engine('sqlite:///:memory:', echo=True) + sqltypes.Base.metadata.create_all(self.engine) def tearDown(self): super(TestPrivateKey, self).tearDown() @@ -168,7 +173,7 @@ class TestPrivateKey(testtools.TestCase): """ Test that a PrivateKey object can be instantiated. """ - key = objects.PrivateKey( + key = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) @@ -184,7 +189,7 @@ class TestPrivateKey(testtools.TestCase): """ Test that a PrivateKey object can be instantiated with all arguments. """ - key = objects.PrivateKey( + key = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, @@ -208,7 +213,7 @@ class TestPrivateKey(testtools.TestCase): Test that the object type can be retrieved from the PrivateKey. """ expected = enums.ObjectType.PRIVATE_KEY - key = objects.PrivateKey( + key = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) observed = key.object_type @@ -220,7 +225,7 @@ class TestPrivateKey(testtools.TestCase): used to construct a PrivateKey. """ args = ('invalid', 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) - self.assertRaises(TypeError, objects.PrivateKey, *args) + self.assertRaises(TypeError, PrivateKey, *args) def test_validate_on_invalid_length(self): """ @@ -229,7 +234,7 @@ class TestPrivateKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 'invalid', self.bytes_1024, enums.KeyFormatType.PKCS_8) - self.assertRaises(TypeError, objects.PrivateKey, *args) + self.assertRaises(TypeError, PrivateKey, *args) def test_validate_on_invalid_value(self): """ @@ -238,7 +243,7 @@ class TestPrivateKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 1024, 0, enums.KeyFormatType.PKCS_8) - self.assertRaises(TypeError, objects.PrivateKey, *args) + self.assertRaises(TypeError, PrivateKey, *args) def test_validate_on_invalid_format_type(self): """ @@ -247,7 +252,7 @@ class TestPrivateKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, 'invalid') - self.assertRaises(TypeError, objects.PrivateKey, *args) + self.assertRaises(TypeError, PrivateKey, *args) def test_validate_on_invalid_format_type_value(self): """ @@ -256,7 +261,7 @@ class TestPrivateKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.OPAQUE) - self.assertRaises(ValueError, objects.PrivateKey, *args) + self.assertRaises(ValueError, PrivateKey, *args) def test_validate_on_invalid_masks(self): """ @@ -266,7 +271,7 @@ class TestPrivateKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) kwargs = {'masks': 'invalid'} - self.assertRaises(TypeError, objects.PrivateKey, *args, **kwargs) + self.assertRaises(TypeError, PrivateKey, *args, **kwargs) def test_validate_on_invalid_mask(self): """ @@ -276,7 +281,7 @@ class TestPrivateKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) kwargs = {'masks': ['invalid']} - self.assertRaises(TypeError, objects.PrivateKey, *args, **kwargs) + self.assertRaises(TypeError, PrivateKey, *args, **kwargs) def test_validate_on_invalid_name(self): """ @@ -286,13 +291,13 @@ class TestPrivateKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) kwargs = {'name': 0} - self.assertRaises(TypeError, objects.PrivateKey, *args, **kwargs) + self.assertRaises(TypeError, PrivateKey, *args, **kwargs) def test_repr(self): """ Test that repr can be applied to a PrivateKey. """ - key = objects.PrivateKey( + key = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) args = "algorithm={0}, length={1}, value={2}, format_type={3}".format( @@ -306,7 +311,7 @@ class TestPrivateKey(testtools.TestCase): """ Test that str can be applied to a PrivateKey. """ - key = objects.PrivateKey( + key = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) expected = str(binascii.hexlify(self.bytes_1024)) @@ -318,10 +323,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns True when comparing two PrivateKey objects with the same data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) self.assertTrue(a == b) @@ -332,10 +337,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns False when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.AES, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) self.assertFalse(a == b) @@ -346,10 +351,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns False when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024, enums.KeyFormatType.PKCS_8) self.assertFalse(a == b) @@ -360,10 +365,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns False when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_2048, enums.KeyFormatType.PKCS_8) self.assertFalse(a == b) @@ -374,10 +379,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns False when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_1) self.assertFalse(a == b) @@ -388,7 +393,7 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns False when comparing a PrivateKey object to a non-PrivateKey object. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) b = "invalid" @@ -400,10 +405,10 @@ class TestPrivateKey(testtools.TestCase): Test that the inequality operator returns False when comparing two PrivateKey objects with the same internal data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) self.assertFalse(a != b) @@ -414,10 +419,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns True when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.AES, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) self.assertTrue(a != b) @@ -428,10 +433,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns True when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8) self.assertTrue(a != b) @@ -442,10 +447,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns True when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024, enums.KeyFormatType.PKCS_8) self.assertTrue(a != b) @@ -456,10 +461,10 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns True when comparing two PrivateKey objects with different data. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_8) - b = objects.PrivateKey( + b = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) self.assertTrue(a != b) @@ -470,9 +475,295 @@ class TestPrivateKey(testtools.TestCase): Test that the equality operator returns True when comparing a PrivateKey object to a non-PrivateKey object. """ - a = objects.PrivateKey( + a = PrivateKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) b = "invalid" self.assertTrue(a != b) self.assertTrue(b != a) + + def test_save(self): + """ + Test that the object can be saved using SQLAlchemy. This will add it to + the database, verify that no exceptions are thrown, and check that its + unique identifier was set. + """ + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + self.assertIsNotNone(key.unique_identifier) + + def test_get(self): + """ + Test that the object can be saved and then retrieved using SQLAlchemy. + This adds is to the database and then retrieves it by ID and verifies + some of the attributes. + """ + test_name = 'bowser' + masks = [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.WRAP_KEY] + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, masks=masks, name=test_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEqual(1, len(get_obj.names)) + self.assertEqual([test_name], get_obj.names) + self.assertEqual(enums.ObjectType.PRIVATE_KEY, get_obj.object_type) + self.assertEqual(self.bytes_2048, get_obj.value) + self.assertEqual(enums.CryptographicAlgorithm.RSA, + get_obj.cryptographic_algorithm) + self.assertEqual(2048, get_obj.cryptographic_length) + self.assertEqual(enums.KeyFormatType.PKCS_1, get_obj.key_format_type) + self.assertEqual(masks, get_obj.cryptographic_usage_masks) + + def test_add_multiple_names(self): + """ + Test that multiple names can be added to a managed object. This + verifies a few properties. First this verifies that names can be added + using simple strings. It also verifies that the index for each + subsequent string is set accordingly. Finally this tests that the names + can be saved and retrieved from the database. + """ + expected_names = ['bowser', 'frumpy', 'big fat cat'] + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=expected_names[0]) + key.names.append(expected_names[1]) + key.names.append(expected_names[2]) + self.assertEquals(3, key.name_index) + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_name(self): + """ + Tests that a name can be removed from the list of names. This will + verify that the list of names is correct. It will verify that updating + this object removes the name from the database. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + key.names.pop(remove_index) + self.assertEquals(3, key.name_index) + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_names, key.names) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_and_add_name(self): + """ + Tests that names can be removed from the list of names and more added. + This will verify that the list of names is correct. It will verify that + updating this object removes the name from the database. It will verify + that the indices for the removed names are not reused. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + key.names.pop() + key.names.pop() + key.names.append('dog') + self.assertEquals(4, key.name_index) + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + self.assertEquals(expected_names, key.names) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_add_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will add a + name to it in one session, and then retrieve it in another session to + verify that it has all of the correct names. + + This test and the subsequent test_udpate_* methods are different than + the name tests above because these are updating objects already stored + in the database. This tests will simulate what happens when the KMIP + client calls an add attribute method. + """ + first_name = 'bowser' + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=first_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + added_name = 'frumpy' + expected_names = [first_name, added_name] + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_key = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.append(added_name) + session.commit() + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will + remove a name from it in one session, and then retrieve it in another + session to verify that it has all of the correct names. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_key = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.pop(remove_index) + session.commit() + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_and_add_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will + remove a name and add another one to it in one session, and then + retrieve it in another session to verify that it has all of the correct + names. This simulates multiple operation being sent for the same + object. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + key = PrivateKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + update_key = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.pop() + update_key.names.pop() + update_key.names.append('dog') + session.commit() + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + + session = Session() + get_obj = session.query(PrivateKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) diff --git a/kmip/tests/unit/pie/objects/test_public_key.py b/kmip/tests/unit/pie/objects/test_public_key.py index ee1e119..d269ea6 100644 --- a/kmip/tests/unit/pie/objects/test_public_key.py +++ b/kmip/tests/unit/pie/objects/test_public_key.py @@ -17,7 +17,10 @@ import binascii import testtools from kmip.core import enums -from kmip.pie import objects +from kmip.pie import sqltypes +from kmip.pie.objects import ManagedObject, PublicKey +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker class TestPublicKey(testtools.TestCase): @@ -59,6 +62,8 @@ class TestPublicKey(testtools.TestCase): b'\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73\xA1' b'\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC\x41' b'\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01') + self.engine = create_engine('sqlite:///:memory:', echo=True) + sqltypes.Base.metadata.create_all(self.engine) def tearDown(self): super(TestPublicKey, self).tearDown() @@ -67,7 +72,7 @@ class TestPublicKey(testtools.TestCase): """ Test that a PublicKey object can be instantiated. """ - key = objects.PublicKey( + key = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024) self.assertEqual( @@ -82,7 +87,7 @@ class TestPublicKey(testtools.TestCase): """ Test that a PublicKey object can be instantiated with all arguments. """ - key = objects.PublicKey( + key = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, @@ -106,7 +111,7 @@ class TestPublicKey(testtools.TestCase): Test that the object type can be retrieved from the PublicKey. """ expected = enums.ObjectType.PUBLIC_KEY - key = objects.PublicKey( + key = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) observed = key.object_type @@ -118,7 +123,7 @@ class TestPublicKey(testtools.TestCase): used to construct a PublicKey. """ args = ('invalid', 1024, self.bytes_1024, enums.KeyFormatType.X_509) - self.assertRaises(TypeError, objects.PublicKey, *args) + self.assertRaises(TypeError, PublicKey, *args) def test_validate_on_invalid_length(self): """ @@ -127,7 +132,7 @@ class TestPublicKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 'invalid', self.bytes_1024, enums.KeyFormatType.X_509) - self.assertRaises(TypeError, objects.PublicKey, *args) + self.assertRaises(TypeError, PublicKey, *args) def test_validate_on_invalid_value(self): """ @@ -136,7 +141,7 @@ class TestPublicKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 1024, 0, enums.KeyFormatType.X_509) - self.assertRaises(TypeError, objects.PublicKey, *args) + self.assertRaises(TypeError, PublicKey, *args) def test_validate_on_invalid_format_type(self): """ @@ -145,7 +150,7 @@ class TestPublicKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, 'invalid') - self.assertRaises(TypeError, objects.PublicKey, *args) + self.assertRaises(TypeError, PublicKey, *args) def test_validate_on_invalid_format_type_value(self): """ @@ -154,7 +159,7 @@ class TestPublicKey(testtools.TestCase): """ args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.OPAQUE) - self.assertRaises(ValueError, objects.PublicKey, *args) + self.assertRaises(ValueError, PublicKey, *args) def test_validate_on_invalid_masks(self): """ @@ -164,7 +169,7 @@ class TestPublicKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) kwargs = {'masks': 'invalid'} - self.assertRaises(TypeError, objects.PublicKey, *args, **kwargs) + self.assertRaises(TypeError, PublicKey, *args, **kwargs) def test_validate_on_invalid_mask(self): """ @@ -174,7 +179,7 @@ class TestPublicKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) kwargs = {'masks': ['invalid']} - self.assertRaises(TypeError, objects.PublicKey, *args, **kwargs) + self.assertRaises(TypeError, PublicKey, *args, **kwargs) def test_validate_on_invalid_name(self): """ @@ -184,13 +189,13 @@ class TestPublicKey(testtools.TestCase): args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) kwargs = {'name': 0} - self.assertRaises(TypeError, objects.PublicKey, *args, **kwargs) + self.assertRaises(TypeError, PublicKey, *args, **kwargs) def test_repr(self): """ Test that repr can be applied to a PublicKey. """ - key = objects.PublicKey( + key = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) args = "algorithm={0}, length={1}, value={2}, format_type={3}".format( @@ -204,7 +209,7 @@ class TestPublicKey(testtools.TestCase): """ Test that str can be applied to a PublicKey. """ - key = objects.PublicKey( + key = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) expected = str(binascii.hexlify(self.bytes_1024)) @@ -216,10 +221,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns True when comparing two PublicKey objects with the same data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) self.assertTrue(a == b) @@ -230,10 +235,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns False when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.AES, 1024, self.bytes_1024, enums.KeyFormatType.X_509) self.assertFalse(a == b) @@ -244,10 +249,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns False when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024, enums.KeyFormatType.X_509) self.assertFalse(a == b) @@ -258,10 +263,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns False when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_2048, enums.KeyFormatType.X_509) self.assertFalse(a == b) @@ -272,10 +277,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns False when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_1) self.assertFalse(a == b) @@ -286,7 +291,7 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns False when comparing a PublicKey object to a non-PublicKey object. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.X_509) b = "invalid" @@ -298,10 +303,10 @@ class TestPublicKey(testtools.TestCase): Test that the inequality operator returns False when comparing two PublicKey objects with the same internal data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) self.assertFalse(a != b) @@ -312,10 +317,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns True when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.AES, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) self.assertTrue(a != b) @@ -326,10 +331,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns True when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024, enums.KeyFormatType.PKCS_1) self.assertTrue(a != b) @@ -340,10 +345,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns True when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024, enums.KeyFormatType.PKCS_1) self.assertTrue(a != b) @@ -354,10 +359,10 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns True when comparing two PublicKey objects with different data. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) - b = objects.PublicKey( + b = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.X_509) self.assertTrue(a != b) @@ -368,9 +373,295 @@ class TestPublicKey(testtools.TestCase): Test that the equality operator returns True when comparing a PublicKey object to a non-PublicKey object. """ - a = objects.PublicKey( + a = PublicKey( enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, enums.KeyFormatType.PKCS_1) b = "invalid" self.assertTrue(a != b) self.assertTrue(b != a) + + def test_save(self): + """ + Test that the object can be saved using SQLAlchemy. This will add it to + the database, verify that no exceptions are thrown, and check that its + unique identifier was set. + """ + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + self.assertIsNotNone(key.unique_identifier) + + def test_get(self): + """ + Test that the object can be saved and then retrieved using SQLAlchemy. + This adds is to the database and then retrieves it by ID and verifies + some of the attributes. + """ + test_name = 'bowser' + masks = [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.WRAP_KEY] + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, masks=masks, name=test_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEqual(1, len(get_obj.names)) + self.assertEqual([test_name], get_obj.names) + self.assertEqual(enums.ObjectType.PUBLIC_KEY, get_obj.object_type) + self.assertEqual(self.bytes_2048, get_obj.value) + self.assertEqual(enums.CryptographicAlgorithm.RSA, + get_obj.cryptographic_algorithm) + self.assertEqual(2048, get_obj.cryptographic_length) + self.assertEqual(enums.KeyFormatType.PKCS_1, get_obj.key_format_type) + self.assertEqual(masks, get_obj.cryptographic_usage_masks) + + def test_add_multiple_names(self): + """ + Test that multiple names can be added to a managed object. This + verifies a few properties. First this verifies that names can be added + using simple strings. It also verifies that the index for each + subsequent string is set accordingly. Finally this tests that the names + can be saved and retrieved from the database. + """ + expected_names = ['bowser', 'frumpy', 'big fat cat'] + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=expected_names[0]) + key.names.append(expected_names[1]) + key.names.append(expected_names[2]) + self.assertEquals(3, key.name_index) + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_name(self): + """ + Tests that a name can be removed from the list of names. This will + verify that the list of names is correct. It will verify that updating + this object removes the name from the database. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + key.names.pop(remove_index) + self.assertEquals(3, key.name_index) + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + self.assertEquals(expected_names, key.names) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_remove_and_add_name(self): + """ + Tests that names can be removed from the list of names and more added. + This will verify that the list of names is correct. It will verify that + updating this object removes the name from the database. It will verify + that the indices for the removed names are not reused. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + key.names.pop() + key.names.pop() + key.names.append('dog') + self.assertEquals(4, key.name_index) + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + self.assertEquals(expected_names, key.names) + self.assertEquals(expected_mo_names, key._names) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_add_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will add a + name to it in one session, and then retrieve it in another session to + verify that it has all of the correct names. + + This test and the subsequent test_udpate_* methods are different than + the name tests above because these are updating objects already stored + in the database. This tests will simulate what happens when the KMIP + client calls an add attribute method. + """ + first_name = 'bowser' + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=first_name) + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + added_name = 'frumpy' + expected_names = [first_name, added_name] + expected_mo_names = list() + for i, name in enumerate(expected_names): + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_key = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.append(added_name) + session.commit() + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will + remove a name from it in one session, and then retrieve it in another + session to verify that it has all of the correct names. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + remove_index = 1 + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + expected_names = list() + expected_mo_names = list() + for i, name in enumerate(names): + if i != remove_index: + expected_names.append(name) + expected_mo_names.append(sqltypes.ManagedObjectName(name, i)) + + session = Session() + update_key = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.pop(remove_index) + session.commit() + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names) + + def test_update_with_remove_and_add_name(self): + """ + Tests that an OpaqueObject already stored in the database can be + updated. This will store an OpaqueObject in the database. It will + remove a name and add another one to it in one session, and then + retrieve it in another session to verify that it has all of the correct + names. This simulates multiple operation being sent for the same + object. + """ + names = ['bowser', 'frumpy', 'big fat cat'] + key = PublicKey( + enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048, + enums.KeyFormatType.PKCS_1, name=names[0]) + key.names.append(names[1]) + key.names.append(names[2]) + + Session = sessionmaker(bind=self.engine) + session = Session() + session.add(key) + session.commit() + + session = Session() + update_key = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + update_key.names.pop() + update_key.names.pop() + update_key.names.append('dog') + session.commit() + + expected_names = ['bowser', 'dog'] + expected_mo_names = list() + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0], + 0)) + expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1], + 3)) + + session = Session() + get_obj = session.query(PublicKey).filter( + ManagedObject.unique_identifier == key.unique_identifier + ).one() + session.commit() + self.assertEquals(expected_names, get_obj.names) + self.assertEquals(expected_mo_names, get_obj._names)