mirror of https://github.com/OpenKMIP/PyKMIP.git
Added SQLAlchemy Code for Secret Data
The SecretData class has the SQLAlchemy code in it to allow it be stored in the database using the library.
This commit is contained in:
parent
c21f07634b
commit
8d6575fa36
|
@ -807,6 +807,15 @@ class SecretData(CryptographicObject):
|
|||
data_type: The type of the secret value.
|
||||
"""
|
||||
|
||||
__tablename__ = 'secret_data_objects'
|
||||
unique_identifier = Column('uid', Integer,
|
||||
ForeignKey('crypto_objects.uid'),
|
||||
primary_key=True)
|
||||
data_type = Column('data_type', sql.EnumType(enums.SecretDataType))
|
||||
__mapper_args__ = {
|
||||
'polymorphic_identity': enums.ObjectType.SECRET_DATA
|
||||
}
|
||||
|
||||
def __init__(self, value, data_type, masks=None, name='Secret Data'):
|
||||
"""
|
||||
Create a SecretData object.
|
||||
|
@ -895,6 +904,10 @@ class SecretData(CryptographicObject):
|
|||
return NotImplemented
|
||||
|
||||
|
||||
event.listen(SecretData._names, 'append',
|
||||
sql.attribute_append_factory("name_index"), retval=False)
|
||||
|
||||
|
||||
class OpaqueObject(ManagedObject):
|
||||
"""
|
||||
The OpaqueObject class of the simplified KMIP object hierarchy.
|
||||
|
|
|
@ -17,7 +17,10 @@ import binascii
|
|||
import testtools
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.pie import objects
|
||||
from kmip.pie.objects import ManagedObject, SecretData
|
||||
from kmip.pie import sqltypes
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
||||
class TestSecretData(testtools.TestCase):
|
||||
|
@ -33,6 +36,8 @@ class TestSecretData(testtools.TestCase):
|
|||
b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x64')
|
||||
self.bytes_b = (
|
||||
b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x65')
|
||||
self.engine = create_engine('sqlite:///:memory:', echo=True)
|
||||
sqltypes.Base.metadata.create_all(self.engine)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestSecretData, self).tearDown()
|
||||
|
@ -41,7 +46,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
Test that a SecretData object can be instantiated.
|
||||
"""
|
||||
secret = objects.SecretData(
|
||||
secret = SecretData(
|
||||
self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
|
||||
self.assertEqual(secret.value, self.bytes_a)
|
||||
|
@ -53,7 +58,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
Test that a SecretData object can be instantiated with all arguments.
|
||||
"""
|
||||
key = objects.SecretData(
|
||||
key = SecretData(
|
||||
self.bytes_a,
|
||||
enums.SecretDataType.PASSWORD,
|
||||
masks=[enums.CryptographicUsageMask.VERIFY],
|
||||
|
@ -70,7 +75,7 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the object type can be retrieved from the SecretData.
|
||||
"""
|
||||
expected = enums.ObjectType.SECRET_DATA
|
||||
key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
observed = key.object_type
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
|
@ -80,7 +85,7 @@ class TestSecretData(testtools.TestCase):
|
|||
construct a SecretData.
|
||||
"""
|
||||
args = (0, enums.SecretDataType.PASSWORD)
|
||||
self.assertRaises(TypeError, objects.SecretData, *args)
|
||||
self.assertRaises(TypeError, SecretData, *args)
|
||||
|
||||
def test_validate_on_invalid_data_type(self):
|
||||
"""
|
||||
|
@ -88,7 +93,7 @@ class TestSecretData(testtools.TestCase):
|
|||
construct a SecretData.
|
||||
"""
|
||||
args = (self.bytes_a, 'invalid')
|
||||
self.assertRaises(TypeError, objects.SecretData, *args)
|
||||
self.assertRaises(TypeError, SecretData, *args)
|
||||
|
||||
def test_validate_on_invalid_masks(self):
|
||||
"""
|
||||
|
@ -97,7 +102,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
kwargs = {'masks': 'invalid'}
|
||||
self.assertRaises(TypeError, objects.SecretData, *args, **kwargs)
|
||||
self.assertRaises(TypeError, SecretData, *args, **kwargs)
|
||||
|
||||
def test_validate_on_invalid_mask(self):
|
||||
"""
|
||||
|
@ -106,7 +111,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
kwargs = {'masks': ['invalid']}
|
||||
self.assertRaises(TypeError, objects.SecretData, *args, **kwargs)
|
||||
self.assertRaises(TypeError, SecretData, *args, **kwargs)
|
||||
|
||||
def test_validate_on_invalid_name(self):
|
||||
"""
|
||||
|
@ -115,13 +120,13 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
kwargs = {'name': 0}
|
||||
self.assertRaises(TypeError, objects.SecretData, *args, **kwargs)
|
||||
self.assertRaises(TypeError, SecretData, *args, **kwargs)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that repr can be applied to a SecretData.
|
||||
"""
|
||||
key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
args = "value={0}, data_type={1}".format(
|
||||
binascii.hexlify(self.bytes_a), enums.SecretDataType.PASSWORD)
|
||||
expected = "SecretData({0})".format(args)
|
||||
|
@ -132,7 +137,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
Test that str can be applied to a SecretData.
|
||||
"""
|
||||
key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
expected = str(binascii.hexlify(self.bytes_a))
|
||||
observed = str(key)
|
||||
self.assertEqual(expected, observed)
|
||||
|
@ -142,8 +147,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
SecretData objects with the same data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
self.assertTrue(a == b)
|
||||
self.assertTrue(b == a)
|
||||
|
||||
|
@ -152,8 +157,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
|
@ -162,8 +167,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
|
@ -172,7 +177,7 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing a
|
||||
SecretData object to a non-SecretData object.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = "invalid"
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
@ -182,8 +187,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the inequality operator returns False when comparing
|
||||
two SecretData objects with the same internal data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
self.assertFalse(a != b)
|
||||
self.assertFalse(b != a)
|
||||
|
||||
|
@ -192,8 +197,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
|
@ -202,8 +207,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
|
@ -212,7 +217,277 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing a
|
||||
SecretData object to a non-SecretData object.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = "invalid"
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_save(self):
|
||||
"""
|
||||
Test that the object can be saved using SQLAlchemy. This will add it to
|
||||
the database, verify that no exceptions are thrown, and check that its
|
||||
unique identifier was set.
|
||||
"""
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
def test_get(self):
|
||||
"""
|
||||
Test that the object can be saved and then retrieved using SQLAlchemy.
|
||||
This adds is to the database and then retrieves it by ID and verifies
|
||||
some of the attributes.
|
||||
"""
|
||||
test_name = 'bowser'
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=test_name)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEqual(1, len(get_obj.names))
|
||||
self.assertEqual([test_name], get_obj.names)
|
||||
self.assertEqual(self.bytes_a, get_obj.value)
|
||||
self.assertEqual(enums.ObjectType.SECRET_DATA, get_obj.object_type)
|
||||
self.assertEqual(enums.SecretDataType.PASSWORD, get_obj.data_type)
|
||||
|
||||
def test_add_multiple_names(self):
|
||||
"""
|
||||
Test that multiple names can be added to a managed object. This
|
||||
verifies a few properties. First this verifies that names can be added
|
||||
using simple strings. It also verifies that the index for each
|
||||
subsequent string is set accordingly. Finally this tests that the names
|
||||
can be saved and retrieved from the database.
|
||||
"""
|
||||
expected_names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=expected_names[0])
|
||||
obj.names.append(expected_names[1])
|
||||
obj.names.append(expected_names[2])
|
||||
self.assertEquals(3, obj.name_index)
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(expected_names):
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_remove_name(self):
|
||||
"""
|
||||
Tests that a name can be removed from the list of names. This will
|
||||
verify that the list of names is correct. It will verify that updating
|
||||
this object removes the name from the database.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
remove_index = 1
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
obj.names.pop(remove_index)
|
||||
self.assertEquals(3, obj.name_index)
|
||||
|
||||
expected_names = list()
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(names):
|
||||
if i != remove_index:
|
||||
expected_names.append(name)
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
self.assertEquals(expected_names, obj.names)
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_remove_and_add_name(self):
|
||||
"""
|
||||
Tests that names can be removed from the list of names and more added.
|
||||
This will verify that the list of names is correct. It will verify that
|
||||
updating this object removes the name from the database. It will verify
|
||||
that the indices for the removed names are not reused.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
obj.names.pop()
|
||||
obj.names.pop()
|
||||
obj.names.append('dog')
|
||||
self.assertEquals(4, obj.name_index)
|
||||
|
||||
expected_names = ['bowser', 'dog']
|
||||
expected_mo_names = list()
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||
0))
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||
3))
|
||||
self.assertEquals(expected_names, obj.names)
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_add_name(self):
|
||||
"""
|
||||
Tests that a SecretData already stored in the database can be
|
||||
updated. This will store a SecretData in the database. It will add a
|
||||
name to it in one session, and then retrieve it in another session to
|
||||
verify that it has all of the correct names.
|
||||
|
||||
This test and the subsequent test_udpate_* methods are different than
|
||||
the name tests above because these are updating objects already stored
|
||||
in the database. This tests will simulate what happens when the KMIP
|
||||
client calls an add attribute method.
|
||||
"""
|
||||
first_name = 'bowser'
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=first_name)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
added_name = 'frumpy'
|
||||
expected_names = [first_name, added_name]
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(expected_names):
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.append(added_name)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_remove_name(self):
|
||||
"""
|
||||
Tests that a SecretData already stored in the database can be
|
||||
updated. This will store a SecretData in the database. It will
|
||||
remove a name from it in one session, and then retrieve it in another
|
||||
session to verify that it has all of the correct names.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
remove_index = 1
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
expected_names = list()
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(names):
|
||||
if i != remove_index:
|
||||
expected_names.append(name)
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.pop(remove_index)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_remove_and_add_name(self):
|
||||
"""
|
||||
Tests that a SecretData already stored in the database can be
|
||||
updated. This will store a SecretData in the database. It will
|
||||
remove a name and add another one to it in one session, and then
|
||||
retrieve it in another session to verify that it has all of the correct
|
||||
names. This simulates multiple operation being sent for the same
|
||||
object.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.pop()
|
||||
update_obj.names.pop()
|
||||
update_obj.names.append('dog')
|
||||
session.commit()
|
||||
|
||||
expected_names = ['bowser', 'dog']
|
||||
expected_mo_names = list()
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||
0))
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||
3))
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
|
Loading…
Reference in New Issue