Add InitialDate filtering support for the Locate operation

This change updates Locate operation support in the PyKMIP server,
allowing users to filter objects based on the objects InitialDate
attribute value. Specifying a single InitialDate attribute in the
Locate request will perform an exact match on objects; specifying
two InitialDate attributes will perform a ranged match. Unit tests
and integration tests have been added to test and verify the
correctness of this feature.

Additionally, the Locate demo scripts have also been updated to
support InitialDate filtering. Simply use the "--initial-date"
flag to provide one or more InitialDate values to the Locate
script to filter on those dates.
This commit is contained in:
Peter Hamilton 2019-07-26 12:38:02 -04:00 committed by Peter Hamilton
parent ee037408cf
commit da284e932b
7 changed files with 839 additions and 156 deletions

View File

@ -13,44 +13,64 @@
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.enums import NameType
from kmip.core.enums import Operation
from kmip.core.attributes import Name
from kmip.core.objects import Attribute
from kmip.demos import utils
from kmip.pie import client
import calendar
import logging
import sys
import time
from kmip.core import enums
from kmip.core.factories.attributes import AttributeFactory
from kmip.demos import utils
from kmip.pie import client
if __name__ == '__main__':
logger = utils.build_console_logger(logging.INFO)
# Build and parse arguments
parser = utils.build_cli_parser(Operation.LOCATE)
parser = utils.build_cli_parser(enums.Operation.LOCATE)
opts, args = parser.parse_args(sys.argv[1:])
config = opts.config
name = opts.name
initial_dates = opts.initial_dates
# Exit early if name is not specified
if name is None:
logger.error('No name provided, exiting early from demo')
sys.exit()
attribute_factory = AttributeFactory()
# Build name attribute
# TODO Push this into the AttributeFactory
attribute_name = Attribute.AttributeName('Name')
name_value = Name.NameValue(name)
name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value=name_value, name_type=name_type)
name_obj = Attribute(attribute_name=attribute_name, attribute_value=value)
attributes = [name_obj]
# Build attributes if any are specified
attributes = []
if name:
attributes.append(
attribute_factory.create_attribute(enums.AttributeType.NAME, name)
)
for initial_date in initial_dates:
try:
t = time.strptime(initial_date)
except ValueError, TypeError:
logger.error(
"Invalid initial date provided: {}".format(initial_date)
)
logger.info(
"Date values should be formatted like this: "
"'Tue Jul 23 18:39:01 2019'"
)
sys.exit(-1)
try:
t = calendar.timegm(t)
except Exception:
logger.error(
"Failed to convert initial date time tuple "
"to an integer: {}".format(t)
)
sys.exit(-2)
attributes.append(
attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
t
)
)
# Build the client and connect to the server
with client.ProxyKmipClient(

View File

@ -13,42 +13,30 @@
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.enums import CredentialType
from kmip.core.enums import NameType
from kmip.core.enums import Operation
from kmip.core.enums import ResultStatus
from kmip.core.attributes import Name
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.factories.credentials import CredentialFactory
from kmip.core.objects import Attribute
from kmip.demos import utils
from kmip.services.kmip_client import KMIPProxy
import calendar
import logging
import sys
import time
from kmip.core import enums
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.factories.credentials import CredentialFactory
from kmip.demos import utils
from kmip.services import kmip_client
if __name__ == '__main__':
logger = utils.build_console_logger(logging.INFO)
# Build and parse arguments
parser = utils.build_cli_parser(Operation.LOCATE)
parser = utils.build_cli_parser(enums.Operation.LOCATE)
opts, args = parser.parse_args(sys.argv[1:])
username = opts.username
password = opts.password
config = opts.config
name = opts.name
# Exit early if the UUID is not specified
if name is None:
logger.error('No name provided, exiting early from demo')
sys.exit()
initial_dates = opts.initial_dates
attribute_factory = AttributeFactory()
credential_factory = CredentialFactory()
@ -58,34 +46,63 @@ if __name__ == '__main__':
if (username is None) and (password is None):
credential = None
else:
credential_type = CredentialType.USERNAME_AND_PASSWORD
credential_value = {'Username': username,
'Password': password}
credential = credential_factory.create_credential(credential_type,
credential_value)
credential_type = enums.CredentialType.USERNAME_AND_PASSWORD
credential_value = {
"Username": username,
"Password": password
}
credential = credential_factory.create_credential(
credential_type,
credential_value
)
# Build the client and connect to the server
client = KMIPProxy(config=config, config_file=opts.config_file)
client = kmip_client.KMIPProxy(config=config, config_file=opts.config_file)
client.open()
# Build name attribute
# TODO (peter-hamilton) Push this into the AttributeFactory
attribute_name = Attribute.AttributeName('Name')
name_value = Name.NameValue(name)
name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value=name_value, name_type=name_type)
name_obj = Attribute(attribute_name=attribute_name, attribute_value=value)
attributes = [name_obj]
# Build attributes if any are specified
attributes = []
if name:
attributes.append(
attribute_factory.create_attribute(enums.AttributeType.NAME, name)
)
for initial_date in initial_dates:
try:
t = time.strptime(initial_date)
except ValueError:
logger.error(
"Invalid initial date provided: {}".format(initial_date)
)
logger.info(
"Date values should be formatted like this: "
"'Tue Jul 23 18:39:01 2019'"
)
sys.exit(-1)
# Locate UUID of specified SYMMETRIC_KEY object
result = client.locate(attributes=attributes,
credential=credential)
try:
t = calendar.timegm(t)
except Exception:
logger.error(
"Failed to convert initial date time tuple "
"to an integer: {}".format(t)
)
sys.exit(-2)
attributes.append(
attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
t
)
)
result = client.locate(attributes=attributes, credential=credential)
client.close()
# Display operation results
logger.info('locate() result status: {0}'.format(
result.result_status.value))
if result.result_status.value == ResultStatus.SUCCESS:
if result.result_status.value == enums.ResultStatus.SUCCESS:
logger.info('located UUIDs:')
for uuid in result.uuids:
logger.info('{0}'.format(uuid))

