mirror of https://github.com/OpenKMIP/PyKMIP.git
Added SQLAlchemy Code to OpaqueData
Added the ORM code to be able to persist OpaqueData in a database. This added the code to the base class ManagedObject as well. Unit tests are added to demonstrate the code is working correctly.
This commit is contained in:
parent
52c7103681
commit
e7a383cace
|
@ -2,9 +2,11 @@
|
|||
.pydevproject
|
||||
*.pyc
|
||||
|
||||
.cache/
|
||||
.coverage
|
||||
.tox/
|
||||
ChangeLog
|
||||
PyKMIP.egg-info/
|
||||
dist/
|
||||
htmlcov/
|
||||
tags
|
||||
|
|
|
@ -13,17 +13,19 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from abc import ABCMeta
|
||||
from abc import abstractmethod
|
||||
from sqlalchemy import Column, event, ForeignKey, Integer, VARCHAR
|
||||
from sqlalchemy.ext.associationproxy import association_proxy
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
import binascii
|
||||
import six
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.pie import sqltypes as sql
|
||||
|
||||
|
||||
@six.add_metaclass(ABCMeta)
|
||||
class ManagedObject:
|
||||
class ManagedObject(sql.Base):
|
||||
"""
|
||||
The abstract base class of the simplified KMIP object hierarchy.
|
||||
|
||||
|
@ -41,6 +43,20 @@ class ManagedObject:
|
|||
object_type: An enumeration associated with the type of ManagedObject.
|
||||
"""
|
||||
|
||||
__tablename__ = 'managed_objects'
|
||||
unique_identifier = Column('uid', Integer, primary_key=True)
|
||||
_object_type = Column('object_type', sql.EnumType(enums.ObjectType))
|
||||
value = Column('value', VARCHAR(1024))
|
||||
name_index = Column(Integer, default=0)
|
||||
_names = relationship('ManagedObjectName', back_populates='mo',
|
||||
cascade='all, delete-orphan')
|
||||
names = association_proxy('_names', 'name')
|
||||
|
||||
__mapper_args__ = {
|
||||
'polymorphic_identity': 0x00000000,
|
||||
'polymorphic_on': _object_type
|
||||
}
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self):
|
||||
"""
|
||||
|
@ -49,6 +65,7 @@ class ManagedObject:
|
|||
self.value = None
|
||||
|
||||
self.unique_identifier = None
|
||||
self.name_index = 0
|
||||
self.names = list()
|
||||
self._object_type = None
|
||||
|
||||
|
@ -873,6 +890,15 @@ class OpaqueObject(ManagedObject):
|
|||
opaque_type: The type of the opaque value.
|
||||
"""
|
||||
|
||||
__tablename__ = 'opaque_objects'
|
||||
unique_identifier = Column('uid', Integer,
|
||||
ForeignKey('managed_objects.uid'),
|
||||
primary_key=True)
|
||||
opaque_type = Column('opaque_type', sql.EnumType(enums.OpaqueDataType))
|
||||
__mapper_args__ = {
|
||||
'polymorphic_identity': enums.ObjectType.OPAQUE_DATA
|
||||
}
|
||||
|
||||
def __init__(self, value, opaque_type, name='Opaque Object'):
|
||||
"""
|
||||
Create a OpaqueObject.
|
||||
|
@ -889,7 +915,7 @@ class OpaqueObject(ManagedObject):
|
|||
|
||||
self.value = value
|
||||
self.opaque_type = opaque_type
|
||||
self.names = [name]
|
||||
self.names.append(name)
|
||||
|
||||
# All remaining attributes are not considered part of the public API
|
||||
# and are subject to change.
|
||||
|
@ -950,3 +976,7 @@ class OpaqueObject(ManagedObject):
|
|||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
|
||||
event.listen(OpaqueObject._names, 'append',
|
||||
sql.attribute_append_factory("name_index"), retval=False)
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kmip.core import enums
|
||||
from sqlalchemy import Column, ForeignKey, Integer, String
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
import sqlalchemy.types as types
|
||||
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
def attribute_append_factory(index_attribute):
|
||||
def attribute_append(list_container, list_attribute, initiator):
|
||||
index = getattr(list_container, index_attribute)
|
||||
list_attribute.index = index
|
||||
setattr(list_container, index_attribute, index + 1)
|
||||
return list_attribute
|
||||
return attribute_append
|
||||
|
||||
|
||||
class EnumType(types.TypeDecorator):
|
||||
"""
|
||||
Converts a Python enum to an integer before storing it in the database.
|
||||
This also does the reverse of converting an integer into an enum object.
|
||||
This allows enums to be stored in a database.
|
||||
"""
|
||||
|
||||
impl = types.Integer
|
||||
|
||||
def __init__(self, cls):
|
||||
"""
|
||||
Create a new EnumType. This new EnumType requires a class object in the
|
||||
constructor. The class is used to construct new instances of the Enum
|
||||
when the integer value is retrieved from the database.
|
||||
|
||||
Args:
|
||||
cls(class): An Enum class used to create new instances from integer
|
||||
values.
|
||||
"""
|
||||
super(EnumType, self).__init__()
|
||||
self._cls = cls
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
"""
|
||||
Returns the integer value of the Enum. This value is stored in the
|
||||
database.
|
||||
|
||||
Args:
|
||||
value(Enum): An Enum instance whose integer value is to be stored.
|
||||
dialect(string): SQL dialect
|
||||
"""
|
||||
return value.value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
"""
|
||||
Returns a new Enum representing the value stored in the database. The
|
||||
Enum class type of the returned object is that of the cls parameter in
|
||||
the __init__ call.
|
||||
|
||||
Args:
|
||||
value(int): The integer value stored in the database that is used
|
||||
to create the Enum
|
||||
dialect(string): SQL dialect
|
||||
"""
|
||||
return self._cls(value)
|
||||
|
||||
|
||||
class ManagedObjectName(Base):
|
||||
|
||||
__tablename__ = 'managed_object_names'
|
||||
id = Column('id', Integer, primary_key=True)
|
||||
mo_uid = Column('mo_uid', Integer, ForeignKey('managed_objects.uid'))
|
||||
name = Column('name', String)
|
||||
index = Column('name_index', Integer)
|
||||
name_type = Column('name_type', EnumType(enums.NameType))
|
||||
|
||||
mo = relationship('ManagedObject', back_populates='_names')
|
||||
|
||||
def __init__(self, name, index=0,
|
||||
name_type=enums.NameType.UNINTERPRETED_TEXT_STRING):
|
||||
self.name = name
|
||||
self.index = index
|
||||
self.name_type = name_type
|
||||
|
||||
def __repr__(self):
|
||||
return ("<ManagedObjectName(name='%s', index='%d', type='%s')>" %
|
||||
(self.name, self.index, self.name_type))
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, ManagedObjectName):
|
||||
if self.name != other.name:
|
||||
return False
|
||||
elif self.index != other.index:
|
||||
return False
|
||||
elif self.name_type != other.name_type:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, ManagedObjectName):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
|
@ -17,7 +17,10 @@ import binascii
|
|||
import testtools
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.pie import objects
|
||||
from kmip.pie.objects import ManagedObject, OpaqueObject
|
||||
from kmip.pie import sqltypes
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
||||
class TestOpaqueObject(testtools.TestCase):
|
||||
|
@ -33,6 +36,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x64')
|
||||
self.bytes_b = (
|
||||
b'\x53\x65\x63\x72\x65\x74\x50\x61\x73\x73\x77\x6F\x72\x65')
|
||||
self.engine = create_engine('sqlite:///:memory:', echo=True)
|
||||
sqltypes.Base.metadata.create_all(self.engine)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestOpaqueObject, self).tearDown()
|
||||
|
@ -41,7 +46,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
"""
|
||||
Test that a OpaqueObject object can be instantiated.
|
||||
"""
|
||||
obj = objects.OpaqueObject(
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
|
||||
self.assertEqual(obj.value, self.bytes_a)
|
||||
|
@ -52,7 +57,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
"""
|
||||
Test that a OpaqueObject object can be instantiated with all arguments.
|
||||
"""
|
||||
obj = objects.OpaqueObject(
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a,
|
||||
enums.OpaqueDataType.NONE,
|
||||
name='Test Opaque Object')
|
||||
|
@ -66,7 +71,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the object type can be retrieved from the OpaqueObject.
|
||||
"""
|
||||
expected = enums.ObjectType.OPAQUE_DATA
|
||||
obj = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
obj = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
observed = obj.object_type
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
|
@ -76,7 +81,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
construct a OpaqueObject.
|
||||
"""
|
||||
args = (0, enums.OpaqueDataType.NONE)
|
||||
self.assertRaises(TypeError, objects.OpaqueObject, *args)
|
||||
self.assertRaises(TypeError, OpaqueObject, *args)
|
||||
|
||||
def test_validate_on_invalid_data_type(self):
|
||||
"""
|
||||
|
@ -84,7 +89,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
construct a OpaqueObject.
|
||||
"""
|
||||
args = (self.bytes_a, 'invalid')
|
||||
self.assertRaises(TypeError, objects.OpaqueObject, *args)
|
||||
self.assertRaises(TypeError, OpaqueObject, *args)
|
||||
|
||||
def test_validate_on_invalid_name(self):
|
||||
"""
|
||||
|
@ -93,13 +98,13 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
"""
|
||||
args = (self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
kwargs = {'name': 0}
|
||||
self.assertRaises(TypeError, objects.OpaqueObject, *args, **kwargs)
|
||||
self.assertRaises(TypeError, OpaqueObject, *args, **kwargs)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that repr can be applied to a OpaqueObject.
|
||||
"""
|
||||
obj = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
obj = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
args = "value={0}, opaque_type={1}".format(
|
||||
binascii.hexlify(self.bytes_a), enums.OpaqueDataType.NONE)
|
||||
expected = "OpaqueObject({0})".format(args)
|
||||
|
@ -110,7 +115,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
"""
|
||||
Test that str can be applied to a OpaqueObject.
|
||||
"""
|
||||
obj = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
obj = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
expected = str(binascii.hexlify(self.bytes_a))
|
||||
observed = str(obj)
|
||||
self.assertEqual(expected, observed)
|
||||
|
@ -120,8 +125,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
OpaqueObject objects with the same data.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
self.assertTrue(a == b)
|
||||
self.assertTrue(b == a)
|
||||
|
||||
|
@ -130,8 +135,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing two
|
||||
OpaqueObject objects with different data.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = objects.OpaqueObject(self.bytes_b, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = OpaqueObject(self.bytes_b, enums.OpaqueDataType.NONE)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
|
@ -140,8 +145,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing two
|
||||
OpaqueObject objects with different data.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b.opaque_type = "invalid"
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
@ -151,7 +156,7 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns False when comparing a
|
||||
OpaqueObject object to a non-OpaqueObject object.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = "invalid"
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
@ -161,8 +166,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the inequality operator returns False when comparing
|
||||
two OpaqueObject objects with the same internal data.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
self.assertFalse(a != b)
|
||||
self.assertFalse(b != a)
|
||||
|
||||
|
@ -171,8 +176,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
OpaqueObject objects with different data.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = objects.OpaqueObject(self.bytes_b, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = OpaqueObject(self.bytes_b, enums.OpaqueDataType.NONE)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
|
@ -181,8 +186,8 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing two
|
||||
OpaqueObject objects with different data.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b.opaque_type = "invalid"
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
@ -192,7 +197,278 @@ class TestOpaqueObject(testtools.TestCase):
|
|||
Test that the equality operator returns True when comparing a
|
||||
OpaqueObject object to a non-OpaqueObject object.
|
||||
"""
|
||||
a = objects.OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
a = OpaqueObject(self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
b = "invalid"
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_save(self):
|
||||
"""
|
||||
Test that the object can be saved using SQLAlchemy. This will add it to
|
||||
the database, verify that no exceptions are thrown, and check that its
|
||||
unique identifier was set.
|
||||
"""
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
def test_get(self):
|
||||
"""
|
||||
Test that the object can be saved and then retrieved using SQLAlchemy.
|
||||
This adds is to the database and then retrieves it by ID and verifies
|
||||
some of the attributes.
|
||||
"""
|
||||
test_name = 'bowser'
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=test_name)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEqual(1, len(get_obj.names))
|
||||
self.assertEqual([test_name], get_obj.names)
|
||||
self.assertEqual(self.bytes_a, get_obj.value)
|
||||
self.assertEqual(enums.ObjectType.OPAQUE_DATA, get_obj.object_type)
|
||||
self.assertEqual(enums.OpaqueDataType.NONE, get_obj.opaque_type)
|
||||
|
||||
def test_add_multiple_names(self):
|
||||
"""
|
||||
Test that multiple names can be added to a managed object. This
|
||||
verifies a few properties. First this verifies that names can be added
|
||||
using simple strings. It also verifies that the index for each
|
||||
subsequent string is set accordingly. Finally this tests that the names
|
||||
can be saved and retrieved from the database.
|
||||
"""
|
||||
expected_names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=expected_names[0])
|
||||
obj.names.append(expected_names[1])
|
||||
obj.names.append(expected_names[2])
|
||||
self.assertEquals(3, obj.name_index)
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(expected_names):
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_remove_name(self):
|
||||
"""
|
||||
Tests that a name can be removed from the list of names. This will
|
||||
verify that the list of names is correct. It will verify that updating
|
||||
this object removes the name from the database.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
remove_index = 1
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
obj.names.pop(remove_index)
|
||||
self.assertEquals(3, obj.name_index)
|
||||
|
||||
expected_names = list()
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(names):
|
||||
if i != remove_index:
|
||||
expected_names.append(name)
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
self.assertEquals(expected_names, obj.names)
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_remove_and_add_name(self):
|
||||
"""
|
||||
Tests that names can be removed from the list of names and more added.
|
||||
This will verify that the list of names is correct. It will verify that
|
||||
updating this object removes the name from the database. It will verify
|
||||
that the indices for the removed names are not reused.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
obj.names.pop()
|
||||
obj.names.pop()
|
||||
obj.names.append('dog')
|
||||
self.assertEquals(4, obj.name_index)
|
||||
|
||||
expected_names = ['bowser', 'dog']
|
||||
expected_mo_names = list()
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||
0))
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||
3))
|
||||
self.assertEquals(expected_names, obj.names)
|
||||
self.assertEquals(expected_mo_names, obj._names)
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_add_name(self):
|
||||
"""
|
||||
Tests that an OpaqueObject already stored in the database can be
|
||||
updated. This will store an OpaqueObject in the database. It will add a
|
||||
name to it in one session, and then retrieve it in another session to
|
||||
verify that it has all of the correct names.
|
||||
|
||||
This test and the subsequent test_udpate_* methods are different than
|
||||
the name tests above because these are updating objects already stored
|
||||
in the database. This tests will simulate what happens when the KMIP
|
||||
client calls an add attribute method.
|
||||
"""
|
||||
first_name = 'bowser'
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=first_name)
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
added_name = 'frumpy'
|
||||
expected_names = [first_name, added_name]
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(expected_names):
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.append(added_name)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_remove_name(self):
|
||||
"""
|
||||
Tests that an OpaqueObject already stored in the database can be
|
||||
updated. This will store an OpaqueObject in the database. It will
|
||||
remove a name from it in one session, and then retrieve it in another
|
||||
session to verify that it has all of the correct names.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
remove_index = 1
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
expected_names = list()
|
||||
expected_mo_names = list()
|
||||
for i, name in enumerate(names):
|
||||
if i != remove_index:
|
||||
expected_names.append(name)
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.pop(remove_index)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
||||
def test_update_with_remove_and_add_name(self):
|
||||
"""
|
||||
Tests that an OpaqueObject already stored in the database can be
|
||||
updated. This will store an OpaqueObject in the database. It will
|
||||
remove a name and add another one to it in one session, and then
|
||||
retrieve it in another session to verify that it has all of the correct
|
||||
names. This simulates multiple operation being sent for the same
|
||||
object.
|
||||
"""
|
||||
names = ['bowser', 'frumpy', 'big fat cat']
|
||||
obj = OpaqueObject(
|
||||
self.bytes_a, enums.OpaqueDataType.NONE, name=names[0])
|
||||
obj.names.append(names[1])
|
||||
obj.names.append(names[2])
|
||||
|
||||
Session = sessionmaker(bind=self.engine)
|
||||
session = Session()
|
||||
session.add(obj)
|
||||
session.commit()
|
||||
|
||||
session = Session()
|
||||
update_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
update_obj.names.pop()
|
||||
update_obj.names.pop()
|
||||
update_obj.names.append('dog')
|
||||
session.commit()
|
||||
|
||||
expected_names = ['bowser', 'dog']
|
||||
expected_mo_names = list()
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
|
||||
0))
|
||||
expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
|
||||
3))
|
||||
|
||||
session = Session()
|
||||
get_obj = session.query(OpaqueObject).filter(
|
||||
ManagedObject.unique_identifier == obj.unique_identifier
|
||||
).one()
|
||||
session.commit()
|
||||
self.assertEquals(expected_names, get_obj.names)
|
||||
self.assertEquals(expected_mo_names, get_obj._names)
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testtools
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.pie.sqltypes import ManagedObjectName
|
||||
|
||||
|
||||
class TestSqlTypesManagedObjectName(testtools.TestCase):
|
||||
"""
|
||||
Test suite for objects in sqltypes.py.
|
||||
"""
|
||||
|
||||
def test_equal_on_equal(self):
|
||||
"""
|
||||
Test that the equality operator returns True when comparing two
|
||||
ManagedObjectName objects with the same data.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0)
|
||||
b = ManagedObjectName('a', 0)
|
||||
self.assertTrue(a == b)
|
||||
self.assertTrue(b == a)
|
||||
|
||||
def test_equal_on_not_equal_name(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
ManagedObjectName objects with different names.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0)
|
||||
b = ManagedObjectName('b', 0)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_not_equal_index(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
ManagedObjectName objects with different indices.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0)
|
||||
b = ManagedObjectName('a', 1)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_not_equal_name_type(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
ManagedObjectName objects with different name types.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0, enums.NameType.UNINTERPRETED_TEXT_STRING)
|
||||
b = ManagedObjectName('a', 0, enums.NameType.URI)
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_not_equal_class_type(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing a
|
||||
ManagedObjectName object with a different type of object.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0, enums.NameType.UNINTERPRETED_TEXT_STRING)
|
||||
b = 'foo'
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_not_equal_on_equal(self):
|
||||
"""
|
||||
Test that the not equal operator returns False when comparing two
|
||||
ManagedObjectName objects with the same data.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0)
|
||||
b = ManagedObjectName('a', 0)
|
||||
self.assertFalse(a != b)
|
||||
self.assertFalse(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_name(self):
|
||||
"""
|
||||
Test that the not equal operator returns True when comparing two
|
||||
ManagedObjectName objects with different names.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0)
|
||||
b = ManagedObjectName('b', 0)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_index(self):
|
||||
"""
|
||||
Test that the not equal operator returns True when comparing two
|
||||
ManagedObjectName objects with different indices.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0)
|
||||
b = ManagedObjectName('a', 1)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_name_type(self):
|
||||
"""
|
||||
Test that the not equal operator returns True when comparing two
|
||||
ManagedObjectName objects with different name types.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0, enums.NameType.UNINTERPRETED_TEXT_STRING)
|
||||
b = ManagedObjectName('a', 0, enums.NameType.URI)
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_class_type(self):
|
||||
"""
|
||||
Test that the not equal operator returns False when comparing a
|
||||
ManagedObjectName object with a different type of object.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0, enums.NameType.UNINTERPRETED_TEXT_STRING)
|
||||
b = 'foo'
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that __repr__ is implemented.
|
||||
"""
|
||||
a = ManagedObjectName('a', 0, enums.NameType.UNINTERPRETED_TEXT_STRING)
|
||||
repr(a)
|
|
@ -1,4 +1,4 @@
|
|||
cryptography>=1.1
|
||||
enum34
|
||||
six>=1.9.0
|
||||
|
||||
sqlalchemy>=1.0
|
||||
|
|
Loading…
Reference in New Issue