Add DeleteAttribute support to the server

This change adds DeleteAttribute operation support to the PyKMIP
server, supporting functionality unique to KMIP 1.0 - 1.4 and the
newer KMIP 2.0. Due to the current list of attributes supported
by the server, only multivalued attributes can currently be
deleted from a stored KMIP object. Over a dozen unit tests have
been added to verify the functionality of the new additions.

Partially implements #547
This commit is contained in:
Peter Hamilton 2019-11-08 16:45:13 -05:00 committed by Peter Hamilton
parent e48aff7b9a
commit 676aaf5e72
5 changed files with 920 additions and 4 deletions

View File

@ -97,7 +97,8 @@ class ManagedObject(sql.Base):
_names = sqlalchemy.orm.relationship(
"ManagedObjectName",
back_populates="mo",
cascade="all, delete-orphan"
cascade="all, delete-orphan",
order_by="ManagedObjectName.id"
)
names = association_proxy('_names', 'name')
operation_policy_name = Column(
@ -112,12 +113,14 @@ class ManagedObject(sql.Base):
"ApplicationSpecificInformation",
secondary=app_specific_info_map,
back_populates="managed_objects",
order_by="ApplicationSpecificInformation.id",
passive_deletes=True
)
object_groups = sqlalchemy.orm.relationship(
"ObjectGroup",
secondary=object_group_map,
back_populates="managed_objects",
order_by="ObjectGroup.id",
passive_deletes=True
)

View File

@ -843,6 +843,86 @@ class KmipEngine(object):
"The {0} attribute is unsupported.".format(attribute_name)
)
def _delete_attribute_from_managed_object(self, managed_object, attribute):
attribute_name, attribute_index, attribute_value = attribute
object_type = managed_object._object_type
if not self._attribute_policy.is_attribute_applicable_to_object_type(
attribute_name,
object_type
):
raise exceptions.ItemNotFound(
"The '{}' attribute is not applicable to '{}' objects.".format(
attribute_name,
''.join(
[x.capitalize() for x in object_type.name.split('_')]
)
)
)
if not self._attribute_policy.is_attribute_deletable_by_client(
attribute_name
):
raise exceptions.PermissionDenied(
"Cannot delete a required attribute."
)
if self._attribute_policy.is_attribute_multivalued(attribute_name):
# Get the specific attribute collection and attribute objects.
attribute_list = []
if attribute_name == "Name":
attribute_list = managed_object.names
if attribute_value is not None:
attribute_value = attribute_value.value
elif attribute_name == "Application Specific Information":
attribute_list = managed_object.app_specific_info
if attribute_value is not None:
namespace = attribute_value.application_namespace
attribute_value = objects.ApplicationSpecificInformation(
application_namespace=namespace,
application_data=attribute_value.application_data
)
elif attribute_name == "Object Group":
attribute_list = managed_object.object_groups
if attribute_value is not None:
attribute_value = objects.ObjectGroup(
object_group=attribute_value.value
)
else:
raise exceptions.InvalidField(
"The '{}' attribute is not supported.".format(
attribute_name
)
)
# Generically handle attribute deletion.
if attribute_value:
if attribute_list.count(attribute_value):
attribute_list.remove(attribute_value)
else:
raise exceptions.ItemNotFound(
"Could not locate the attribute instance with the "
"specified value: {}".format(attribute_value)
)
elif attribute_index is not None:
if attribute_index < len(attribute_list):
attribute_list.pop(attribute_index)
else:
raise exceptions.ItemNotFound(
"Could not locate the attribute instance with the "
"specified index: {}".format(attribute_index)
)
else:
# If no attribute index is provided, this is not a KMIP
# 1.* request. If no attribute value is provided, this
# must be a KMIP 2.0 attribute reference request, so
# delete all instances of the attribute.
attribute_list[:] = []
else:
# The server does not currently support any single-instance,
# client deletable attributes.
raise exceptions.InvalidField(
"The '{}' attribute is not supported.".format(attribute_name)
)
def _is_allowed_by_operation_policy(
self,
policy_name,
@ -1075,6 +1155,8 @@ class KmipEngine(object):
return self._process_create(payload)
elif operation == enums.Operation.CREATE_KEY_PAIR:
return self._process_create_key_pair(payload)
elif operation == enums.Operation.DELETE_ATTRIBUTE:
return self._process_delete_attribute(payload)
elif operation == enums.Operation.REGISTER:
return self._process_register(payload)
elif operation == enums.Operation.DERIVE_KEY:
@ -1378,6 +1460,91 @@ class KmipEngine(object):
self._id_placeholder = str(private_key.unique_identifier)
return response_payload
@_kmip_version_supported('1.0')
def _process_delete_attribute(self, payload):
self._logger.info("Processing operation: DeleteAttribute")
unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.DELETE_ATTRIBUTE
)
deleted_attribute = None
attribute_name = None
attribute_index = None
attribute_value = None
if self._protocol_version >= contents.ProtocolVersion(2, 0):
# If the current attribute is defined, use that. Otherwise, use
# the attribute reference.
if payload.current_attribute:
try:
attribute_name = enums.convert_attribute_tag_to_name(
payload.current_attribute.attribute.tag
)
attribute_value = payload.current_attribute.attribute
except ValueError as e:
self._logger.exception(e)
raise exceptions.ItemNotFound(
"No attribute with the specified name exists."
)
elif payload.attribute_reference:
attribute_name = payload.attribute_reference.attribute_name
else:
raise exceptions.InvalidMessage(
"The DeleteAttribute request must specify the current "
"attribute or an attribute reference."
)
else:
# Build a partial attribute from the attribute name and index.
if payload.attribute_name:
attribute_name = payload.attribute_name
else:
raise exceptions.InvalidMessage(
"The DeleteAttribute request must specify the attribute "
"name."
)
if payload.attribute_index:
attribute_index = payload.attribute_index
else:
attribute_index = 0
# Grab a copy of the attribute before deleting it.
existing_attributes = self._get_attributes_from_managed_object(
managed_object,
[payload.attribute_name]
)
if len(existing_attributes) > 0:
if not attribute_index:
deleted_attribute = existing_attributes[0]
else:
if attribute_index < len(existing_attributes):
deleted_attribute = existing_attributes[
attribute_index
]
else:
raise exceptions.ItemNotFound(
"Could not locate the attribute instance with the "
"specified index: {}".format(attribute_index)
)
self._delete_attribute_from_managed_object(
managed_object,
(attribute_name, attribute_index, attribute_value)
)
self._data_session.commit()
response_payload = payloads.DeleteAttributeResponsePayload(
unique_identifier=unique_identifier,
attribute=deleted_attribute
)
return response_payload
@_kmip_version_supported('1.0')
def _process_register(self, payload):
self._logger.info("Processing operation: Register")

