mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #136 from OpenKMIP/feat/sqlalchemy-secret
Added SQLAlchemy Code for Secret Data
This commit is contained in:
commit
e967bb056d
|
@ -833,6 +833,15 @@ class SecretData(CryptographicObject):
|
|||
data_type: The type of the secret value.
|
||||
"""
|
||||
|
||||
__tablename__ = 'secret_data_objects'
|
||||
unique_identifier = Column('uid', Integer,
|
||||
ForeignKey('crypto_objects.uid'),
|
||||
primary_key=True)
|
||||
data_type = Column('data_type', sql.EnumType(enums.SecretDataType))
|
||||
__mapper_args__ = {
|
||||
'polymorphic_identity': enums.ObjectType.SECRET_DATA
|
||||
}
|
||||
|
||||
def __init__(self, value, data_type, masks=None, name='Secret Data'):
|
||||
"""
|
||||
Create a SecretData object.
|
||||
|
@ -921,6 +930,10 @@ class SecretData(CryptographicObject):
|
|||
return NotImplemented
|
||||
|
||||
|
||||
event.listen(SecretData._names, 'append',
|
||||
sql.attribute_append_factory("name_index"), retval=False)
|
||||
|
||||
|
||||
class OpaqueObject(ManagedObject):
|
||||
"""
|
||||
The OpaqueObject class of the simplified KMIP object hierarchy.
|
||||
|
|
|
@ -17,7 +17,10 @@ import binascii
|
|||
import testtools
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.pie import objects
|
||||
from kmip.pie.objects import ManagedObject, SecretData
|
||||
from kmip.pie import sqltypes
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
||||
class TestSecretData(testtools.TestCase):
|
||||
|
@ -33,6 +36,8 @@ class TestSecretData(testtools.TestCase):
|
|||
b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x64')
|
||||
self.bytes_b = (
|
||||
b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x65')
|
||||
self.engine = create_engine('sqlite:///:memory:', echo=True)
|
||||
sqltypes.Base.metadata.create_all(self.engine)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestSecretData, self).tearDown()
|
||||
|
@ -41,7 +46,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
Test that a SecretData object can be instantiated.
|
||||
"""
|
||||
secret = objects.SecretData(
|
||||
secret = SecretData(
|
||||
self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
|
||||
self.assertEqual(secret.value, self.bytes_a)
|
||||
|
@ -53,7 +58,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
Test that a SecretData object can be instantiated with all arguments.
|
||||
"""
|
||||
key = objects.SecretData(
|
||||
key = SecretData(
|
||||
self.bytes_a,
|
||||
enums.SecretDataType.PASSWORD,
|
||||
masks=[enums.CryptographicUsageMask.VERIFY],
|
||||
|
@ -70,7 +75,7 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the object type can be retrieved from the SecretData.
|
||||
"""
|
||||
expected = enums.ObjectType.SECRET_DATA
|
||||
key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
observed = key.object_type
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
|
@ -80,7 +85,7 @@ class TestSecretData(testtools.TestCase):
|
|||
construct a SecretData.
|
||||
"""
|
||||
args = (0, enums.SecretDataType.PASSWORD)
|
||||
self.assertRaises(TypeError, objects.SecretData, *args)
|
||||
self.assertRaises(TypeError, SecretData, *args)
|
||||
|
||||
def test_validate_on_invalid_data_type(self):
|
||||
"""
|
||||
|
@ -88,7 +93,7 @@ class TestSecretData(testtools.TestCase):
|
|||
construct a SecretData.
|
||||
"""
|
||||
args = (self.bytes_a, 'invalid')
|
||||
self.assertRaises(TypeError, objects.SecretData, *args)
|
||||
self.assertRaises(TypeError, SecretData, *args)
|
||||
|
||||
def test_validate_on_invalid_masks(self):
|
||||
"""
|
||||
|
@ -97,7 +102,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
kwargs = {'masks': 'invalid'}
|
||||
self.assertRaises(TypeError, objects.SecretData, *args, **kwargs)
|
||||
self.assertRaises(TypeError, SecretData, *args, **kwargs)
|
||||
|
||||
def test_validate_on_invalid_mask(self):
|
||||
"""
|
||||
|
@ -106,7 +111,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
kwargs = {'masks': ['invalid']}
|
||||
self.assertRaises(TypeError, objects.SecretData, *args, **kwargs)
|
||||
self.assertRaises(TypeError, SecretData, *args, **kwargs)
|
||||
|
||||
def test_validate_on_invalid_name(self):
|
||||
"""
|
||||
|
@ -115,13 +120,13 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
kwargs = {'name': 0}
|
||||
self.assertRaises(TypeError, objects.SecretData, *args, **kwargs)
|
||||
self.assertRaises(TypeError, SecretData, *args, **kwargs)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that repr can be applied to a SecretData.
|
||||
"""
|
||||
key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
args = "value={0}, data_type={1}".format(
|
||||
binascii.hexlify(self.bytes_a), enums.SecretDataType.PASSWORD)
|
||||
expected = "SecretData({0})".format(args)
|
||||
|
@ -132,7 +137,7 @@ class TestSecretData(testtools.TestCase):
|
|||
"""
|
||||
Test that str can be applied to a SecretData.
|
||||
"""
|
||||
key = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
key = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
expected = str(binascii.hexlify(self.bytes_a))
|
||||
observed = str(key)
|
||||
self.assertEqual(expected, observed)
|
||||
|
@ -142,8 +147,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
SecretData objects with the same data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
self.assertTrue(a == b)
|
||||
self.assertTrue(b == a)
|
||||
|
||||
|
@ -152,8 +157,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
|
@ -162,8 +167,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
|
@ -172,7 +177,7 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing a
|
||||
SecretData object to a non-SecretData object.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = "invalid"
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
@ -182,8 +187,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the inequality operator returns False when comparing
|
||||
two SecretData objects with the same internal data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
self.assertFalse(a != b)
|
||||
self.assertFalse(b != a)
|
||||
|
||||
|
@ -192,8 +197,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_b, enums.SecretDataType.PASSWORD)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
|
@ -202,8 +207,8 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
SecretData objects with different data.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = objects.SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = SecretData(self.bytes_a, enums.SecretDataType.SEED)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
|
@ -212,7 +217,277 @@ class TestSecretData(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing a
|
||||
SecretData object to a non-SecretData object.
|
||||
"""
|
||||
a = objects.SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
a = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
b = "invalid"
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_save(self):
|
||||
"""
|
||||
Test that the object can be saved using SQLAlchemy. This will add it to
|
||||
the database, verify that no exceptions are thrown, and check that its
|
||||
unique identifier was set.
|
||||
"""
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
def test_get(self):
|
||||
"""
|
||||
Test that the object can be saved and then retrieved using SQLAlchemy.
|
||||
This adds is to the database and then retrieves it by ID and verifies
|
||||
some of the attributes.
|
||||
"""
|
||||
test_name = 'bowser'
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=test_name)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEqual(1, len(get_obj.names))
|
||||
self.assertEqual([test_name], get_obj.names)
|
||||
self.assertEqual(self.bytes_a, get_obj.value)
|
||||
self.assertEqual(enums.ObjectType.SECRET_DATA, get_obj.object_type)
|
||||
self.assertEqual(enums.SecretDataType.PASSWORD, get_obj.data_type)
|
||||
|
||||
def test_add_multiple_names(self):
|
||||
"""
|
||||
Test that multiple names can be added to a managed object. This
|
||||
verifies a few properties. First this verifies that names can be added
|
||||
using simple strings. It also verifies that the index for each
|
||||
subsequent string is set accordingly. Finally this tests that the names
|
||||
can be saved and retrieved from the database.
|
||||
"""
|
||||
expected_names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=expected_names[0])
|
||||
obj.names.append(expected_names[1])
|
||||
obj.names.append(expected_names[2])
|
||||
self.assertEquals(3, obj.name_index)
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(expected_names):
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_remove_name(self):
|
||||
"""
|
||||
Tests that a name can be removed from the list of names. This will
|
||||
verify that the list of names is correct. It will verify that updating
|
||||
this object removes the name from the database.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
remove_index = 1
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
obj.names.pop(remove_index)
|
||||
self.assertEquals(3, obj.name_index)
|
||||
|
||||
expected_names = list()
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(names):
|
||||
if i != remove_index:
|
||||
expected_names.append(name)
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
self.assertEquals(expected_names, obj.names)
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_remove_and_add_name(self):
|
||||
"""
|
||||
Tests that names can be removed from the list of names and more added.
|
||||
This will verify that the list of names is correct. It will verify that
|
||||
updating this object removes the name from the database. It will verify
|
||||
that the indices for the removed names are not reused.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
obj.names.pop()
|
||||
obj.names.pop()
|
||||
obj.names.append('dog')
|
||||
self.assertEquals(4, obj.name_index)
|
||||
|
||||
expected_names = ['bowser', 'dog']
|
||||
expected_mo_names = list()
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||
0))
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||
3))
|
||||
self.assertEquals(expected_names, obj.names)
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_add_name(self):
|
||||
"""
|
||||
Tests that a SecretData already stored in the database can be
|
||||
updated. This will store a SecretData in the database. It will add a
|
||||
name to it in one session, and then retrieve it in another session to
|
||||
verify that it has all of the correct names.
|
||||
|
||||
This test and the subsequent test_udpate_* methods are different than
|
||||
the name tests above because these are updating objects already stored
|
||||
in the database. This tests will simulate what happens when the KMIP
|
||||
client calls an add attribute method.
|
||||
"""
|
||||
first_name = 'bowser'
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=first_name)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
added_name = 'frumpy'
|
||||
expected_names = [first_name, added_name]
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(expected_names):
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.append(added_name)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_remove_name(self):
|
||||
"""
|
||||
Tests that a SecretData already stored in the database can be
|
||||
updated. This will store a SecretData in the database. It will
|
||||
remove a name from it in one session, and then retrieve it in another
|
||||
session to verify that it has all of the correct names.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
remove_index = 1
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
expected_names = list()
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(names):
|
||||
if i != remove_index:
|
||||
expected_names.append(name)
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.pop(remove_index)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_remove_and_add_name(self):
|
||||
"""
|
||||
Tests that a SecretData already stored in the database can be
|
||||
updated. This will store a SecretData in the database. It will
|
||||
remove a name and add another one to it in one session, and then
|
||||
retrieve it in another session to verify that it has all of the correct
|
||||
names. This simulates multiple operation being sent for the same
|
||||
object.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = SecretData(self.bytes_a, enums.SecretDataType.PASSWORD,
|
||||
name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.pop()
|
||||
update_obj.names.pop()
|
||||
update_obj.names.append('dog')
|
||||
session.commit()
|
||||
|
||||
expected_names = ['bowser', 'dog']
|
||||
expected_mo_names = list()
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||
0))
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||
3))
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(SecretData).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
|
Loading…
Reference in New Issue