diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index db0cc5c..06976b2 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1155,6 +1155,7 @@ class KmipEngine(object): return managed_objects_allowed def _process_operation(self, operation, payload): + # TODO (peterhamilton) Alphabetize this. if operation == enums.Operation.CREATE: return self._process_create(payload) elif operation == enums.Operation.CREATE_KEY_PAIR: @@ -1189,6 +1190,8 @@ class KmipEngine(object): return self._process_decrypt(payload) elif operation == enums.Operation.SIGNATURE_VERIFY: return self._process_signature_verify(payload) + elif operation == enums.Operation.SET_ATTRIBUTE: + return self._process_set_attribute(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) elif operation == enums.Operation.SIGN: @@ -1549,6 +1552,54 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('2.0') + def _process_set_attribute(self, payload): + self._logger.info("Processing operation: SetAttribute") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.SET_ATTRIBUTE + ) + + attribute_name = enums.convert_attribute_tag_to_name( + payload.new_attribute.attribute.tag + ) + if self._attribute_policy.is_attribute_multivalued(attribute_name): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.MULTI_VALUED_ATTRIBUTE, + message=( + "The '{}' attribute is multi-valued. Multi-valued " + "attributes cannot be set with the SetAttribute " + "operation.".format(attribute_name) + ) + ) + if not self._attribute_policy.is_attribute_modifiable_by_client( + attribute_name + ): + raise exceptions.KmipError( + status=enums.ResultStatus.OPERATION_FAILED, + reason=enums.ResultReason.READ_ONLY_ATTRIBUTE, + message=( + "The '{}' attribute is read-only and cannot be modified " + "by the client.".format(attribute_name) + ) + ) + + self._set_attributes_on_managed_object( + managed_object, + {attribute_name: payload.new_attribute.attribute} + ) + self._data_session.commit() + + return payloads.SetAttributeResponsePayload( + unique_identifier=unique_identifier + ) + @_kmip_version_supported('1.0') def _process_register(self, payload): self._logger.info("Processing operation: Register") diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 9601f89..400ffdf 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -1155,6 +1155,20 @@ class AttributePolicy(object): rule_set = self._attribute_rule_sets.get(attribute) return rule_set.deletable_by_client + def is_attribute_modifiable_by_client(self, attribute): + """ + Check if the attribute can be modified by the client. + + Args: + attribute (string): The name of the attribute (e.g., "Name"). + + Returns: + bool: True if the attribute can be modified by the client. False + otherwise. + """ + rule_set = self._attribute_rule_sets.get(attribute) + return rule_set.modifiable_by_client + def is_attribute_applicable_to_object_type(self, attribute, object_type): """ Check if the attribute is supported by the given object type. diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 1582e6f..965e0f5 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -722,6 +722,7 @@ class TestKmipEngine(testtools.TestCase): e._process_signature_verify = mock.MagicMock() e._process_mac = mock.MagicMock() e._process_sign = mock.MagicMock() + e._process_set_attribute = mock.MagicMock() e._process_operation(enums.Operation.CREATE, None) e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) @@ -742,6 +743,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.SIGN, None) e._process_operation(enums.Operation.SIGNATURE_VERIFY, None) e._process_operation(enums.Operation.MAC, None) + e._process_operation(enums.Operation.SET_ATTRIBUTE, None) e._process_create.assert_called_with(None) e._process_create_key_pair.assert_called_with(None) @@ -761,6 +763,7 @@ class TestKmipEngine(testtools.TestCase): e._process_decrypt.assert_called_with(None) e._process_signature_verify.assert_called_with(None) e._process_mac.assert_called_with(None) + e._process_set_attribute.assert_called_with(None) def test_unsupported_operation(self): """ @@ -4122,6 +4125,154 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_set_attribute(self): + """ + Test that a SetAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute is set to its default value by + # fetching the managed object fresh from the database and + # checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.SET_ATTRIBUTE + ) + self.assertFalse(managed_object.sensitive) + + payload = payloads.SetAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.Boolean( + value=True, + tag=enums.Tags.SENSITIVE + ) + ) + ) + + response_payload = e._process_set_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: SetAttribute" + ) + self.assertEqual( + "1", + response_payload.unique_identifier + ) + + # Confirm that the attribute was actually set by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.SET_ATTRIBUTE + ) + self.assertTrue(managed_object.sensitive) + + def test_set_attribute_with_multivalued_attribute(self): + """ + Test that a KmipError is raised when attempting to set the value of + a multivalued attribute. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.SetAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.TextString( + value="New Name", + tag=enums.Tags.NAME + ) + ) + ), + ) + + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Name' attribute is multi-valued. Multi-valued attributes " + "cannot be set with the SetAttribute operation.", + e._process_set_attribute, + *args + ) + + def test_set_attribute_with_non_client_modifiable_attribute(self): + """ + Test that a KmipError is raised when attempting to set the value of + a attribute not modifiable by the client. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.SetAttributeRequestPayload( + unique_identifier="1", + new_attribute=objects.NewAttribute( + attribute=primitives.Enumeration( + enums.CryptographicAlgorithm, + enums.CryptographicAlgorithm.RSA, + enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ), + ) + + self.assertRaisesRegex( + exceptions.KmipError, + "The 'Cryptographic Algorithm' attribute is read-only and cannot " + "be modified by the client.", + e._process_set_attribute, + *args + ) + def test_register(self): """ Test that a Register request can be processed correctly. diff --git a/kmip/tests/unit/services/server/test_policy.py b/kmip/tests/unit/services/server/test_policy.py index 838c0da..fd17e3a 100644 --- a/kmip/tests/unit/services/server/test_policy.py +++ b/kmip/tests/unit/services/server/test_policy.py @@ -90,6 +90,20 @@ class TestAttributePolicy(testtools.TestCase): rules.is_attribute_deletable_by_client("Contact Information") ) + def test_is_attribute_modifiable_by_client(self): + """ + Test that is_attribute_modifiable_by_client returns the expected + results in all cases. + """ + rules = policy.AttributePolicy(contents.ProtocolVersion(1, 0)) + + self.assertFalse( + rules.is_attribute_modifiable_by_client("Unique Identifier") + ) + self.assertTrue( + rules.is_attribute_modifiable_by_client("Name") + ) + def test_is_attribute_applicable_to_object_type(self): """ Test that is_attribute_applicable_to_object_type returns the