View File

@ -129,6 +129,7 @@ class AttributePolicy(object):
"""
self._version = version
# TODO (peterhamilton) Alphabetize these
self._attribute_rule_sets = {
'Unique Identifier': AttributeRuleSet(
True,
@ -872,9 +873,9 @@ class AttributePolicy(object):
'Object Group': AttributeRuleSet(
False,
('server', 'client'),
False,
False,
False,
True,
True,
True,
True,
(
enums.Operation.CREATE,
@ -1116,6 +1117,20 @@ class AttributePolicy(object):
else:
return False
def is_attribute_deletable_by_client(self, attribute):
"""
Check if the attribute can be deleted by the client.
Args:
attribute (string): The name of the attribute (e.g., "Name").
Returns:
bool: True if the attribute can be deleted by the client. False
otherwise.
"""
rule_set = self._attribute_rule_sets.get(attribute)
return rule_set.deletable_by_client
def is_attribute_applicable_to_object_type(self, attribute, object_type):
"""
Check if the attribute is supported by the given object type.

View File

@ -702,8 +702,11 @@ class TestKmipEngine(testtools.TestCase):
e = engine.KmipEngine()
e._logger = mock.MagicMock()
# TODO (peterhamilton) Alphatize these.
e._process_create = mock.MagicMock()
e._process_create_key_pair = mock.MagicMock()
e._process_delete_attribute = mock.MagicMock()
e._process_derive_key = mock.MagicMock()
e._process_register = mock.MagicMock()
e._process_locate = mock.MagicMock()
e._process_get = mock.MagicMock()
@ -722,6 +725,8 @@ class TestKmipEngine(testtools.TestCase):
e._process_operation(enums.Operation.CREATE, None)
e._process_operation(enums.Operation.CREATE_KEY_PAIR, None)
e._process_operation(enums.Operation.DELETE_ATTRIBUTE, None)
e._process_operation(enums.Operation.DERIVE_KEY, None)
e._process_operation(enums.Operation.REGISTER, None)
e._process_operation(enums.Operation.LOCATE, None)
e._process_operation(enums.Operation.GET, None)
@ -740,6 +745,8 @@ class TestKmipEngine(testtools.TestCase):
e._process_create.assert_called_with(None)
e._process_create_key_pair.assert_called_with(None)
e._process_delete_attribute.assert_called_with(None)
e._process_derive_key.assert_called_with(None)
e._process_register.assert_called_with(None)
e._process_locate.assert_called_with(None)
e._process_get.assert_called_with(None)
@ -1918,6 +1925,343 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_delete_attribute_from_managed_object(self):
"""
Test that various attributes can be deleted correctly from a given
managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
name_1 = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
"Name 1",
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
name_2 = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
"Name 2",
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
app_specific_info_1 = attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "Namespace 1",
"application_data": "Data 1"
}
)
app_specific_info_2 = attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "Namespace 2",
"application_data": "Data 2"
}
)
object_group_1 = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 1"
)
object_group_2 = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 2"
)
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
managed_object.names.clear()
self.assertEqual(0, len(managed_object.names))
self.assertEqual(0, len(managed_object.app_specific_info))
self.assertEqual(0, len(managed_object.object_groups))
e._set_attribute_on_managed_object(
managed_object,
(
"Name",
[
name_1.attribute_value,
name_2.attribute_value
]
)
)
e._set_attribute_on_managed_object(
managed_object,
(
"Application Specific Information",
[
app_specific_info_1.attribute_value,
app_specific_info_2.attribute_value
]
)
)
e._set_attribute_on_managed_object(
managed_object,
(
"Object Group",
[
object_group_1.attribute_value,
object_group_2.attribute_value
]
)
)
self.assertEqual(2, len(managed_object.names))
self.assertEqual(2, len(managed_object.app_specific_info))
self.assertEqual(2, len(managed_object.object_groups))
e._delete_attribute_from_managed_object(
managed_object,
(
"Application Specific Information",
0,
None
)
)
self.assertEqual(1, len(managed_object.app_specific_info))
e._delete_attribute_from_managed_object(
managed_object,
(
"Application Specific Information",
0,
app_specific_info_2.attribute_value
)
)
self.assertEqual(0, len(managed_object.app_specific_info))
e._delete_attribute_from_managed_object(
managed_object,
(
"Name",
None,
primitives.TextString(value="Name 2", tag=enums.Tags.NAME)
)
)
self.assertEqual(1, len(managed_object.names))
self.assertEqual("Name 1", managed_object.names[0])
e._delete_attribute_from_managed_object(
managed_object,
(
"Object Group",
None,
None
)
)
self.assertEqual(0, len(managed_object.object_groups))
e._set_attribute_on_managed_object(
managed_object,
(
"Object Group",
[object_group_1.attribute_value]
)
)
self.assertEqual(1, len(managed_object.object_groups))
e._delete_attribute_from_managed_object(
managed_object,
(
"Object Group",
None,
primitives.TextString(
value="Object Group 1",
tag=enums.Tags.OBJECT_GROUP
)
)
)
self.assertEqual(0, len(managed_object.object_groups))
def test_delete_attribute_from_managed_object_unsupported_attribute(self):
"""
Test that an ItemNotFound error is raised when attempting to delete an
unsupported attribute from a managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
args = (managed_object, ("Digital Signature Algorithm", None, None))
self.assertRaisesRegex(
exceptions.ItemNotFound,
"The 'Digital Signature Algorithm' attribute is not applicable "
"to 'SymmetricKey' objects.",
e._delete_attribute_from_managed_object,
*args
)
def test_delete_attribute_from_managed_object_undeletable_attribute(self):
"""
Test that a PermissionDenied error is raised when attempting to delete
a required attribute from a managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
args = (managed_object, ("Cryptographic Algorithm", None, None))
self.assertRaisesRegex(
exceptions.PermissionDenied,
"Cannot delete a required attribute.",
e._delete_attribute_from_managed_object,
*args
)
def test_delete_attribute_from_managed_object_unsupported_multivalue(self):
"""
Test that an InvalidField error is raised when attempting to delete an
unsupported multivalued attribute.
"""
# TODO (peterhamilton) Remove this test once all multivalued attributes
# are supported.
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
args = (managed_object, ("Link", None, None))
self.assertRaisesRegex(
exceptions.InvalidField,
"The 'Link' attribute is not supported.",
e._delete_attribute_from_managed_object,
*args
)
def test_delete_attribute_from_managed_object_bad_attribute_value(self):
"""
Test that an ItemNotFound error is raised when attempting to delete
an attribute by value that cannot be found on a managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
args = (
managed_object,
(
"Object Group",
None,
primitives.TextString(
value="invalid",
tag=enums.Tags.OBJECT_GROUP
)
)
)
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate the attribute instance with the specified "
"value: {'object_group': 'invalid'}",
e._delete_attribute_from_managed_object,
*args
)
def test_delete_attribute_from_managed_object_bad_attribute_index(self):
"""
Test that an ItemNotFound error is raised when attempting to delete
an attribute by index that cannot be found on a managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
args = (
managed_object,
(
"Object Group",
3,
None
)
)
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate the attribute instance with the specified "
"index: 3",
e._delete_attribute_from_managed_object,
*args
)
def test_delete_attribute_from_managed_object_bad_single_value(self):
"""
Test that an InvalidField error is raised when attempting to delete
a single-valued attribute from a managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
args = (
managed_object,
(
"Contact Information",
None,
None
)
)
self.assertRaisesRegex(
exceptions.InvalidField,
"The 'Contact Information' attribute is not supported",
e._delete_attribute_from_managed_object,
*args
)
def test_is_allowed_by_operation_policy_granted(self):
"""
Test that access granted by operation policy is processed correctly.
@ -3393,6 +3737,379 @@ class TestKmipEngine(testtools.TestCase):
)
e._logger.reset_mock()
def test_delete_attribute(self):
"""
Test that a DeleteAttribute request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
object_group_1 = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 1"
)
object_group_2 = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 2"
)
e._data_session.add(secret)
e._set_attribute_on_managed_object(
secret,
(
"Object Group",
[
object_group_1.attribute_value,
object_group_2.attribute_value
]
)
)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
# Confirm that the attribute was actually added by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
"1",
enums.Operation.DELETE_ATTRIBUTE
)
self.assertEqual(2, len(managed_object.object_groups))
payload = payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=1
)
response_payload = e._process_delete_attribute(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: DeleteAttribute"
)
self.assertEqual(
"1",
response_payload.unique_identifier
)
self.assertEqual(
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 2",
1
),
response_payload.attribute
)
# Confirm that the attribute was actually deleted by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
response_payload.unique_identifier,
enums.Operation.DELETE_ATTRIBUTE
)
self.assertEqual(1, len(managed_object.object_groups))
payload = payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group"
)
response_payload = e._process_delete_attribute(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: DeleteAttribute"
)
self.assertEqual(
"1",
response_payload.unique_identifier
)
self.assertEqual(
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 1",
0
),
response_payload.attribute
)
# Confirm that the attribute was actually deleted by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
response_payload.unique_identifier,
enums.Operation.DELETE_ATTRIBUTE
)
self.assertEqual(0, len(managed_object.object_groups))
def test_delete_attribute_with_kmip_2_0(self):
"""
Test that a DeleteAttribute request can be processed correctly
when using KMIP 2.0 payload features.
"""
e = engine.KmipEngine()
e._protocol_version = contents.ProtocolVersion(2, 0)
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
object_group = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
"Object Group 1"
)
e._data_session.add(secret)
e._set_attribute_on_managed_object(
secret,
(
"Object Group",
[object_group.attribute_value]
)
)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
# Confirm that the attribute was actually added by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
"1",
enums.Operation.DELETE_ATTRIBUTE
)
self.assertEqual(1, len(managed_object.names))
self.assertEqual(1, len(managed_object.object_groups))
payload = payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
current_attribute=objects.CurrentAttribute(
attribute=primitives.TextString(
value="Object Group 1",
tag=enums.Tags.OBJECT_GROUP
)
),
attribute_reference=objects.AttributeReference(
vendor_identification="Vendor 1",
attribute_name="Object Group"
)
)
response_payload = e._process_delete_attribute(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: DeleteAttribute"
)
self.assertEqual(
"1",
response_payload.unique_identifier
)
self.assertIsNone(response_payload.attribute)
# Confirm that the attribute was actually deleted by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
response_payload.unique_identifier,
enums.Operation.DELETE_ATTRIBUTE
)
self.assertEqual(0, len(managed_object.object_groups))
payload = payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_reference=objects.AttributeReference(
vendor_identification="Vendor 1",
attribute_name="Name"
)
)
response_payload = e._process_delete_attribute(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: DeleteAttribute"
)
self.assertEqual(
"1",
response_payload.unique_identifier
)
self.assertIsNone(response_payload.attribute)
# Confirm that the attribute was actually deleted by fetching the
# managed object fresh from the database and checking it.
managed_object = e._get_object_with_access_controls(
response_payload.unique_identifier,
enums.Operation.DELETE_ATTRIBUTE
)
self.assertEqual(0, len(managed_object.names))
def test_delete_attribute_with_invalid_current_attribute(self):
"""
Test that an ItemNotFound error is raised when attempting to delete
an invalid current attribute from a managed object.
"""
e = engine.KmipEngine()
e._protocol_version = contents.ProtocolVersion(2, 0)
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
current_attribute = objects.CurrentAttribute()
current_attribute._attribute = primitives.TextString(
value="Object Group 1",
tag=enums.Tags.CURRENT_ATTRIBUTE
)
args = (
payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
current_attribute=current_attribute
),
)
self.assertRaisesRegex(
exceptions.ItemNotFound,
"No attribute with the specified name exists.",
e._process_delete_attribute,
*args
)
def test_delete_attribute_with_missing_current_attribute_reference(self):
"""
Test that an InvalidMessage error is raised when attempting to delete
an attribute without specifying a current attribute or an attribute
reference (under KMIP 2.0+).
"""
e = engine.KmipEngine()
e._protocol_version = contents.ProtocolVersion(2, 0)
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
args = (
payloads.DeleteAttributeRequestPayload(unique_identifier="1"),
)
self.assertRaisesRegex(
exceptions.InvalidMessage,
"The DeleteAttribute request must specify the current attribute "
"or an attribute reference.",
e._process_delete_attribute,
*args
)
def test_delete_attribute_with_missing_attribute_name(self):
"""
Test that an InvalidMessage error is raised when attempting to delete
an attribute without specifying the attribute name (under KMIP 1.0+).
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
args = (
payloads.DeleteAttributeRequestPayload(unique_identifier="1"),
)
self.assertRaisesRegex(
exceptions.InvalidMessage,
"The DeleteAttribute request must specify the attribute name.",
e._process_delete_attribute,
*args
)
def test_delete_attribute_with_invalid_attribute_index(self):
"""
Test that an ItemNotFound error is raised when attempting to delete
an attribute when specifying an invalid attribute index.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
args = (
payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Name",
attribute_index=20),
)
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate the attribute instance with the specified "
"index: 20",
e._process_delete_attribute,
*args
)
def test_register(self):
"""
Test that a Register request can be processed correctly.

View File

@ -76,6 +76,20 @@ class TestAttributePolicy(testtools.TestCase):
result = rules.is_attribute_deprecated(attribute_b)
self.assertTrue(result)
def test_is_attribute_deletable_by_client(self):
"""
Test that is_attribute_deletable_by_client returns the expected
results in all cases.
"""
rules = policy.AttributePolicy(contents.ProtocolVersion(1, 0))
self.assertFalse(
rules.is_attribute_deletable_by_client("Cryptographic Algorithm")
)
self.assertTrue(
rules.is_attribute_deletable_by_client("Contact Information")
)
def test_is_attribute_applicable_to_object_type(self):
"""
Test that is_attribute_applicable_to_object_type returns the