Add CryptographicUsageMask filtering support for Locate

This change updates Locate operation support in the PyKMIP server,
allowing users to filter objects based on the object's
Cryptographic Usage Masks. Unit tests and integration tests have
been added to test and verify the correctness of this feature.

Additionally, the Locate demo scripts have also been updated to
support Cryptographic Usage Mask filtering. Simply use the
"--cryptographic-usage-mask" flag to specify one or more
Cryptographic Usage Mask enumeration values for the Locate script
to filter on.
This commit is contained in:
Peter Hamilton 2019-08-12 15:22:04 -04:00 committed by Peter Hamilton
parent 4a6a2eccc1
commit b5a8739157
7 changed files with 302 additions and 0 deletions

View File

@ -40,6 +40,7 @@ if __name__ == '__main__':
object_type = opts.object_type
cryptographic_algorithm = opts.cryptographic_algorithm
cryptographic_length = opts.cryptographic_length
cryptographic_usage_masks = opts.cryptographic_usage_masks
unique_identifier = opts.unique_identifier
operation_policy_name = opts.operation_policy_name
@ -147,6 +148,29 @@ if __name__ == '__main__':
)
)
sys.exit(-6)
if cryptographic_usage_masks:
masks = []
for cryptographic_usage_mask in cryptographic_usage_masks:
mask = getattr(
enums.CryptographicUsageMask,
cryptographic_usage_mask,
None
)
if mask:
masks.append(mask)
else:
logger.error(
"Invalid cryptographic usage mask provided: {}".format(
cryptographic_usage_mask
)
)
sys.exit(-7)
attributes.append(
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
masks
)
)
if unique_identifier:
attributes.append(
attribute_factory.create_attribute(

View File

@ -43,6 +43,7 @@ if __name__ == '__main__':
object_type = opts.object_type
cryptographic_algorithm = opts.cryptographic_algorithm
cryptographic_length = opts.cryptographic_length
cryptographic_usage_masks = opts.cryptographic_usage_masks
unique_identifier = opts.unique_identifier
operation_policy_name = opts.operation_policy_name
@ -174,6 +175,29 @@ if __name__ == '__main__':
)
client.close()
sys.exit(-6)
if cryptographic_usage_masks:
masks = []
for cryptographic_usage_mask in cryptographic_usage_masks:
mask = getattr(
enums.CryptographicUsageMask,
cryptographic_usage_mask,
None
)
if mask:
masks.append(mask)
else:
logger.error(
"Invalid cryptographic usage mask provided: {}".format(
cryptographic_usage_mask
)
)
sys.exit(-7)
attributes.append(
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
masks
)
)
if unique_identifier:
attributes.append(
attribute_factory.create_attribute(

View File

@ -303,6 +303,20 @@ def build_cli_parser(operation=None):
dest="cryptographic_length",
help="The cryptographic length of the secret (e.g., 128, 2048)"
)
parser.add_option(
"--cryptographic-usage-mask",
action="append",
type="str",
default=[],
dest="cryptographic_usage_masks",
help=(
"The cryptographic usage mask(s) the secret should have set "
"(e.g., ENCRYPT, DECRYPT). Use multiple times to specify "
"multiple cryptographic usage mask enumeration values. All "
"values will get combined into a single mask when sent to the "
"server."
)
)
parser.add_option(
"-i",
"--unique-identifier",

View File

@ -1743,6 +1743,25 @@ class KmipEngine(object):
)
add_object = False
break
elif name == "Cryptographic Usage Mask":
value = value.value
mask_values = enums.get_enumerations_from_bit_mask(
enums.CryptographicUsageMask,
value
)
for mask_value in mask_values:
if mask_value not in attribute:
self._logger.debug(
"Failed match: "
"the specified cryptographic usage mask "
"({}) is not set on the object.".format(
mask_value.name
)
)
add_object = False
break
if not add_object:
break
elif name == enums.AttributeType.INITIAL_DATE.value:
initial_date["value"] = attribute
self._track_date_attributes(

View File

@ -1493,6 +1493,55 @@ class TestIntegration(testtools.TestCase):
self.assertEqual(1, len(result.uuids))
self.assertIn(uid_a, result.uuids)
# Test locating keys using their cryptographic usage masks
mask = [enums.CryptographicUsageMask.ENCRYPT]
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(2, len(result.uuids))
self.assertIn(uid_a, result.uuids)
self.assertIn(uid_b, result.uuids)
mask.append(enums.CryptographicUsageMask.DECRYPT)
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(2, len(result.uuids))
self.assertIn(uid_a, result.uuids)
self.assertIn(uid_b, result.uuids)
mask.append(enums.CryptographicUsageMask.SIGN)
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(0, len(result.uuids))
mask = [enums.CryptographicUsageMask.EXPORT]
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(0, len(result.uuids))
# Clean up keys
result = self.client.destroy(uid_a)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)

View File

@ -1148,6 +1148,55 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual(1, len(result))
self.assertIn(a_id, result)
# Test locating keys using their cryptographic usage masks
mask = [enums.CryptographicUsageMask.ENCRYPT]
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(2, len(result))
self.assertIn(a_id, result)
self.assertIn(b_id, result)
mask.append(enums.CryptographicUsageMask.DECRYPT)
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(2, len(result))
self.assertIn(a_id, result)
self.assertIn(b_id, result)
mask.append(enums.CryptographicUsageMask.SIGN)
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(0, len(result))
mask = [enums.CryptographicUsageMask.EXPORT]
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
mask
)
]
)
self.assertEqual(0, len(result))
# Clean up the keys
self.client.destroy(a_id)
self.client.destroy(b_id)

View File

@ -5163,6 +5163,129 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(0, len(response_payload.unique_identifiers))
def test_locate_with_cryptographic_usage_masks(self):
"""
Test the Locate operation when 'Cryptographic Usage Mask' values are
given.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
obj_a.cryptographic_usage_masks = [
enums.CryptographicUsageMask.EXPORT,
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
obj_b = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
obj_b.cryptographic_usage_masks = [
enums.CryptographicUsageMask.EXPORT
]
e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)
attribute_factory = factory.AttributeFactory()
# Locate the objects based on their shared cryptographic usage masks.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.EXPORT]
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
self.assertEqual(2, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
self.assertIn(id_b, response_payload.unique_identifiers)
# Locate the symmetric key based on its unique cryptographic usage
# masks.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT
]
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Failed match: the specified cryptographic usage mask (ENCRYPT) "
"is not set on the object."
)
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
# Try to locate a non-existent object based on its unique cryptographic
# usage masks.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.SIGN
]
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Failed match: the specified cryptographic usage mask (SIGN) "
"is not set on the object."
)
e._logger.debug.assert_any_call(
"Failed match: the specified cryptographic usage mask (SIGN) "
"is not set on the object."
)
self.assertEqual(0, len(response_payload.unique_identifiers))
def test_locate_with_unique_identifier(self):
"""
Test the Locate operation when the 'Unique Identifier' attribute