Added SQLAlchemy Code for Symmetric Key

The code for persisting symmetric keys in a database has been added
along with the corresponding unit tests.

The usage mask list for cryptographic objects is stored as an integer bitmask.
The conversion takes place with a new SQLAlchemy type.

Switched ManagedObject value type to VARBINARY. This prevents errors from
occuring when trying to convert to a string.
This commit is contained in:
Nathan Reller 2016-02-16 15:59:49 -05:00
parent 4d6caf1de7
commit c21f07634b
3 changed files with 406 additions and 55 deletions

View File

@ -14,7 +14,7 @@
# under the License. # under the License.
from abc import abstractmethod from abc import abstractmethod
from sqlalchemy import Column, event, ForeignKey, Integer, VARCHAR from sqlalchemy import Column, event, ForeignKey, Integer, VARBINARY
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
@ -46,7 +46,7 @@ class ManagedObject(sql.Base):
__tablename__ = 'managed_objects' __tablename__ = 'managed_objects'
unique_identifier = Column('uid', Integer, primary_key=True) unique_identifier = Column('uid', Integer, primary_key=True)
_object_type = Column('object_type', sql.EnumType(enums.ObjectType)) _object_type = Column('object_type', sql.EnumType(enums.ObjectType))
value = Column('value', VARCHAR(1024)) value = Column('value', VARBINARY(1024))
name_index = Column(Integer, default=0) name_index = Column(Integer, default=0)
_names = relationship('ManagedObjectName', back_populates='mo', _names = relationship('ManagedObjectName', back_populates='mo',
cascade='all, delete-orphan') cascade='all, delete-orphan')
@ -142,6 +142,16 @@ class CryptographicObject(ManagedObject):
describing how the CryptographicObject will be used. describing how the CryptographicObject will be used.
""" """
__tablename__ = 'crypto_objects'
unique_identifier = Column('uid', Integer,
ForeignKey('managed_objects.uid'),
primary_key=True)
cryptographic_usage_masks = Column('cryptographic_usage_mask',
sql.UsageMaskType)
__mapper_args__ = {
'polymorphic_identity': 0x80000001
}
@abstractmethod @abstractmethod
def __init__(self): def __init__(self):
""" """
@ -188,6 +198,20 @@ class Key(CryptographicObject):
the key value. the key value.
""" """
__tablename__ = 'keys'
unique_identifier = Column('uid', Integer,
ForeignKey('crypto_objects.uid'),
primary_key=True)
cryptographic_algorithm = Column(
'cryptographic_algorithm', sql.EnumType(enums.CryptographicAlgorithm))
cryptographic_length = Column('cryptographic_length', Integer)
key_format_type = Column(
'key_format_type', sql.EnumType(enums.KeyFormatType))
__mapper_args__ = {
'polymorphic_identity': 0x80000002
}
@abstractmethod @abstractmethod
def __init__(self): def __init__(self):
""" """
@ -226,6 +250,15 @@ class SymmetricKey(Key):
names: The string names of the SymmetricKey. names: The string names of the SymmetricKey.
""" """
__tablename__ = 'symmetric_keys'
unique_identifier = Column('uid', Integer,
ForeignKey('keys.uid'),
primary_key=True)
__mapper_args__ = {
'polymorphic_identity': enums.ObjectType.SYMMETRIC_KEY
}
def __init__(self, algorithm, length, value, masks=None, def __init__(self, algorithm, length, value, masks=None,
name='Symmetric Key'): name='Symmetric Key'):
""" """
@ -252,9 +285,7 @@ class SymmetricKey(Key):
self.names = [name] self.names = [name]
if masks: if masks:
self.cryptographic_usage_masks = masks self.cryptographic_usage_masks.extend(masks)
else:
self.cryptographic_usage_masks = list()
# All remaining attributes are not considered part of the public API # All remaining attributes are not considered part of the public API
# and are subject to change. # and are subject to change.
@ -282,8 +313,6 @@ class SymmetricKey(Key):
"enumeration") "enumeration")
elif not isinstance(self.cryptographic_length, six.integer_types): elif not isinstance(self.cryptographic_length, six.integer_types):
raise TypeError("key length must be an integer") raise TypeError("key length must be an integer")
elif not isinstance(self.cryptographic_usage_masks, list):
raise TypeError("key usage masks must be a list")
mask_count = len(self.cryptographic_usage_masks) mask_count = len(self.cryptographic_usage_masks)
for i in range(mask_count): for i in range(mask_count):
@ -337,6 +366,10 @@ class SymmetricKey(Key):
return NotImplemented return NotImplemented
event.listen(SymmetricKey._names, 'append',
sql.attribute_append_factory("name_index"), retval=False)
class PublicKey(Key): class PublicKey(Key):
""" """
The PublicKey class of the simplified KMIP object hierarchy. The PublicKey class of the simplified KMIP object hierarchy.
@ -389,8 +422,6 @@ class PublicKey(Key):
if masks: if masks:
self.cryptographic_usage_masks = masks self.cryptographic_usage_masks = masks
else:
self.cryptographic_usage_masks = list()
# All remaining attributes are not considered part of the public API # All remaining attributes are not considered part of the public API
# and are subject to change. # and are subject to change.
@ -422,8 +453,6 @@ class PublicKey(Key):
elif self.key_format_type not in self._valid_formats: elif self.key_format_type not in self._valid_formats:
raise ValueError("key format type must be one of {0}".format( raise ValueError("key format type must be one of {0}".format(
self._valid_formats)) self._valid_formats))
elif not isinstance(self.cryptographic_usage_masks, list):
raise TypeError("key usage masks must be a list")
# TODO (peter-hamilton) Verify that the key bytes match the key format # TODO (peter-hamilton) Verify that the key bytes match the key format
@ -529,8 +558,6 @@ class PrivateKey(Key):
if masks: if masks:
self.cryptographic_usage_masks = masks self.cryptographic_usage_masks = masks
else:
self.cryptographic_usage_masks = list()
# All remaining attributes are not considered part of the public API # All remaining attributes are not considered part of the public API
# and are subject to change. # and are subject to change.
@ -562,8 +589,6 @@ class PrivateKey(Key):
elif self.key_format_type not in self._valid_formats: elif self.key_format_type not in self._valid_formats:
raise ValueError("key format type must be one of {0}".format( raise ValueError("key format type must be one of {0}".format(
self._valid_formats)) self._valid_formats))
elif not isinstance(self.cryptographic_usage_masks, list):
raise TypeError("key usage masks must be a list")
# TODO (peter-hamilton) Verify that the key bytes match the key format # TODO (peter-hamilton) Verify that the key bytes match the key format
@ -658,8 +683,6 @@ class Certificate(CryptographicObject):
if masks: if masks:
self.cryptographic_usage_masks = masks self.cryptographic_usage_masks = masks
else:
self.cryptographic_usage_masks = list()
# All remaining attributes are not considered part of the public API # All remaining attributes are not considered part of the public API
# and are subject to change. # and are subject to change.
@ -687,8 +710,6 @@ class Certificate(CryptographicObject):
enums.CertificateTypeEnum): enums.CertificateTypeEnum):
raise TypeError("certificate type must be a CertificateTypeEnum " raise TypeError("certificate type must be a CertificateTypeEnum "
"enumeration") "enumeration")
elif not isinstance(self.cryptographic_usage_masks, list):
raise TypeError("certificate usage masks must be a list")
mask_count = len(self.cryptographic_usage_masks) mask_count = len(self.cryptographic_usage_masks)
for i in range(mask_count): for i in range(mask_count):
@ -808,8 +829,6 @@ class SecretData(CryptographicObject):
if masks: if masks:
self.cryptographic_usage_masks = masks self.cryptographic_usage_masks = masks
else:
self.cryptographic_usage_masks = list()
# All remaining attributes are not considered part of the public API # All remaining attributes are not considered part of the public API
# and are subject to change. # and are subject to change.
@ -831,8 +850,6 @@ class SecretData(CryptographicObject):
elif not isinstance(self.data_type, enums.SecretDataType): elif not isinstance(self.data_type, enums.SecretDataType):
raise TypeError("secret data type must be a SecretDataType " raise TypeError("secret data type must be a SecretDataType "
"enumeration") "enumeration")
elif not isinstance(self.cryptographic_usage_masks, list):
raise TypeError("secret data usage masks must be a list")
mask_count = len(self.cryptographic_usage_masks) mask_count = len(self.cryptographic_usage_masks)
for i in range(mask_count): for i in range(mask_count):

