Added SQLAlchemy Code for Asymmetric Keys

The code for persisting public and private keys in a database has been
added along with the corresponding unit tests.
This commit is contained in:
Nathan Reller 2016-02-19 09:36:43 -05:00
parent 5e52f9d943
commit c3680f7609
3 changed files with 680 additions and 72 deletions

View File

@ -388,6 +388,15 @@ class PublicKey(Key):
names: The list of string names of the PublicKey.
"""
__tablename__ = 'public_keys'
unique_identifier = Column('uid', Integer,
ForeignKey('keys.uid'),
primary_key=True)
__mapper_args__ = {
'polymorphic_identity': enums.ObjectType.PUBLIC_KEY
}
def __init__(self, algorithm, length, value,
format_type=enums.KeyFormatType.X_509, masks=None,
name='Public Key'):
@ -507,6 +516,10 @@ class PublicKey(Key):
return NotImplemented
event.listen(PublicKey._names, 'append',
sql.attribute_append_factory("name_index"), retval=False)
class PrivateKey(Key):
"""
The PrivateKey class of the simplified KMIP object hierarchy.
@ -526,6 +539,15 @@ class PrivateKey(Key):
to 'Private Key'.
"""
__tablename__ = 'private_keys'
unique_identifier = Column('uid', Integer,
ForeignKey('keys.uid'),
primary_key=True)
__mapper_args__ = {
'polymorphic_identity': enums.ObjectType.PRIVATE_KEY
}
def __init__(self, algorithm, length, value, format_type, masks=None,
name='Private Key'):
"""
@ -643,6 +665,10 @@ class PrivateKey(Key):
return NotImplemented
event.listen(PrivateKey._names, 'append',
sql.attribute_append_factory("name_index"), retval=False)
class Certificate(CryptographicObject):
"""
The Certificate class of the simplified KMIP object hierarchy.

View File

@ -31,7 +31,10 @@ import binascii
import testtools
from kmip.core import enums
from kmip.pie import objects
from kmip.pie import sqltypes
from kmip.pie.objects import ManagedObject, PrivateKey
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class TestPrivateKey(testtools.TestCase):
@ -160,6 +163,8 @@ class TestPrivateKey(testtools.TestCase):
b'\x17\xE1\xF0\xC9\xB2\x3A\xFF\xA4\xD4\x96\x61\x8D\xBC\x02\x49\x86'
b'\xED\x69\x0B\xBB\x7B\x02\x57\x68\xFF\x9D\xF8\xAC\x15\x41\x6F\x48'
b'\x9F\x81\x29\xC3\x23\x41\xA8\xB4\x4F')
self.engine = create_engine('sqlite:///:memory:', echo=True)
sqltypes.Base.metadata.create_all(self.engine)
def tearDown(self):
super(TestPrivateKey, self).tearDown()
@ -168,7 +173,7 @@ class TestPrivateKey(testtools.TestCase):
"""
Test that a PrivateKey object can be instantiated.
"""
key = objects.PrivateKey(
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
@ -184,7 +189,7 @@ class TestPrivateKey(testtools.TestCase):
"""
Test that a PrivateKey object can be instantiated with all arguments.
"""
key = objects.PrivateKey(
key = PrivateKey(
enums.CryptographicAlgorithm.RSA,
1024,
self.bytes_1024,
@ -208,7 +213,7 @@ class TestPrivateKey(testtools.TestCase):
Test that the object type can be retrieved from the PrivateKey.
"""
expected = enums.ObjectType.PRIVATE_KEY
key = objects.PrivateKey(
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
observed = key.object_type
@ -220,7 +225,7 @@ class TestPrivateKey(testtools.TestCase):
used to construct a PrivateKey.
"""
args = ('invalid', 1024, self.bytes_1024, enums.KeyFormatType.PKCS_8)
self.assertRaises(TypeError, objects.PrivateKey, *args)
self.assertRaises(TypeError, PrivateKey, *args)
def test_validate_on_invalid_length(self):
"""
@ -229,7 +234,7 @@ class TestPrivateKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 'invalid', self.bytes_1024,
enums.KeyFormatType.PKCS_8)
self.assertRaises(TypeError, objects.PrivateKey, *args)
self.assertRaises(TypeError, PrivateKey, *args)
def test_validate_on_invalid_value(self):
"""
@ -238,7 +243,7 @@ class TestPrivateKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 1024, 0,
enums.KeyFormatType.PKCS_8)
self.assertRaises(TypeError, objects.PrivateKey, *args)
self.assertRaises(TypeError, PrivateKey, *args)
def test_validate_on_invalid_format_type(self):
"""
@ -247,7 +252,7 @@ class TestPrivateKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
'invalid')
self.assertRaises(TypeError, objects.PrivateKey, *args)
self.assertRaises(TypeError, PrivateKey, *args)
def test_validate_on_invalid_format_type_value(self):
"""
@ -256,7 +261,7 @@ class TestPrivateKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.OPAQUE)
self.assertRaises(ValueError, objects.PrivateKey, *args)
self.assertRaises(ValueError, PrivateKey, *args)
def test_validate_on_invalid_masks(self):
"""
@ -266,7 +271,7 @@ class TestPrivateKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
kwargs = {'masks': 'invalid'}
self.assertRaises(TypeError, objects.PrivateKey, *args, **kwargs)
self.assertRaises(TypeError, PrivateKey, *args, **kwargs)
def test_validate_on_invalid_mask(self):
"""
@ -276,7 +281,7 @@ class TestPrivateKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
kwargs = {'masks': ['invalid']}
self.assertRaises(TypeError, objects.PrivateKey, *args, **kwargs)
self.assertRaises(TypeError, PrivateKey, *args, **kwargs)
def test_validate_on_invalid_name(self):
"""
@ -286,13 +291,13 @@ class TestPrivateKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
kwargs = {'name': 0}
self.assertRaises(TypeError, objects.PrivateKey, *args, **kwargs)
self.assertRaises(TypeError, PrivateKey, *args, **kwargs)
def test_repr(self):
"""
Test that repr can be applied to a PrivateKey.
"""
key = objects.PrivateKey(
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
args = "algorithm={0}, length={1}, value={2}, format_type={3}".format(
@ -306,7 +311,7 @@ class TestPrivateKey(testtools.TestCase):
"""
Test that str can be applied to a PrivateKey.
"""
key = objects.PrivateKey(
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
expected = str(binascii.hexlify(self.bytes_1024))
@ -318,10 +323,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PrivateKey objects with the same data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
self.assertTrue(a == b)
@ -332,10 +337,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.AES, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
self.assertFalse(a == b)
@ -346,10 +351,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
self.assertFalse(a == b)
@ -360,10 +365,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_2048,
enums.KeyFormatType.PKCS_8)
self.assertFalse(a == b)
@ -374,10 +379,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_1)
self.assertFalse(a == b)
@ -388,7 +393,7 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns False when comparing a
PrivateKey object to a non-PrivateKey object.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
b = "invalid"
@ -400,10 +405,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the inequality operator returns False when comparing
two PrivateKey objects with the same internal data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
self.assertFalse(a != b)
@ -414,10 +419,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.AES, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
self.assertTrue(a != b)
@ -428,10 +433,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
self.assertTrue(a != b)
@ -442,10 +447,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024,
enums.KeyFormatType.PKCS_8)
self.assertTrue(a != b)
@ -456,10 +461,10 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PrivateKey objects with different data.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_8)
b = objects.PrivateKey(
b = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
self.assertTrue(a != b)
@ -470,9 +475,295 @@ class TestPrivateKey(testtools.TestCase):
Test that the equality operator returns True when comparing a
PrivateKey object to a non-PrivateKey object.
"""
a = objects.PrivateKey(
a = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = "invalid"
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_save(self):
"""
Test that the object can be saved using SQLAlchemy. This will add it to
the database, verify that no exceptions are thrown, and check that its
unique identifier was set.
"""
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
self.assertIsNotNone(key.unique_identifier)
def test_get(self):
"""
Test that the object can be saved and then retrieved using SQLAlchemy.
This adds is to the database and then retrieves it by ID and verifies
some of the attributes.
"""
test_name = 'bowser'
masks = [enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.WRAP_KEY]
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, masks=masks, name=test_name)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEqual(1, len(get_obj.names))
self.assertEqual([test_name], get_obj.names)
self.assertEqual(enums.ObjectType.PRIVATE_KEY, get_obj.object_type)
self.assertEqual(self.bytes_2048, get_obj.value)
self.assertEqual(enums.CryptographicAlgorithm.RSA,
get_obj.cryptographic_algorithm)
self.assertEqual(2048, get_obj.cryptographic_length)
self.assertEqual(enums.KeyFormatType.PKCS_1, get_obj.key_format_type)
self.assertEqual(masks, get_obj.cryptographic_usage_masks)
def test_add_multiple_names(self):
"""
Test that multiple names can be added to a managed object. This
verifies a few properties. First this verifies that names can be added
using simple strings. It also verifies that the index for each
subsequent string is set accordingly. Finally this tests that the names
can be saved and retrieved from the database.
"""
expected_names = ['bowser', 'frumpy', 'big fat cat']
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=expected_names[0])
key.names.append(expected_names[1])
key.names.append(expected_names[2])
self.assertEquals(3, key.name_index)
expected_mo_names = list()
for i, name in enumerate(expected_names):
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_mo_names, get_obj._names)
def test_remove_name(self):
"""
Tests that a name can be removed from the list of names. This will
verify that the list of names is correct. It will verify that updating
this object removes the name from the database.
"""
names = ['bowser', 'frumpy', 'big fat cat']
remove_index = 1
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
key.names.pop(remove_index)
self.assertEquals(3, key.name_index)
expected_names = list()
expected_mo_names = list()
for i, name in enumerate(names):
if i != remove_index:
expected_names.append(name)
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
self.assertEquals(expected_names, key.names)
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_remove_and_add_name(self):
"""
Tests that names can be removed from the list of names and more added.
This will verify that the list of names is correct. It will verify that
updating this object removes the name from the database. It will verify
that the indices for the removed names are not reused.
"""
names = ['bowser', 'frumpy', 'big fat cat']
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
key.names.pop()
key.names.pop()
key.names.append('dog')
self.assertEquals(4, key.name_index)
expected_names = ['bowser', 'dog']
expected_mo_names = list()
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
0))
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
3))
self.assertEquals(expected_names, key.names)
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_add_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will add a
name to it in one session, and then retrieve it in another session to
verify that it has all of the correct names.
This test and the subsequent test_udpate_* methods are different than
the name tests above because these are updating objects already stored
in the database. This tests will simulate what happens when the KMIP
client calls an add attribute method.
"""
first_name = 'bowser'
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=first_name)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
added_name = 'frumpy'
expected_names = [first_name, added_name]
expected_mo_names = list()
for i, name in enumerate(expected_names):
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
session = Session()
update_key = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.append(added_name)
session.commit()
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_remove_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will
remove a name from it in one session, and then retrieve it in another
session to verify that it has all of the correct names.
"""
names = ['bowser', 'frumpy', 'big fat cat']
remove_index = 1
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
expected_names = list()
expected_mo_names = list()
for i, name in enumerate(names):
if i != remove_index:
expected_names.append(name)
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
session = Session()
update_key = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.pop(remove_index)
session.commit()
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_remove_and_add_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will
remove a name and add another one to it in one session, and then
retrieve it in another session to verify that it has all of the correct
names. This simulates multiple operation being sent for the same
object.
"""
names = ['bowser', 'frumpy', 'big fat cat']
key = PrivateKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
update_key = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.pop()
update_key.names.pop()
update_key.names.append('dog')
session.commit()
expected_names = ['bowser', 'dog']
expected_mo_names = list()
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
0))
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
3))
session = Session()
get_obj = session.query(PrivateKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)