View File

@ -238,6 +238,20 @@ def build_cli_parser(operation=None):
default=None,
dest="name",
help="Name of secret to retrieve from the KMIP server")
parser.add_option(
"--initial-date",
action="append",
type="str",
default=[],
dest="initial_dates",
help=(
"Initial date(s) in UTC of the secret to retrieve from the "
"KMIP server. Use once to perform an exact date match. Use "
"twice to create a date range that the secret's date should "
"be within. The value format should look like this: "
"'Tue Jul 23 18:39:01 2019'"
)
)
elif operation is Operation.REGISTER:
parser.add_option(
"-f",

View File

@ -652,7 +652,7 @@ class KmipEngine(object):
names.append(name)
return names
elif attr_name == 'Object Type':
return managed_object._object_type
return managed_object.object_type
elif attr_name == 'Cryptographic Algorithm':
return managed_object.cryptographic_algorithm
elif attr_name == 'Cryptographic Length':
@ -926,6 +926,63 @@ class KmipEngine(object):
else:
return False
def _is_valid_date(self, date_type, value, start, end):
date_type = date_type.value.lower()
if start is not None:
if end is not None:
if value < start:
self._logger.debug(
"Failed match: object's {} ({}) is less than "
"the starting {} ({}).".format(
date_type,
time.asctime(time.gmtime(value)),
date_type,
time.asctime(time.gmtime(start))
)
)
return False
elif value > end:
self._logger.debug(
"Failed match: object's {} ({}) is greater than "
"the ending {} ({}).".format(
date_type,
time.asctime(time.gmtime(value)),
date_type,
time.asctime(time.gmtime(end))
)
)
return False
else:
if start != value:
self._logger.debug(
"Failed match: object's {} ({}) does not match "
"the specified {} ({}).".format(
date_type,
time.asctime(time.gmtime(value)),
date_type,
time.asctime(time.gmtime(start))
)
)
return False
return True
def _track_date_attributes(self, date_type, date_values, value):
if date_values.get("start") is None:
date_values["start"] = value
elif date_values.get("end") is None:
if value > date_values.get("start"):
date_values["end"] = value
else:
date_values["end"] = date_values.get("start")
date_values["start"] = value
else:
raise exceptions.InvalidField(
"Too many {} attributes provided. "
"Include one for an exact match. "
"Include two for a ranged match.".format(date_type.value)
)
def _get_object_with_access_controls(
self,
uid,
@ -1549,20 +1606,63 @@ class KmipEngine(object):
managed_objects_filtered = []
# Filter the objects based on given attributes.
# TODO: Currently will only filter for 'Name'.
# Needs to add other attributes.
for managed_object in managed_objects:
for attribute in payload.attributes:
attribute_name = attribute.attribute_name.value
attribute_value = attribute.attribute_value
attr = self._get_attribute_from_managed_object(
managed_object, attribute_name)
if attribute_name == 'Name':
names = attr
if attribute_value not in names:
self._logger.debug(
"Evaluating object: {}".format(
managed_object.unique_identifier
)
)
add_object = True
initial_date = {}
for payload_attribute in payload.attributes:
name = payload_attribute.attribute_name.value
value = payload_attribute.attribute_value
attribute = self._get_attribute_from_managed_object(
managed_object,
name
)
if attribute is None:
continue
elif name == "Name":
if value not in attribute:
self._logger.debug(
"Failed match: "
"the specified name ({}) does not match "
"any of the object's names ({}).".format(
value,
attribute
)
)
add_object = False
break
# TODO: filtering on other attributes
else:
elif name == enums.AttributeType.INITIAL_DATE.value:
initial_date["value"] = attribute
self._track_date_attributes(
enums.AttributeType.INITIAL_DATE,
initial_date,
value.value
)
else:
if value != attribute:
add_object = False
break
if initial_date.get("value"):
add_object &= self._is_valid_date(
enums.AttributeType.INITIAL_DATE,
initial_date.get("value"),
initial_date.get("start"),
initial_date.get("end")
)
if add_object:
self._logger.debug(
"Locate filter matched object: {}".format(
managed_object.unique_identifier
)
)
managed_objects_filtered.append(managed_object)
managed_objects = managed_objects_filtered

View File

@ -14,7 +14,9 @@
# under the License.
import logging
from testtools import TestCase
import pytest
import testtools
import time
from kmip.core.attributes import CryptographicAlgorithm
from kmip.core.attributes import CryptographicLength
@ -53,11 +55,9 @@ from kmip.core.secrets import Certificate
from kmip.core.secrets import SecretData
from kmip.core.secrets import OpaqueObject
import pytest
@pytest.mark.usefixtures("client")
class TestIntegration(TestCase):
class TestIntegration(testtools.TestCase):
def setUp(self):
super(TestIntegration, self).setUp()
@ -71,6 +71,11 @@ class TestIntegration(TestCase):
def tearDown(self):
super(TestIntegration, self).tearDown()
result = self.client.locate()
if result.result_status.value == ResultStatus.SUCCESS:
for uuid in result.uuids:
self.client.destroy(uuid=uuid)
def _create_symmetric_key(self, key_name=None):
"""
Helper function for creating symmetric keys. Used any time a key
@ -1193,3 +1198,166 @@ class TestIntegration(TestCase):
self.assertEqual(
ResultStatus.OPERATION_FAILED, result.result_status.value)
def test_symmetric_key_create_getattributes_locate_destroy(self):
"""
Test that newly created keys can be located based on their attributes.
"""
start_time = int(time.time())
time.sleep(2)
key_name = "Integration Test - Create-GetAttributes-Locate-Destroy Key"
result = self._create_symmetric_key(key_name=key_name)
uid_a = result.uuid
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type)
self.assertIsInstance(uid_a, str)
time.sleep(2)
mid_time = int(time.time())
time.sleep(2)
key_name = "Integration Test - Create-GetAttributes-Locate-Destroy Key"
result = self._create_symmetric_key(key_name=key_name)
uid_b = result.uuid
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type)
self.assertIsInstance(uid_b, str)
time.sleep(2)
end_time = int(time.time())
# Get the actual "Initial Date" values for each key
result = self.client.get_attributes(
uuid=uid_a,
attribute_names=["Initial Date"]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(1, len(result.attributes))
self.assertEqual(
"Initial Date",
result.attributes[0].attribute_name.value
)
initial_date_a = result.attributes[0].attribute_value.value
result = self.client.get_attributes(
uuid=uid_b,
attribute_names=["Initial Date"]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(1, len(result.attributes))
self.assertEqual(
"Initial Date",
result.attributes[0].attribute_name.value
)
initial_date_b = result.attributes[0].attribute_value.value
# Test locating each key by its exact "Initial Date" value
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
initial_date_a
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(1, len(result.uuids))
self.assertEqual(uid_a, result.uuids[0])
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
initial_date_b
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(1, len(result.uuids))
self.assertEqual(uid_b, result.uuids[0])
# Test locating each key by a range around its "Initial Date" value
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
start_time
),
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
mid_time
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(1, len(result.uuids))
self.assertEqual(uid_a, result.uuids[0])
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
mid_time
),
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
end_time
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(1, len(result.uuids))
self.assertEqual(uid_b, result.uuids[0])
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
start_time
),
self.attr_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
end_time
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(2, len(result.uuids))
self.assertIn(uid_a, result.uuids)
self.assertIn(uid_b, result.uuids)
# Test locating each key based off of its name.
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.NAME,
key_name
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(2, len(result.uuids))
self.assertIn(uid_a, result.uuids)
self.assertIn(uid_b, result.uuids)
# Clean up keys
result = self.client.destroy(uid_a)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
result = self.client.get(uuid=result.uuid.value, credential=None)
self.assertEqual(
ResultStatus.OPERATION_FAILED,
result.result_status.value
)
result = self.client.destroy(uid_b)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
result = self.client.get(uuid=result.uuid.value, credential=None)
self.assertEqual(
ResultStatus.OPERATION_FAILED,
result.result_status.value
)

View File

@ -15,9 +15,11 @@
import six
import testtools
import time
import pytest
from kmip.core import enums
from kmip.core.factories import attributes as attribute_factory
from kmip.pie import exceptions
from kmip.pie import factory
@ -30,10 +32,15 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
def setUp(self):
super(TestProxyKmipClientIntegration, self).setUp()
self.object_factory = factory.ObjectFactory()
self.attribute_factory = attribute_factory.AttributeFactory()
def tearDown(self):
super(TestProxyKmipClientIntegration, self).tearDown()
uuids = self.client.locate()
for uuid in uuids:
self.client.destroy(uid=uuid)
def test_symmetric_key_create_get_destroy(self):
"""
Test that the ProxyKmipClient can create, retrieve, and destroy a
@ -860,3 +867,123 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
)
self.client.destroy(public_key_id)
self.client.destroy(private_key_id)
def test_create_getattributes_locate_destroy(self):
"""
Test that the ProxyKmipClient can create symmetric keys and then
locate those keys using their attributes.
"""
start_time = int(time.time())
time.sleep(2)
# Create some symmetric keys
a_id = self.client.create(enums.CryptographicAlgorithm.AES, 256)
time.sleep(2)
mid_time = int(time.time())
time.sleep(2)
b_id = self.client.create(enums.CryptographicAlgorithm.AES, 128)
time.sleep(2)
end_time = int(time.time())
self.assertIsInstance(a_id, str)
self.assertIsInstance(b_id, str)
# Get the "Initial Date" attributes for each key
result_id, result_attributes = self.client.get_attributes(
uid=a_id,
attribute_names=["Initial Date"]
)
self.assertEqual(1, len(result_attributes))
self.assertEqual(
"Initial Date",
result_attributes[0].attribute_name.value
)
initial_date_a = result_attributes[0].attribute_value.value
result_id, result_attributes = self.client.get_attributes(
uid=b_id,
attribute_names=["Initial Date"]
)
self.assertEqual(1, len(result_attributes))
self.assertEqual(
"Initial Date",
result_attributes[0].attribute_name.value
)
initial_date_b = result_attributes[0].attribute_value.value
# Test locating each key by its exact "Initial Date" value
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
initial_date_a
)
]
)
self.assertEqual(1, len(result))
self.assertEqual(a_id, result[0])
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
initial_date_b
)
]
)
self.assertEqual(1, len(result))
self.assertEqual(b_id, result[0])
# Test locating each key by a range around its "Initial Date" value
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
start_time
),
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
mid_time
)
]
)
self.assertEqual(1, len(result))
self.assertEqual(a_id, result[0])
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
mid_time
),
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
end_time
)
]
)
self.assertEqual(1, len(result))
self.assertEqual(b_id, result[0])
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
start_time
),
self.attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
end_time
)
]
)
self.assertEqual(2, len(result))
self.assertIn(a_id, result)
self.assertIn(b_id, result)
# Clean up the keys
self.client.destroy(a_id)
self.client.destroy(b_id)

