mirror of https://github.com/OpenKMIP/PyKMIP.git
Added SQLAlchemy Code for X509 Certificates
The code for persisting X509 certificates in a database has been added along with the corresponding unit tests.
This commit is contained in:
parent
3a4de2121d
commit
043553c0e0
|
@ -686,6 +686,17 @@ class Certificate(CryptographicObject):
|
||||||
names: The list of string names of the Certificate.
|
names: The list of string names of the Certificate.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
__tablename__ = 'certificates'
|
||||||
|
unique_identifier = Column('uid', Integer,
|
||||||
|
ForeignKey('crypto_objects.uid'),
|
||||||
|
primary_key=True)
|
||||||
|
certificate_type = Column(
|
||||||
|
'certificate_type', sql.EnumType(enums.CertificateTypeEnum))
|
||||||
|
|
||||||
|
__mapper_args__ = {
|
||||||
|
'polymorphic_identity': 'Certificate'
|
||||||
|
}
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def __init__(self, certificate_type, value, masks=None,
|
def __init__(self, certificate_type, value, masks=None,
|
||||||
name='Certificate'):
|
name='Certificate'):
|
||||||
|
@ -774,6 +785,15 @@ class X509Certificate(Certificate):
|
||||||
names: The list of string names of the Certificate.
|
names: The list of string names of the Certificate.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
__tablename__ = 'x509_certificates'
|
||||||
|
unique_identifier = Column('uid', Integer,
|
||||||
|
ForeignKey('certificates.uid'),
|
||||||
|
primary_key=True)
|
||||||
|
|
||||||
|
__mapper_args__ = {
|
||||||
|
'polymorphic_identity': 'Certificate'
|
||||||
|
}
|
||||||
|
|
||||||
def __init__(self, value, masks=None, name='X.509 Certificate'):
|
def __init__(self, value, masks=None, name='X.509 Certificate'):
|
||||||
"""
|
"""
|
||||||
Create an X509Certificate.
|
Create an X509Certificate.
|
||||||
|
@ -820,6 +840,10 @@ class X509Certificate(Certificate):
|
||||||
return NotImplemented
|
return NotImplemented
|
||||||
|
|
||||||
|
|
||||||
|
event.listen(X509Certificate._names, 'append',
|
||||||
|
sql.attribute_append_factory("name_index"), retval=False)
|
||||||
|
|
||||||
|
|
||||||
class SecretData(CryptographicObject):
|
class SecretData(CryptographicObject):
|
||||||
"""
|
"""
|
||||||
The SecretData class of the simplified KMIP object hierarchy.
|
The SecretData class of the simplified KMIP object hierarchy.
|
||||||
|
|
|
@ -17,7 +17,10 @@ import binascii
|
||||||
import testtools
|
import testtools
|
||||||
|
|
||||||
from kmip.core import enums
|
from kmip.core import enums
|
||||||
from kmip.pie import objects
|
from kmip.pie import sqltypes
|
||||||
|
from kmip.pie.objects import ManagedObject, X509Certificate
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
|
||||||
|
|
||||||
class TestX509Certificate(testtools.TestCase):
|
class TestX509Certificate(testtools.TestCase):
|
||||||
|
@ -133,6 +136,8 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
b'\x6B\xEC\xB0\xF2\x70\x55\xB1\x03\xAF\x3B\x66\x75\xD1\x23\xCD\x3B'
|
b'\x6B\xEC\xB0\xF2\x70\x55\xB1\x03\xAF\x3B\x66\x75\xD1\x23\xCD\x3B'
|
||||||
b'\x71\x79\xA4\x6C\x77\xC7\x3A\xE0\x0F\xFD\xEF\xA9\xB1\x25\xDA\x07'
|
b'\x71\x79\xA4\x6C\x77\xC7\x3A\xE0\x0F\xFD\xEF\xA9\xB1\x25\xDA\x07'
|
||||||
b'\x1E\xAD\x10\xD8\x5E\xAD\x0D\x0D\x44\x1F')
|
b'\x1E\xAD\x10\xD8\x5E\xAD\x0D\x0D\x44\x1F')
|
||||||
|
self.engine = create_engine('sqlite:///:memory:', echo=True)
|
||||||
|
sqltypes.Base.metadata.create_all(self.engine)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(TestX509Certificate, self).tearDown()
|
super(TestX509Certificate, self).tearDown()
|
||||||
|
@ -141,7 +146,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
"""
|
"""
|
||||||
Test that an X509Certificate object can be instantiated.
|
Test that an X509Certificate object can be instantiated.
|
||||||
"""
|
"""
|
||||||
certificate = objects.X509Certificate(self.bytes_a)
|
certificate = X509Certificate(self.bytes_a)
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
certificate.certificate_type, enums.CertificateTypeEnum.X_509)
|
certificate.certificate_type, enums.CertificateTypeEnum.X_509)
|
||||||
|
@ -154,7 +159,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that an X509Certificate object can be instantiated with all
|
Test that an X509Certificate object can be instantiated with all
|
||||||
arguments.
|
arguments.
|
||||||
"""
|
"""
|
||||||
cert = objects.X509Certificate(
|
cert = X509Certificate(
|
||||||
self.bytes_a,
|
self.bytes_a,
|
||||||
masks=[enums.CryptographicUsageMask.ENCRYPT,
|
masks=[enums.CryptographicUsageMask.ENCRYPT,
|
||||||
enums.CryptographicUsageMask.VERIFY],
|
enums.CryptographicUsageMask.VERIFY],
|
||||||
|
@ -173,7 +178,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the object type can be retrieved from the X509Certificate.
|
Test that the object type can be retrieved from the X509Certificate.
|
||||||
"""
|
"""
|
||||||
expected = enums.ObjectType.CERTIFICATE
|
expected = enums.ObjectType.CERTIFICATE
|
||||||
cert = objects.X509Certificate(self.bytes_a)
|
cert = X509Certificate(self.bytes_a)
|
||||||
observed = cert.object_type
|
observed = cert.object_type
|
||||||
self.assertEqual(expected, observed)
|
self.assertEqual(expected, observed)
|
||||||
|
|
||||||
|
@ -183,7 +188,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
to construct a X509Certificate.
|
to construct a X509Certificate.
|
||||||
"""
|
"""
|
||||||
args = (0, )
|
args = (0, )
|
||||||
self.assertRaises(TypeError, objects.X509Certificate, *args)
|
self.assertRaises(TypeError, X509Certificate, *args)
|
||||||
|
|
||||||
def test_validate_on_invalid_masks(self):
|
def test_validate_on_invalid_masks(self):
|
||||||
"""
|
"""
|
||||||
|
@ -192,7 +197,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
"""
|
"""
|
||||||
args = (self.bytes_a, )
|
args = (self.bytes_a, )
|
||||||
kwargs = {'masks': 'invalid'}
|
kwargs = {'masks': 'invalid'}
|
||||||
self.assertRaises(TypeError, objects.X509Certificate, *args, **kwargs)
|
self.assertRaises(TypeError, X509Certificate, *args, **kwargs)
|
||||||
|
|
||||||
def test_validate_on_invalid_mask(self):
|
def test_validate_on_invalid_mask(self):
|
||||||
"""
|
"""
|
||||||
|
@ -201,7 +206,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
"""
|
"""
|
||||||
args = (self.bytes_a, )
|
args = (self.bytes_a, )
|
||||||
kwargs = {'masks': ['invalid']}
|
kwargs = {'masks': ['invalid']}
|
||||||
self.assertRaises(TypeError, objects.X509Certificate, *args, **kwargs)
|
self.assertRaises(TypeError, X509Certificate, *args, **kwargs)
|
||||||
|
|
||||||
def test_validate_on_invalid_name(self):
|
def test_validate_on_invalid_name(self):
|
||||||
"""
|
"""
|
||||||
|
@ -210,13 +215,13 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
"""
|
"""
|
||||||
args = (self.bytes_a, )
|
args = (self.bytes_a, )
|
||||||
kwargs = {'name': 0}
|
kwargs = {'name': 0}
|
||||||
self.assertRaises(TypeError, objects.X509Certificate, *args, **kwargs)
|
self.assertRaises(TypeError, X509Certificate, *args, **kwargs)
|
||||||
|
|
||||||
def test_repr(self):
|
def test_repr(self):
|
||||||
"""
|
"""
|
||||||
Test that repr can be applied to a X509Certificate.
|
Test that repr can be applied to a X509Certificate.
|
||||||
"""
|
"""
|
||||||
cert = objects.X509Certificate(self.bytes_a)
|
cert = X509Certificate(self.bytes_a)
|
||||||
args = "certificate_type={0}, value={1}".format(
|
args = "certificate_type={0}, value={1}".format(
|
||||||
enums.CertificateTypeEnum.X_509, binascii.hexlify(self.bytes_a))
|
enums.CertificateTypeEnum.X_509, binascii.hexlify(self.bytes_a))
|
||||||
expected = "X509Certificate({0})".format(args)
|
expected = "X509Certificate({0})".format(args)
|
||||||
|
@ -227,7 +232,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
"""
|
"""
|
||||||
Test that str can be applied to a X509Certificate.
|
Test that str can be applied to a X509Certificate.
|
||||||
"""
|
"""
|
||||||
cert = objects.X509Certificate(self.bytes_a)
|
cert = X509Certificate(self.bytes_a)
|
||||||
expected = str(binascii.hexlify(self.bytes_a))
|
expected = str(binascii.hexlify(self.bytes_a))
|
||||||
observed = str(cert)
|
observed = str(cert)
|
||||||
self.assertEqual(expected, observed)
|
self.assertEqual(expected, observed)
|
||||||
|
@ -237,8 +242,8 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the equality operator returns True when comparing two
|
Test that the equality operator returns True when comparing two
|
||||||
X509Certificate objects with the same data.
|
X509Certificate objects with the same data.
|
||||||
"""
|
"""
|
||||||
a = objects.X509Certificate(self.bytes_a)
|
a = X509Certificate(self.bytes_a)
|
||||||
b = objects.X509Certificate(self.bytes_a)
|
b = X509Certificate(self.bytes_a)
|
||||||
self.assertTrue(a == b)
|
self.assertTrue(a == b)
|
||||||
self.assertTrue(b == a)
|
self.assertTrue(b == a)
|
||||||
|
|
||||||
|
@ -247,8 +252,8 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the equality operator returns False when comparing two
|
Test that the equality operator returns False when comparing two
|
||||||
X509Certificate objects with different data.
|
X509Certificate objects with different data.
|
||||||
"""
|
"""
|
||||||
a = objects.X509Certificate(self.bytes_a)
|
a = X509Certificate(self.bytes_a)
|
||||||
b = objects.X509Certificate(self.bytes_b)
|
b = X509Certificate(self.bytes_b)
|
||||||
self.assertFalse(a == b)
|
self.assertFalse(a == b)
|
||||||
self.assertFalse(b == a)
|
self.assertFalse(b == a)
|
||||||
|
|
||||||
|
@ -257,7 +262,7 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the equality operator returns False when comparing a
|
Test that the equality operator returns False when comparing a
|
||||||
X509Certificate object to a non-PrivateKey object.
|
X509Certificate object to a non-PrivateKey object.
|
||||||
"""
|
"""
|
||||||
a = objects.X509Certificate(self.bytes_a)
|
a = X509Certificate(self.bytes_a)
|
||||||
b = "invalid"
|
b = "invalid"
|
||||||
self.assertFalse(a == b)
|
self.assertFalse(a == b)
|
||||||
self.assertFalse(b == a)
|
self.assertFalse(b == a)
|
||||||
|
@ -267,8 +272,8 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the inequality operator returns False when comparing
|
Test that the inequality operator returns False when comparing
|
||||||
two X509Certificate objects with the same internal data.
|
two X509Certificate objects with the same internal data.
|
||||||
"""
|
"""
|
||||||
a = objects.X509Certificate(self.bytes_a)
|
a = X509Certificate(self.bytes_a)
|
||||||
b = objects.X509Certificate(self.bytes_a)
|
b = X509Certificate(self.bytes_a)
|
||||||
self.assertFalse(a != b)
|
self.assertFalse(a != b)
|
||||||
self.assertFalse(b != a)
|
self.assertFalse(b != a)
|
||||||
|
|
||||||
|
@ -277,8 +282,8 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the equality operator returns True when comparing two
|
Test that the equality operator returns True when comparing two
|
||||||
X509Certificate objects with different data.
|
X509Certificate objects with different data.
|
||||||
"""
|
"""
|
||||||
a = objects.X509Certificate(self.bytes_a)
|
a = X509Certificate(self.bytes_a)
|
||||||
b = objects.X509Certificate(self.bytes_b)
|
b = X509Certificate(self.bytes_b)
|
||||||
self.assertTrue(a != b)
|
self.assertTrue(a != b)
|
||||||
self.assertTrue(b != a)
|
self.assertTrue(b != a)
|
||||||
|
|
||||||
|
@ -287,7 +292,273 @@ class TestX509Certificate(testtools.TestCase):
|
||||||
Test that the equality operator returns True when comparing a
|
Test that the equality operator returns True when comparing a
|
||||||
X509Certificate object to a non-PrivateKey object.
|
X509Certificate object to a non-PrivateKey object.
|
||||||
"""
|
"""
|
||||||
a = objects.X509Certificate(self.bytes_a)
|
a = X509Certificate(self.bytes_a)
|
||||||
b = "invalid"
|
b = "invalid"
|
||||||
self.assertTrue(a != b)
|
self.assertTrue(a != b)
|
||||||
self.assertTrue(b != a)
|
self.assertTrue(b != a)
|
||||||
|
|
||||||
|
def test_save(self):
|
||||||
|
"""
|
||||||
|
Test that the object can be saved using SQLAlchemy. This will add it to
|
||||||
|
the database, verify that no exceptions are thrown, and check that its
|
||||||
|
unique identifier was set.
|
||||||
|
"""
|
||||||
|
cert = X509Certificate(self.bytes_a)
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
self.assertIsNotNone(cert.unique_identifier)
|
||||||
|
|
||||||
|
def test_get(self):
|
||||||
|
"""
|
||||||
|
Test that the object can be saved and then retrieved using SQLAlchemy.
|
||||||
|
This adds is to the database and then retrieves it by ID and verifies
|
||||||
|
some of the attributes.
|
||||||
|
"""
|
||||||
|
test_name = 'bowser'
|
||||||
|
masks = [enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.WRAP_KEY]
|
||||||
|
cert = X509Certificate(self.bytes_a, masks=masks, name=test_name)
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEqual(1, len(get_obj.names))
|
||||||
|
self.assertEqual([test_name], get_obj.names)
|
||||||
|
self.assertEqual(enums.ObjectType.CERTIFICATE, get_obj.object_type)
|
||||||
|
self.assertEqual(self.bytes_a, get_obj.value)
|
||||||
|
self.assertEqual(masks, get_obj.cryptographic_usage_masks)
|
||||||
|
|
||||||
|
def test_add_multiple_names(self):
|
||||||
|
"""
|
||||||
|
Test that multiple names can be added to a managed object. This
|
||||||
|
verifies a few properties. First this verifies that names can be added
|
||||||
|
using simple strings. It also verifies that the index for each
|
||||||
|
subsequent string is set accordingly. Finally this tests that the names
|
||||||
|
can be saved and retrieved from the database.
|
||||||
|
"""
|
||||||
|
expected_names = ['bowser', 'frumpy', 'big fat cat']
|
||||||
|
cert = X509Certificate(self.bytes_a, name=expected_names[0])
|
||||||
|
cert.names.append(expected_names[1])
|
||||||
|
cert.names.append(expected_names[2])
|
||||||
|
self.assertEquals(3, cert.name_index)
|
||||||
|
expected_mo_names = list()
|
||||||
|
for i, name in enumerate(expected_names):
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||||
|
self.assertEquals(expected_mo_names, cert._names)
|
||||||
|
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEquals(expected_mo_names, get_obj._names)
|
||||||
|
|
||||||
|
def test_remove_name(self):
|
||||||
|
"""
|
||||||
|
Tests that a name can be removed from the list of names. This will
|
||||||
|
verify that the list of names is correct. It will verify that updating
|
||||||
|
this object removes the name from the database.
|
||||||
|
"""
|
||||||
|
names = ['bowser', 'frumpy', 'big fat cat']
|
||||||
|
remove_index = 1
|
||||||
|
cert = X509Certificate(self.bytes_a, name=names[0])
|
||||||
|
cert.names.append(names[1])
|
||||||
|
cert.names.append(names[2])
|
||||||
|
cert.names.pop(remove_index)
|
||||||
|
self.assertEquals(3, cert.name_index)
|
||||||
|
|
||||||
|
expected_names = list()
|
||||||
|
expected_mo_names = list()
|
||||||
|
for i, name in enumerate(names):
|
||||||
|
if i != remove_index:
|
||||||
|
expected_names.append(name)
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||||
|
self.assertEquals(expected_names, cert.names)
|
||||||
|
self.assertEquals(expected_mo_names, cert._names)
|
||||||
|
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEquals(expected_names, get_obj.names)
|
||||||
|
self.assertEquals(expected_mo_names, get_obj._names)
|
||||||
|
|
||||||
|
def test_remove_and_add_name(self):
|
||||||
|
"""
|
||||||
|
Tests that names can be removed from the list of names and more added.
|
||||||
|
This will verify that the list of names is correct. It will verify that
|
||||||
|
updating this object removes the name from the database. It will verify
|
||||||
|
that the indices for the removed names are not reused.
|
||||||
|
"""
|
||||||
|
names = ['bowser', 'frumpy', 'big fat cat']
|
||||||
|
cert = X509Certificate(self.bytes_a, name=names[0])
|
||||||
|
cert.names.append(names[1])
|
||||||
|
cert.names.append(names[2])
|
||||||
|
cert.names.pop()
|
||||||
|
cert.names.pop()
|
||||||
|
cert.names.append('dog')
|
||||||
|
self.assertEquals(4, cert.name_index)
|
||||||
|
|
||||||
|
expected_names = ['bowser', 'dog']
|
||||||
|
expected_mo_names = list()
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||||
|
0))
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||||
|
3))
|
||||||
|
self.assertEquals(expected_names, cert.names)
|
||||||
|
self.assertEquals(expected_mo_names, cert._names)
|
||||||
|
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEquals(expected_names, get_obj.names)
|
||||||
|
self.assertEquals(expected_mo_names, get_obj._names)
|
||||||
|
|
||||||
|
def test_update_with_add_name(self):
|
||||||
|
"""
|
||||||
|
Tests that an X509Certificate already stored in the database can be
|
||||||
|
updated. This will store an X509Certificate in the database. It will
|
||||||
|
add a name to it in one session, and then retrieve it in another
|
||||||
|
session to verify that it has all of the correct names.
|
||||||
|
|
||||||
|
This test and the subsequent test_udpate_* methods are different than
|
||||||
|
the name tests above because these are updating objects already stored
|
||||||
|
in the database. This tests will simulate what happens when the KMIP
|
||||||
|
client calls an add attribute method.
|
||||||
|
"""
|
||||||
|
first_name = 'bowser'
|
||||||
|
cert = X509Certificate(self.bytes_a, name=first_name)
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
added_name = 'frumpy'
|
||||||
|
expected_names = [first_name, added_name]
|
||||||
|
expected_mo_names = list()
|
||||||
|
for i, name in enumerate(expected_names):
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
update_cert = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
update_cert.names.append(added_name)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEquals(expected_names, get_obj.names)
|
||||||
|
self.assertEquals(expected_mo_names, get_obj._names)
|
||||||
|
|
||||||
|
def test_update_with_remove_name(self):
|
||||||
|
"""
|
||||||
|
Tests that an X509Certificate already stored in the database can be
|
||||||
|
updated. This will store an X509Certificate in the database. It will
|
||||||
|
remove a name from it in one session, and then retrieve it in another
|
||||||
|
session to verify that it has all of the correct names.
|
||||||
|
"""
|
||||||
|
names = ['bowser', 'frumpy', 'big fat cat']
|
||||||
|
remove_index = 1
|
||||||
|
cert = X509Certificate(self.bytes_a, name=names[0])
|
||||||
|
cert.names.append(names[1])
|
||||||
|
cert.names.append(names[2])
|
||||||
|
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
expected_names = list()
|
||||||
|
expected_mo_names = list()
|
||||||
|
for i, name in enumerate(names):
|
||||||
|
if i != remove_index:
|
||||||
|
expected_names.append(name)
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
update_cert = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
update_cert.names.pop(remove_index)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEquals(expected_names, get_obj.names)
|
||||||
|
self.assertEquals(expected_mo_names, get_obj._names)
|
||||||
|
|
||||||
|
def test_update_with_remove_and_add_name(self):
|
||||||
|
"""
|
||||||
|
Tests that an X509Certificate already stored in the database can be
|
||||||
|
updated. This will store an X509Certificate in the database. It will
|
||||||
|
remove a name and add another one to it in one session, and then
|
||||||
|
retrieve it in another session to verify that it has all of the correct
|
||||||
|
names. This simulates multiple operation being sent for the same
|
||||||
|
object.
|
||||||
|
"""
|
||||||
|
names = ['bowser', 'frumpy', 'big fat cat']
|
||||||
|
cert = X509Certificate(self.bytes_a, name=names[0])
|
||||||
|
cert.names.append(names[1])
|
||||||
|
cert.names.append(names[2])
|
||||||
|
|
||||||
|
Session = sessionmaker(bind=self.engine)
|
||||||
|
session = Session()
|
||||||
|
session.add(cert)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
update_cert = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
update_cert.names.pop()
|
||||||
|
update_cert.names.pop()
|
||||||
|
update_cert.names.append('dog')
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
expected_names = ['bowser', 'dog']
|
||||||
|
expected_mo_names = list()
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||||
|
0))
|
||||||
|
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||||
|
3))
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
get_obj = session.query(X509Certificate).filter(
|
||||||
|
ManagedObject.unique_identifier == cert.unique_identifier
|
||||||
|
).one()
|
||||||
|
session.commit()
|
||||||
|
self.assertEquals(expected_names, get_obj.names)
|
||||||
|
self.assertEquals(expected_mo_names, get_obj._names)
|
||||||
|
|
Loading…
Reference in New Issue