View File

@ -33,6 +33,49 @@ def attribute_append_factory(index_attribute):
return attribute_append return attribute_append
class UsageMaskType(types.TypeDecorator):
"""
Converts a list of enums.CryptographicUsageMask Enums in an integer
bitmask. This allows the database to only store an integer instead of a
list of enumbs. This also does the reverse of converting an integer bit
mask into a list enums.CryptographicUsageMask Enums.
"""
impl = types.Integer
def process_bind_param(self, value, dialect):
"""
Returns the integer value of the usage mask bitmask. This value is
stored in the database.
Args:
value(list<enums.CryptographicUsageMask>): list of enums in the
usage mask
dialect(string): SQL dialect
"""
bitmask = 0x00
for e in value:
bitmask = bitmask | e.value
return bitmask
def process_result_value(self, value, dialect):
"""
Returns a new list of enums.CryptographicUsageMask Enums. This converts
the integer value into the list of enums.
Args:
value(int): The integer value stored in the database that is used
to create the list of enums.CryptographicUsageMask Enums.
dialect(string): SQL dialect
"""
masks = list()
if value:
for e in enums.CryptographicUsageMask:
if e.value & value:
masks.append(e)
return masks
class EnumType(types.TypeDecorator): class EnumType(types.TypeDecorator):
""" """
Converts a Python enum to an integer before storing it in the database. Converts a Python enum to an integer before storing it in the database.