View File

@ -17,7 +17,10 @@ import binascii
import testtools
from kmip.core import enums
from kmip.pie import objects
from kmip.pie import sqltypes
from kmip.pie.objects import ManagedObject, PublicKey
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class TestPublicKey(testtools.TestCase):
@ -59,6 +62,8 @@ class TestPublicKey(testtools.TestCase):
b'\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73\xA1'
b'\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC\x41'
b'\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01')
self.engine = create_engine('sqlite:///:memory:', echo=True)
sqltypes.Base.metadata.create_all(self.engine)
def tearDown(self):
super(TestPublicKey, self).tearDown()
@ -67,7 +72,7 @@ class TestPublicKey(testtools.TestCase):
"""
Test that a PublicKey object can be instantiated.
"""
key = objects.PublicKey(
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024)
self.assertEqual(
@ -82,7 +87,7 @@ class TestPublicKey(testtools.TestCase):
"""
Test that a PublicKey object can be instantiated with all arguments.
"""
key = objects.PublicKey(
key = PublicKey(
enums.CryptographicAlgorithm.RSA,
1024,
self.bytes_1024,
@ -106,7 +111,7 @@ class TestPublicKey(testtools.TestCase):
Test that the object type can be retrieved from the PublicKey.
"""
expected = enums.ObjectType.PUBLIC_KEY
key = objects.PublicKey(
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
observed = key.object_type
@ -118,7 +123,7 @@ class TestPublicKey(testtools.TestCase):
used to construct a PublicKey.
"""
args = ('invalid', 1024, self.bytes_1024, enums.KeyFormatType.X_509)
self.assertRaises(TypeError, objects.PublicKey, *args)
self.assertRaises(TypeError, PublicKey, *args)
def test_validate_on_invalid_length(self):
"""
@ -127,7 +132,7 @@ class TestPublicKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 'invalid', self.bytes_1024,
enums.KeyFormatType.X_509)
self.assertRaises(TypeError, objects.PublicKey, *args)
self.assertRaises(TypeError, PublicKey, *args)
def test_validate_on_invalid_value(self):
"""
@ -136,7 +141,7 @@ class TestPublicKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 1024, 0,
enums.KeyFormatType.X_509)
self.assertRaises(TypeError, objects.PublicKey, *args)
self.assertRaises(TypeError, PublicKey, *args)
def test_validate_on_invalid_format_type(self):
"""
@ -145,7 +150,7 @@ class TestPublicKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
'invalid')
self.assertRaises(TypeError, objects.PublicKey, *args)
self.assertRaises(TypeError, PublicKey, *args)
def test_validate_on_invalid_format_type_value(self):
"""
@ -154,7 +159,7 @@ class TestPublicKey(testtools.TestCase):
"""
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.OPAQUE)
self.assertRaises(ValueError, objects.PublicKey, *args)
self.assertRaises(ValueError, PublicKey, *args)
def test_validate_on_invalid_masks(self):
"""
@ -164,7 +169,7 @@ class TestPublicKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
kwargs = {'masks': 'invalid'}
self.assertRaises(TypeError, objects.PublicKey, *args, **kwargs)
self.assertRaises(TypeError, PublicKey, *args, **kwargs)
def test_validate_on_invalid_mask(self):
"""
@ -174,7 +179,7 @@ class TestPublicKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
kwargs = {'masks': ['invalid']}
self.assertRaises(TypeError, objects.PublicKey, *args, **kwargs)
self.assertRaises(TypeError, PublicKey, *args, **kwargs)
def test_validate_on_invalid_name(self):
"""
@ -184,13 +189,13 @@ class TestPublicKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
kwargs = {'name': 0}
self.assertRaises(TypeError, objects.PublicKey, *args, **kwargs)
self.assertRaises(TypeError, PublicKey, *args, **kwargs)
def test_repr(self):
"""
Test that repr can be applied to a PublicKey.
"""
key = objects.PublicKey(
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
args = "algorithm={0}, length={1}, value={2}, format_type={3}".format(
@ -204,7 +209,7 @@ class TestPublicKey(testtools.TestCase):
"""
Test that str can be applied to a PublicKey.
"""
key = objects.PublicKey(
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
expected = str(binascii.hexlify(self.bytes_1024))
@ -216,10 +221,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PublicKey objects with the same data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
self.assertTrue(a == b)
@ -230,10 +235,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.AES, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
self.assertFalse(a == b)
@ -244,10 +249,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024,
enums.KeyFormatType.X_509)
self.assertFalse(a == b)
@ -258,10 +263,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_2048,
enums.KeyFormatType.X_509)
self.assertFalse(a == b)
@ -272,10 +277,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns False when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_1)
self.assertFalse(a == b)
@ -286,7 +291,7 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns False when comparing a
PublicKey object to a non-PublicKey object.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.X_509)
b = "invalid"
@ -298,10 +303,10 @@ class TestPublicKey(testtools.TestCase):
Test that the inequality operator returns False when comparing
two PublicKey objects with the same internal data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
self.assertFalse(a != b)
@ -312,10 +317,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.AES, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
self.assertTrue(a != b)
@ -326,10 +331,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
enums.KeyFormatType.PKCS_1)
self.assertTrue(a != b)
@ -340,10 +345,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024,
enums.KeyFormatType.PKCS_1)
self.assertTrue(a != b)
@ -354,10 +359,10 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns True when comparing two
PublicKey objects with different data.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = objects.PublicKey(
b = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.X_509)
self.assertTrue(a != b)
@ -368,9 +373,295 @@ class TestPublicKey(testtools.TestCase):
Test that the equality operator returns True when comparing a
PublicKey object to a non-PublicKey object.
"""
a = objects.PublicKey(
a = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
b = "invalid"
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_save(self):
"""
Test that the object can be saved using SQLAlchemy. This will add it to
the database, verify that no exceptions are thrown, and check that its
unique identifier was set.
"""
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
self.assertIsNotNone(key.unique_identifier)
def test_get(self):
"""
Test that the object can be saved and then retrieved using SQLAlchemy.
This adds is to the database and then retrieves it by ID and verifies
some of the attributes.
"""
test_name = 'bowser'
masks = [enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.WRAP_KEY]
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, masks=masks, name=test_name)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEqual(1, len(get_obj.names))
self.assertEqual([test_name], get_obj.names)
self.assertEqual(enums.ObjectType.PUBLIC_KEY, get_obj.object_type)
self.assertEqual(self.bytes_2048, get_obj.value)
self.assertEqual(enums.CryptographicAlgorithm.RSA,
get_obj.cryptographic_algorithm)
self.assertEqual(2048, get_obj.cryptographic_length)
self.assertEqual(enums.KeyFormatType.PKCS_1, get_obj.key_format_type)
self.assertEqual(masks, get_obj.cryptographic_usage_masks)
def test_add_multiple_names(self):
"""
Test that multiple names can be added to a managed object. This
verifies a few properties. First this verifies that names can be added
using simple strings. It also verifies that the index for each
subsequent string is set accordingly. Finally this tests that the names
can be saved and retrieved from the database.
"""
expected_names = ['bowser', 'frumpy', 'big fat cat']
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=expected_names[0])
key.names.append(expected_names[1])
key.names.append(expected_names[2])
self.assertEquals(3, key.name_index)
expected_mo_names = list()
for i, name in enumerate(expected_names):
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_mo_names, get_obj._names)
def test_remove_name(self):
"""
Tests that a name can be removed from the list of names. This will
verify that the list of names is correct. It will verify that updating
this object removes the name from the database.
"""
names = ['bowser', 'frumpy', 'big fat cat']
remove_index = 1
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
key.names.pop(remove_index)
self.assertEquals(3, key.name_index)
expected_names = list()
expected_mo_names = list()
for i, name in enumerate(names):
if i != remove_index:
expected_names.append(name)
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
self.assertEquals(expected_names, key.names)
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_remove_and_add_name(self):
"""
Tests that names can be removed from the list of names and more added.
This will verify that the list of names is correct. It will verify that
updating this object removes the name from the database. It will verify
that the indices for the removed names are not reused.
"""
names = ['bowser', 'frumpy', 'big fat cat']
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
key.names.pop()
key.names.pop()
key.names.append('dog')
self.assertEquals(4, key.name_index)
expected_names = ['bowser', 'dog']
expected_mo_names = list()
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
0))
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
3))
self.assertEquals(expected_names, key.names)
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_add_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will add a
name to it in one session, and then retrieve it in another session to
verify that it has all of the correct names.
This test and the subsequent test_udpate_* methods are different than
the name tests above because these are updating objects already stored
in the database. This tests will simulate what happens when the KMIP
client calls an add attribute method.
"""
first_name = 'bowser'
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=first_name)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
added_name = 'frumpy'
expected_names = [first_name, added_name]
expected_mo_names = list()
for i, name in enumerate(expected_names):
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
session = Session()
update_key = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.append(added_name)
session.commit()
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_remove_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will
remove a name from it in one session, and then retrieve it in another
session to verify that it has all of the correct names.
"""
names = ['bowser', 'frumpy', 'big fat cat']
remove_index = 1
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
expected_names = list()
expected_mo_names = list()
for i, name in enumerate(names):
if i != remove_index:
expected_names.append(name)
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
session = Session()
update_key = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.pop(remove_index)
session.commit()
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_remove_and_add_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will
remove a name and add another one to it in one session, and then
retrieve it in another session to verify that it has all of the correct
names. This simulates multiple operation being sent for the same
object.
"""
names = ['bowser', 'frumpy', 'big fat cat']
key = PublicKey(
enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
enums.KeyFormatType.PKCS_1, name=names[0])
key.names.append(names[1])
key.names.append(names[2])
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
update_key = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.pop()
update_key.names.pop()
update_key.names.append('dog')
session.commit()
expected_names = ['bowser', 'dog']
expected_mo_names = list()
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
0))
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
3))
session = Session()
get_obj = session.query(PublicKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)