diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 19008aa..097300d 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -30,6 +30,8 @@ from kmip.core import attributes from kmip.core import enums from kmip.core import exceptions +from kmip.core.objects import MACData + from kmip.core.factories import attributes as attribute_factory from kmip.core.factories import secrets @@ -46,6 +48,7 @@ from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register +from kmip.core.messages.payloads import mac from kmip.core import misc @@ -949,6 +952,8 @@ class KmipEngine(object): return self._process_query(payload) elif operation == enums.Operation.DISCOVER_VERSIONS: return self._process_discover_versions(payload) + elif operation == enums.Operation.MAC: + return self._process_mac(payload) else: raise exceptions.OperationNotSupported( "{0} operation is not supported by the server.".format( @@ -1556,3 +1561,64 @@ class KmipEngine(object): ) return response_payload + + @_kmip_version_supported('1.2') + def _process_mac(self, payload): + self._logger.info("Processing operation: MAC") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier.value + + # TODO: Currently use the GET operation policy here to ensure only a + # user with read access to the secret can use it to compute a MAC + # value. However, the MAC operation's access should be controlled by + # cryptographic usage mask instead of operation policy. + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.GET + ) + + algorithm = None + if (payload.cryptographic_parameters and + payload.cryptographic_parameters.cryptographic_algorithm): + algorithm = \ + payload.cryptographic_parameters.cryptographic_algorithm.value + elif (isinstance(managed_object, objects.Key) and + managed_object.cryptographic_algorithm): + algorithm = managed_object.cryptographic_algorithm + else: + raise exceptions.InvalidField( + "The cryptographic algorithm must be specified " + "for the MAC operation" + ) + + key = None + if managed_object.value: + key = managed_object.value + else: + raise exceptions.InvalidField( + "A secret key value must be specified " + "for the MAC operation" + ) + + data = None + if payload.data: + data = payload.data.value + else: + raise exceptions.InvalidField( + "No data to be MACed" + ) + + result = self._cryptography_engine.mac( + algorithm, + key, + data + ) + + response_payload = mac.MACResponsePayload( + unique_identifier=attributes.UniqueIdentifier(unique_identifier), + mac_data=MACData(result) + ) + + return response_payload diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 059c15e..791fb45 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -48,6 +48,7 @@ from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register +from kmip.core.messages.payloads import mac from kmip.pie import objects as pie_objects from kmip.pie import sqltypes @@ -4537,6 +4538,149 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual([], result.protocol_versions) + def test_mac(self): + """ + Test that a MAC request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00') + data = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A' + b'\x0B\x0C\x0D\x0E\x0F') + algorithm_a = enums.CryptographicAlgorithm.AES + algorithm_b = enums.CryptographicAlgorithm.HMAC_SHA512 + obj = pie_objects.SymmetricKey(algorithm_a, 128, key) + + e._data_session.add(obj) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + uuid = str(obj.unique_identifier) + + cryptographic_parameters = attributes.CryptographicParameters( + cryptographic_algorithm=attributes. + CryptographicAlgorithm(algorithm_b) + ) + + # Verify when cryptographic_parameters is specified in request + payload = mac.MACRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uuid), + cryptographic_parameters=cryptographic_parameters, + data=objects.Data(data) + ) + + response_payload = e._process_mac(payload) + + e._logger.info.assert_any_call( + "Processing operation: MAC" + ) + e._cryptography_engine.logger.info.assert_any_call( + "Generating a hash-based message authentication code using {0}". + format(algorithm_b.name) + ) + e._cryptography_engine.logger.reset_mock() + self.assertEqual(str(uuid), response_payload.unique_identifier.value) + self.assertIsInstance(response_payload.mac_data, objects.MACData) + + # Verify when cryptographic_parameters is not specified in request + payload = mac.MACRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uuid), + cryptographic_parameters=None, + data=objects.Data(data) + ) + + response_payload = e._process_mac(payload) + + e._cryptography_engine.logger.info.assert_any_call( + "Generating a cipher-based message authentication code using {0}". + format(algorithm_a.name) + ) + self.assertEqual(str(uuid), response_payload.unique_identifier.value) + self.assertIsInstance(response_payload.mac_data, objects.MACData) + + def test_mac_with_missing_fields(self): + """ + Test that the right errors are generated when required fields + are missing. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00') + data = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B' + b'\x0C\x0D\x0E\x0F') + algorithm = enums.CryptographicAlgorithm.AES + obj_no_key = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_no_algorithm = pie_objects.OpaqueObject( + key, enums.OpaqueDataType.NONE) + + e._data_session.add(obj_no_key) + e._data_session.add(obj_no_algorithm) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + uuid_no_key = str(obj_no_key.unique_identifier) + uuid_no_algorithm = str(obj_no_algorithm.unique_identifier) + + cryptographic_parameters = attributes.CryptographicParameters( + cryptographic_algorithm=attributes. + CryptographicAlgorithm(algorithm)) + + payload_no_key = mac.MACRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uuid_no_key), + cryptographic_parameters=cryptographic_parameters, + data=objects.Data(data) + ) + + args = (payload_no_key, ) + regex = "A secret key value must be specified" + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_mac, + *args + ) + + payload_no_algorithm = mac.MACRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uuid_no_algorithm), + cryptographic_parameters=None, + data=objects.Data(data) + ) + + args = (payload_no_algorithm, ) + regex = "The cryptographic algorithm must be specified" + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_mac, + *args + ) + + payload_no_data = mac.MACRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uuid_no_algorithm), + cryptographic_parameters=cryptographic_parameters, + data=None + ) + + args = (payload_no_data, ) + regex = "No data to be MACed" + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_mac, + *args + ) + def test_create_get_destroy(self): """ Test that a managed object can be created, retrieved, and destroyed