View File

@ -17,7 +17,10 @@ import binascii
import testtools import testtools
from kmip.core import enums from kmip.core import enums
from kmip.pie import objects from kmip.pie import sqltypes
from kmip.pie.objects import ManagedObject, SymmetricKey
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class TestSymmetricKey(testtools.TestCase): class TestSymmetricKey(testtools.TestCase):
@ -44,6 +47,8 @@ class TestSymmetricKey(testtools.TestCase):
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F' b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E' b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E'
b'\x1F') b'\x1F')
self.engine = create_engine('sqlite:///:memory:', echo=True)
sqltypes.Base.metadata.create_all(self.engine)
def tearDown(self): def tearDown(self):
super(TestSymmetricKey, self).tearDown() super(TestSymmetricKey, self).tearDown()
@ -52,7 +57,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
Test that a SymmetricKey object can be instantiated. Test that a SymmetricKey object can be instantiated.
""" """
key = objects.SymmetricKey( key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
self.assertEqual(key.cryptographic_algorithm, self.assertEqual(key.cryptographic_algorithm,
@ -66,7 +71,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
Test that a SymmetricKey object can be instantiated with all arguments. Test that a SymmetricKey object can be instantiated with all arguments.
""" """
key = objects.SymmetricKey( key = SymmetricKey(
enums.CryptographicAlgorithm.AES, enums.CryptographicAlgorithm.AES,
128, 128,
self.bytes_128a, self.bytes_128a,
@ -88,7 +93,7 @@ class TestSymmetricKey(testtools.TestCase):
Test that the object type can be retrieved from the SymmetricKey. Test that the object type can be retrieved from the SymmetricKey.
""" """
expected = enums.ObjectType.SYMMETRIC_KEY expected = enums.ObjectType.SYMMETRIC_KEY
key = objects.SymmetricKey( key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
observed = key.object_type observed = key.object_type
@ -101,7 +106,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
args = ('invalid', 128, self.bytes_128a) args = ('invalid', 128, self.bytes_128a)
self.assertRaises(TypeError, objects.SymmetricKey, *args) self.assertRaises(TypeError, SymmetricKey, *args)
def test_validate_on_invalid_length(self): def test_validate_on_invalid_length(self):
""" """
@ -110,7 +115,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
args = (enums.CryptographicAlgorithm.AES, 'invalid', self.bytes_128a) args = (enums.CryptographicAlgorithm.AES, 'invalid', self.bytes_128a)
self.assertRaises(TypeError, objects.SymmetricKey, *args) self.assertRaises(TypeError, SymmetricKey, *args)
def test_validate_on_invalid_value(self): def test_validate_on_invalid_value(self):
""" """
@ -119,7 +124,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
args = (enums.CryptographicAlgorithm.AES, 128, 0) args = (enums.CryptographicAlgorithm.AES, 128, 0)
self.assertRaises(TypeError, objects.SymmetricKey, *args) self.assertRaises(TypeError, SymmetricKey, *args)
def test_validate_on_invalid_masks(self): def test_validate_on_invalid_masks(self):
""" """
@ -129,7 +134,7 @@ class TestSymmetricKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
kwargs = {'masks': 'invalid'} kwargs = {'masks': 'invalid'}
self.assertRaises(TypeError, objects.SymmetricKey, *args, **kwargs) self.assertRaises(TypeError, SymmetricKey, *args, **kwargs)
def test_validate_on_invalid_mask(self): def test_validate_on_invalid_mask(self):
""" """
@ -139,7 +144,7 @@ class TestSymmetricKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
kwargs = {'masks': ['invalid']} kwargs = {'masks': ['invalid']}
self.assertRaises(TypeError, objects.SymmetricKey, *args, **kwargs) self.assertRaises(TypeError, SymmetricKey, *args, **kwargs)
def test_validate_on_invalid_name(self): def test_validate_on_invalid_name(self):
""" """
@ -149,7 +154,7 @@ class TestSymmetricKey(testtools.TestCase):
args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
kwargs = {'name': 0} kwargs = {'name': 0}
self.assertRaises(TypeError, objects.SymmetricKey, *args, **kwargs) self.assertRaises(TypeError, SymmetricKey, *args, **kwargs)
def test_validate_on_invalid_length_value(self): def test_validate_on_invalid_length_value(self):
""" """
@ -158,7 +163,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
args = (enums.CryptographicAlgorithm.AES, 256, self.bytes_128a) args = (enums.CryptographicAlgorithm.AES, 256, self.bytes_128a)
self.assertRaises(ValueError, objects.SymmetricKey, *args) self.assertRaises(ValueError, SymmetricKey, *args)
def test_validate_on_invalid_value_length(self): def test_validate_on_invalid_value_length(self):
""" """
@ -167,13 +172,13 @@ class TestSymmetricKey(testtools.TestCase):
""" """
args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_256a) args = (enums.CryptographicAlgorithm.AES, 128, self.bytes_256a)
self.assertRaises(ValueError, objects.SymmetricKey, *args) self.assertRaises(ValueError, SymmetricKey, *args)
def test_repr(self): def test_repr(self):
""" """
Test that repr can be applied to a SymmetricKey. Test that repr can be applied to a SymmetricKey.
""" """
key = objects.SymmetricKey( key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
args = "algorithm={0}, length={1}, value={2}".format( args = "algorithm={0}, length={1}, value={2}".format(
@ -188,7 +193,7 @@ class TestSymmetricKey(testtools.TestCase):
""" """
Test that str can be applied to a SymmetricKey. Test that str can be applied to a SymmetricKey.
""" """
key = objects.SymmetricKey( key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
expected = str(binascii.hexlify(self.bytes_128a)) expected = str(binascii.hexlify(self.bytes_128a))
observed = str(key) observed = str(key)
@ -200,9 +205,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the equality operator returns True when comparing two Test that the equality operator returns True when comparing two
SymmetricKey objects with the same data. SymmetricKey objects with the same data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
self.assertTrue(a == b) self.assertTrue(a == b)
@ -213,9 +218,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the equality operator returns False when comparing two Test that the equality operator returns False when comparing two
SymmetricKey objects with different data. SymmetricKey objects with different data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.RSA, 128, self.bytes_128a) enums.CryptographicAlgorithm.RSA, 128, self.bytes_128a)
self.assertFalse(a == b) self.assertFalse(a == b)
@ -226,9 +231,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the equality operator returns False when comparing two Test that the equality operator returns False when comparing two
SymmetricKey objects with different data. SymmetricKey objects with different data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.AES, 256, self.bytes_256a) enums.CryptographicAlgorithm.AES, 256, self.bytes_256a)
b.value = self.bytes_128a b.value = self.bytes_128a
@ -240,9 +245,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the equality operator returns False when comparing two Test that the equality operator returns False when comparing two
SymmetricKey objects with different data. SymmetricKey objects with different data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128b) enums.CryptographicAlgorithm.AES, 128, self.bytes_128b)
self.assertFalse(a == b) self.assertFalse(a == b)
@ -253,7 +258,7 @@ class TestSymmetricKey(testtools.TestCase):
Test that the equality operator returns False when comparing a Test that the equality operator returns False when comparing a
SymmetricKey object to a non-SymmetricKey object. SymmetricKey object to a non-SymmetricKey object.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = "invalid" b = "invalid"
@ -265,9 +270,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the inequality operator returns False when comparing Test that the inequality operator returns False when comparing
two SymmetricKey objects with the same internal data. two SymmetricKey objects with the same internal data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
self.assertFalse(a != b) self.assertFalse(a != b)
@ -278,9 +283,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the inequality operator returns True when comparing two Test that the inequality operator returns True when comparing two
SymmetricKey objects with different data. SymmetricKey objects with different data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.RSA, 128, self.bytes_128a) enums.CryptographicAlgorithm.RSA, 128, self.bytes_128a)
self.assertTrue(a != b) self.assertTrue(a != b)
@ -291,9 +296,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the inequality operator returns True when comparing two Test that the inequality operator returns True when comparing two
SymmetricKey objects with different data. SymmetricKey objects with different data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.AES, 256, self.bytes_256a) enums.CryptographicAlgorithm.AES, 256, self.bytes_256a)
self.assertTrue(a != b) self.assertTrue(a != b)
@ -304,9 +309,9 @@ class TestSymmetricKey(testtools.TestCase):
Test that the inequality operator returns True when comparing two Test that the inequality operator returns True when comparing two
SymmetricKey objects with different data. SymmetricKey objects with different data.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = objects.SymmetricKey( b = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128b) enums.CryptographicAlgorithm.AES, 128, self.bytes_128b)
self.assertTrue(a != b) self.assertTrue(a != b)
@ -317,9 +322,295 @@ class TestSymmetricKey(testtools.TestCase):
Test that the equality operator returns True when comparing a Test that the equality operator returns True when comparing a
SymmetricKey object to a non-SymmetricKey object. SymmetricKey object to a non-SymmetricKey object.
""" """
a = objects.SymmetricKey( a = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a) enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
b = "invalid" b = "invalid"
self.assertTrue(a != b) self.assertTrue(a != b)
self.assertTrue(b != a) self.assertTrue(b != a)
def test_save(self):
"""
Test that the object can be saved using SQLAlchemy. This will add it to
the database, verify that no exceptions are thrown, and check that its
unique identifier was set.
"""
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
self.assertIsNotNone(key.unique_identifier)
def test_get(self):
"""
Test that the object can be saved and then retrieved using SQLAlchemy.
This adds is to the database and then retrieves it by ID and verifies
some of the attributes.
"""
test_name = 'bowser'
masks = [enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT]
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
masks=masks,
name=test_name)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEqual(1, len(get_obj.names))
self.assertEqual([test_name], get_obj.names)
self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, get_obj.object_type)
self.assertEqual(self.bytes_128a, get_obj.value)
self.assertEqual(enums.CryptographicAlgorithm.AES,
get_obj.cryptographic_algorithm)
self.assertEqual(128, get_obj.cryptographic_length)
self.assertEqual(enums.KeyFormatType.RAW, get_obj.key_format_type)
self.assertEqual(masks, get_obj.cryptographic_usage_masks)
def test_add_multiple_names(self):
"""
Test that multiple names can be added to a managed object. This
verifies a few properties. First this verifies that names can be added
using simple strings. It also verifies that the index for each
subsequent string is set accordingly. Finally this tests that the names
can be saved and retrieved from the database.
"""
expected_names = ['bowser', 'frumpy', 'big fat cat']
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
name=expected_names[0])
key.names.append(expected_names[1])
key.names.append(expected_names[2])
self.assertEquals(3, key.name_index)
expected_mo_names = list()
for i, name in enumerate(expected_names):
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_mo_names, get_obj._names)
def test_remove_name(self):
"""
Tests that a name can be removed from the list of names. This will
verify that the list of names is correct. It will verify that updating
this object removes the name from the database.
"""
names = ['bowser', 'frumpy', 'big fat cat']
remove_index = 1
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
name=names[0])
key.names.append(names[1])
key.names.append(names[2])
key.names.pop(remove_index)
self.assertEquals(3, key.name_index)
expected_names = list()
expected_mo_names = list()
for i, name in enumerate(names):
if i != remove_index:
expected_names.append(name)
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
self.assertEquals(expected_names, key.names)
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_remove_and_add_name(self):
"""
Tests that names can be removed from the list of names and more added.
This will verify that the list of names is correct. It will verify that
updating this object removes the name from the database. It will verify
that the indices for the removed names are not reused.
"""
names = ['bowser', 'frumpy', 'big fat cat']
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
name=names[0])
key.names.append(names[1])
key.names.append(names[2])
key.names.pop()
key.names.pop()
key.names.append('dog')
self.assertEquals(4, key.name_index)
expected_names = ['bowser', 'dog']
expected_mo_names = list()
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
0))
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
3))
self.assertEquals(expected_names, key.names)
self.assertEquals(expected_mo_names, key._names)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_add_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will add a
name to it in one session, and then retrieve it in another session to
verify that it has all of the correct names.
This test and the subsequent test_udpate_* methods are different than
the name tests above because these are updating objects already stored
in the database. This tests will simulate what happens when the KMIP
client calls an add attribute method.
"""
first_name = 'bowser'
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
name=first_name)
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
added_name = 'frumpy'
expected_names = [first_name, added_name]
expected_mo_names = list()
for i, name in enumerate(expected_names):
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
session = Session()
update_key = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.append(added_name)
session.commit()
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_remove_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will
remove a name from it in one session, and then retrieve it in another
session to verify that it has all of the correct names.
"""
names = ['bowser', 'frumpy', 'big fat cat']
remove_index = 1
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
name=names[0])
key.names.append(names[1])
key.names.append(names[2])
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
expected_names = list()
expected_mo_names = list()
for i, name in enumerate(names):
if i != remove_index:
expected_names.append(name)
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
session = Session()
update_key = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.pop(remove_index)
session.commit()
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)
def test_update_with_remove_and_add_name(self):
"""
Tests that an OpaqueObject already stored in the database can be
updated. This will store an OpaqueObject in the database. It will
remove a name and add another one to it in one session, and then
retrieve it in another session to verify that it has all of the correct
names. This simulates multiple operation being sent for the same
object.
"""
names = ['bowser', 'frumpy', 'big fat cat']
key = SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, self.bytes_128a,
name=names[0])
key.names.append(names[1])
key.names.append(names[2])
Session = sessionmaker(bind=self.engine)
session = Session()
session.add(key)
session.commit()
session = Session()
update_key = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
update_key.names.pop()
update_key.names.pop()
update_key.names.append('dog')
session.commit()
expected_names = ['bowser', 'dog']
expected_mo_names = list()
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
0))
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
3))
session = Session()
get_obj = session.query(SymmetricKey).filter(
ManagedObject.unique_identifier == key.unique_identifier
).one()
session.commit()
self.assertEquals(expected_names, get_obj.names)
self.assertEquals(expected_mo_names, get_obj._names)