From 3be219144ae8621c913f088fa1f4f2840e8e9780 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 22 Nov 2019 16:01:21 -0500 Subject: [PATCH] Add SetAttribute support to the server This change adds SetAttribute operation support to the PyKMIP server, including additional attribute policy functionality to check for certain attribute characteristics that preclude SetAttribute operation functionality. Specifically, the operation cannot set the value of any multivalued attribute nor the value of any attribute not modifiable by the client. New unit tests have been added to cover these changes. Partially implements #547 --- kmip/services/server/engine.py | 51 ++++++ kmip/services/server/policy.py | 14 ++ .../tests/unit/services/server/test_engine.py | 151 ++++++++++++++++++ .../tests/unit/services/server/test_policy.py | 14 ++ 4 files changed, 230 insertions(+) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index db0cc5c..06976b2 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1155,6 +1155,7 @@ class KmipEngine(object): return managed_objects_allowed def _process_operation(self, operation, payload): + # TODO (peterhamilton) Alphabetize this. if operation == enums.Operation.CREATE: return self._process_create(payload) elif operation == enums.Operation.CREATE_KEY_PAIR: @@ -1189,6 +1190,8 @@ class KmipEngine(object): return self._process_decrypt(payload) elif operation == enums.Operation.SIGNATURE_VERIFY: return self._process_signature_verify(payload) + elif operation == enums.Operation.SET_ATTRIBUTE: + return self._process_set_attribute(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) elif operation == enums.Operation.SIGN: @@ -1549,6 +1552,54 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('2.0') + def _process_set_attribute(self, payload): + self._logger.info("Processing operation: SetAttribute") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.SET_ATTRIBUTE + ) + + attribute_name = enums.convert_attribute_tag_to_name( + payload.new_attribute.attribute.tag + ) + if self._attribute_policy.is_attribute_multivalued(attribute_name): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.MULTI_VALUED_ATTRIBUTE, + message=( + "The '{}' attribute is multi-valued. Multi-valued " + "attributes cannot be set with the SetAttribute " + "operation.".format(attribute_name) + ) + ) + if not self._attribute_policy.is_attribute_modifiable_by_client( + attribute_name + ): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.READ_ONLY_ATTRIBUTE, + message=( + "The '{}' attribute is read-only and cannot be modified " + "by the client.".format(attribute_name) + ) + ) + + self._set_attributes_on_managed_object( + managed_object, + {attribute_name: payload.new_attribute.attribute} + ) + self._data_session.commit() + + return payloads.SetAttributeResponsePayload( + unique_identifier=unique_identifier + ) + @_kmip_version_supported('1.0') def _process_register(self, payload): self._logger.info("Processing operation: Register") diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 9601f89..400ffdf 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -1155,6 +1155,20 @@ class AttributePolicy(object): rule_set = self._attribute_rule_sets.get(attribute) return rule_set.deletable_by_client + def is_attribute_modifiable_by_client(self, attribute): + """ + Check if the attribute can be modified by the client. + + Args: + attribute (string): The name of the attribute (e.g., "Name"). + + Returns: + bool: True if the attribute can be modified by the client. False + otherwise. + """ + rule_set = self._attribute_rule_sets.get(attribute) + return rule_set.modifiable_by_client + def is_attribute_applicable_to_object_type(self, attribute, object_type): """ Check if the attribute is supported by the given object type. diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 1582e6f..965e0f5 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -722,6 +722,7 @@ class TestKmipEngine(testtools.TestCase): e._process_signature_verify = mock.MagicMock() e._process_mac = mock.MagicMock() e._process_sign = mock.MagicMock() + e._process_set_attribute = mock.MagicMock() e._process_operation(enums.Operation.CREATE, None) e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) @@ -742,6 +743,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.SIGN, None) e._process_operation(enums.Operation.SIGNATURE_VERIFY, None) e._process_operation(enums.Operation.MAC, None) + e._process_operation(enums.Operation.SET_ATTRIBUTE, None) e._process_create.assert_called_with(None) e._process_create_key_pair.assert_called_with(None) @@ -761,6 +763,7 @@ class TestKmipEngine(testtools.TestCase): e._process_decrypt.assert_called_with(None) e._process_signature_verify.assert_called_with(None) e._process_mac.assert_called_with(None) + e._process_set_attribute.assert_called_with(None) def test_unsupported_operation(self): """ @@ -4122,6 +4125,154 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_set_attribute(self): + """ + Test that a SetAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.SET_ATTRIBUTE + ) + self.assertFalse(managed_object.sensitive) + + payload = payloads.SetAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + value=True, + tag=enums.Tags.SENSITIVE + ) + ) + ) + + response_payload = e._process_set_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: SetAttribute" + ) + self.assertEqual( + "1", + response_payload.unique_identifier + ) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.SET_ATTRIBUTE + ) + self.assertTrue(managed_object.sensitive) + + def test_set_attribute_with_multivalued_attribute(self): + """ + Test that a KmipError is raised when attempting to set the value of + a multivalued attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.SetAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.TextString( + value="New Name", + tag=enums.Tags.NAME + ) + ) + ), + ) + + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Name' attribute is multi-valued. Multi-valued attributes " + "cannot be set with the SetAttribute operation.", + e._process_set_attribute, + *args + ) + + def test_set_attribute_with_non_client_modifiable_attribute(self): + """ + Test that a KmipError is raised when attempting to set the value of + a attribute not modifiable by the client. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.SetAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.RSA, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ), + ) + + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Cryptographic Algorithm' attribute is read-only and cannot " + "be modified by the client.", + e._process_set_attribute, + *args + ) + def test_register(self): """ Test that a Register request can be processed correctly. diff --git a/kmip/tests/unit/services/server/test_policy.py b/kmip/tests/unit/services/server/test_policy.py index 838c0da..fd17e3a 100644 --- a/kmip/tests/unit/services/server/test_policy.py +++ b/kmip/tests/unit/services/server/test_policy.py @@ -90,6 +90,20 @@ class TestAttributePolicy(testtools.TestCase): rules.is_attribute_deletable_by_client("Contact Information") ) + def test_is_attribute_modifiable_by_client(self): + """ + Test that is_attribute_modifiable_by_client returns the expected + results in all cases. + """ + rules = policy.AttributePolicy(contents.ProtocolVersion(1, 0)) + + self.assertFalse( + rules.is_attribute_modifiable_by_client("Unique Identifier") + ) + self.assertTrue( + rules.is_attribute_modifiable_by_client("Name") + ) + def test_is_attribute_applicable_to_object_type(self): """ Test that is_attribute_applicable_to_object_type returns the