View File

@ -4263,6 +4263,155 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_is_valid_date(self):
"""
Test that object date checking yields the correct results.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
# If the date range isn't fully defined, the value is implicitly valid.
self.assertTrue(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564520,
None,
None
)
)
self.assertTrue(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564520,
None,
1563564521
)
)
# Verify the value is valid for a fully defined, encompassing range.
self.assertTrue(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564520,
1563564519,
1563564521
)
)
# Verify the value is valid for a specific date value.
self.assertTrue(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564520,
1563564520,
None
)
)
# Verify the value is invalid for a specific date value.
self.assertFalse(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564520,
1563564519,
None
)
)
e._logger.debug.assert_any_call(
"Failed match: "
"object's initial date (Fri Jul 19 19:28:40 2019) does not match "
"the specified initial date (Fri Jul 19 19:28:39 2019)."
)
e._logger.reset_mock()
# Verify the value is invalid below a specific date range.
self.assertFalse(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564519,
1563564520,
1563564521
)
)
e._logger.debug.assert_any_call(
"Failed match: "
"object's initial date (Fri Jul 19 19:28:39 2019) is less than "
"the starting initial date (Fri Jul 19 19:28:40 2019)."
)
e._logger.reset_mock()
# Verify the value is invalid above a specific date range.
self.assertFalse(
e._is_valid_date(
enums.AttributeType.INITIAL_DATE,
1563564521,
1563564519,
1563564520
)
)
e._logger.debug.assert_any_call(
"Failed match: "
"object's initial date (Fri Jul 19 19:28:41 2019) is greater than "
"the ending initial date (Fri Jul 19 19:28:40 2019)."
)
def test_track_date_attributes(self):
"""
Test date attribute value tracking with a simple dictionary.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
date_values = {}
# Verify the first date given is considered the starting range value.
e._track_date_attributes(
enums.AttributeType.INITIAL_DATE,
date_values,
1563564519
)
self.assertEqual(1563564519, date_values["start"])
# Verify the second date given is considered the ending range value.
e._track_date_attributes(
enums.AttributeType.INITIAL_DATE,
date_values,
1563564521
)
self.assertEqual(1563564519, date_values["start"])
self.assertEqual(1563564521, date_values["end"])
# Verify that the third date given triggers an exception.
args = (enums.AttributeType.INITIAL_DATE, date_values, 1563564520)
six.assertRaisesRegex(
self,
exceptions.InvalidField,
"Too many Initial Date attributes provided. "
"Include one for an exact match. "
"Include two for a ranged match.",
e._track_date_attributes,
*args
)
# Verify that a lower second date is interpreted as the new start date.
date_values = {}
date_values["start"] = 1563564521
e._track_date_attributes(
enums.AttributeType.INITIAL_DATE,
date_values,
1563564519
)
self.assertEqual(1563564519, date_values["start"])
self.assertEqual(1563564521, date_values["end"])
def test_locate(self):
"""
Test that a Locate request can be processed correctly.
@ -4285,13 +4434,8 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
self.assertEqual(
len(response_payload.unique_identifiers),
0
)
e._logger.info.assert_any_call("Processing operation: Locate")
self.assertEqual(len(response_payload.unique_identifiers), 0)
# Add the first obj and test the locate
e._data_session.add(obj_a)
@ -4306,18 +4450,10 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
e._logger.info.assert_any_call("Processing operation: Locate")
self.assertEqual(
len(response_payload.unique_identifiers),
1
)
self.assertEqual(
id_a,
response_payload.unique_identifiers[0]
)
self.assertEqual(len(response_payload.unique_identifiers), 1)
self.assertEqual(id_a, response_payload.unique_identifiers[0])
# Add the second obj and test the locate
e._data_session.add(obj_b)
@ -4332,22 +4468,11 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
e._logger.info.assert_any_call("Processing operation: Locate")
self.assertEqual(
len(response_payload.unique_identifiers),
2
)
self.assertIn(
id_a,
response_payload.unique_identifiers
)
self.assertIn(
id_b,
response_payload.unique_identifiers
)
self.assertEqual(len(response_payload.unique_identifiers), 2)
self.assertIn(id_a, response_payload.unique_identifiers)
self.assertIn(id_b, response_payload.unique_identifiers)
def test_locate_with_name(self):
"""
@ -4360,14 +4485,27 @@ class TestKmipEngine(testtools.TestCase):
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00')
key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, key, name='name0')
enums.CryptographicAlgorithm.AES,
128,
key,
name='name0'
)
obj_b = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.DES, 128, key, name='name0')
enums.CryptographicAlgorithm.DES,
128,
key,
name='name0'
)
obj_c = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES, 128, key, name='name1')
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
e._data_session.add(obj_a)
e._data_session.add(obj_b)
@ -4383,13 +4521,13 @@ class TestKmipEngine(testtools.TestCase):
# Locate the obj with name 'name0'
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'name0',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
),
attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'name0',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
@ -4398,32 +4536,21 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
e._logger.info.assert_any_call("Processing operation: Locate")
self.assertEqual(
len(response_payload.unique_identifiers),
2
)
self.assertIn(
id_a,
response_payload.unique_identifiers
)
self.assertIn(
id_b,
response_payload.unique_identifiers
)
self.assertEqual(len(response_payload.unique_identifiers), 2)
self.assertIn(id_a, response_payload.unique_identifiers)
self.assertIn(id_b, response_payload.unique_identifiers)
# Locate the obj with name 'name1'
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'name1',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
),
attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'name1',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
@ -4432,19 +4559,129 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
e._logger.info.assert_any_call("Processing operation: Locate")
self.assertEqual(len(response_payload.unique_identifiers), 1)
self.assertIn(id_c, response_payload.unique_identifiers)
def test_locate_with_initial_date(self):
"""
Test the Locate operation when 'Initial Date' attributes are given.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
self.assertEqual(
len(response_payload.unique_identifiers),
1
obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
self.assertIn(
id_c,
response_payload.unique_identifiers
obj_a.initial_date = int(time.time())
obj_a_time_str = time.strftime(
"%a %b %d %H:%M:%S %Y",
time.gmtime(obj_a.initial_date)
)
time.sleep(2)
mid_time = int(time.time())
mid_time_str = time.strftime(
"%a %b %d %H:%M:%S %Y",
time.gmtime(mid_time)
)
time.sleep(2)
obj_b = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.DES,
128,
key,
name='name2'
)
obj_b.initial_date = int(time.time())
obj_b_time_str = time.strftime(
"%a %b %d %H:%M:%S %Y",
time.gmtime(obj_b.initial_date)
)
time.sleep(2)
end_time = int(time.time())
e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)
attribute_factory = factory.AttributeFactory()
# Locate the object with a specific timestamp
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
obj_a.initial_date
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Failed match: object's initial date ({}) does not match "
"the specified initial date ({}).".format(
obj_b_time_str,
obj_a_time_str
)
)
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
self.assertEqual(len(response_payload.unique_identifiers), 1)
self.assertIn(id_a, response_payload.unique_identifiers)
# Locate an object with a timestamp range
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
mid_time
),
attribute_factory.create_attribute(
enums.AttributeType.INITIAL_DATE,
end_time
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Failed match: object's initial date ({}) is less than "
"the starting initial date ({}).".format(
obj_a_time_str,
mid_time_str
)
)
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
self.assertEqual(len(response_payload.unique_identifiers), 1)
self.assertIn(id_b, response_payload.unique_identifiers)
def test_get(self):
"""
Test that a Get request can be processed correctly.