diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index dcdbc84..331a7f5 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -44,6 +44,7 @@ from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import get_attribute_list @@ -980,6 +981,8 @@ class KmipEngine(object): return self._process_query(payload) elif operation == enums.Operation.DISCOVER_VERSIONS: return self._process_discover_versions(payload) + elif operation == enums.Operation.ENCRYPT: + return self._process_encrypt(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) else: @@ -1633,6 +1636,7 @@ class KmipEngine(object): ]) if self._protocol_version >= contents.ProtocolVersion.create(1, 2): operations.extend([ + contents.Operation(enums.Operation.ENCRYPT), contents.Operation(enums.Operation.MAC) ]) @@ -1679,6 +1683,69 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.2') + def _process_encrypt(self, payload): + self._logger.info("Processing operation: Encrypt") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + # The KMIP spec does not indicate that the Encrypt operation should + # have it's own operation policy entry. Rather, the cryptographic + # usage mask should be used to determine if the object can be used + # to encrypt data (see below). + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.GET + ) + + cryptographic_parameters = payload.cryptographic_parameters + if cryptographic_parameters is None: + # TODO (peter-hamilton): Pull the cryptographic parameters from + # the attributes associated with the encryption key. + raise exceptions.InvalidField( + "The cryptographic parameters must be specified." + ) + + # TODO (peter-hamilton): Check the usage limitations for the key to + # confirm that it can be used for this operation. + + if managed_object._object_type != enums.ObjectType.SYMMETRIC_KEY: + raise exceptions.PermissionDenied( + "The requested encryption key is not a symmetric key. " + "Only symmetric encryption is currently supported." + ) + + if managed_object.state != enums.State.ACTIVE: + raise exceptions.PermissionDenied( + "The encryption key must be in the Active state to be used " + "for encryption." + ) + + if enums.CryptographicUsageMask.ENCRYPT not in \ + managed_object.cryptographic_usage_masks: + raise exceptions.PermissionDenied( + "The Encrypt bit must be set in the encryption key's " + "cryptographic usage mask." + ) + + result = self._cryptography_engine.encrypt( + cryptographic_parameters.cryptographic_algorithm, + managed_object.value, + payload.data, + cipher_mode=cryptographic_parameters.block_cipher_mode, + padding_method=cryptographic_parameters.padding_method, + iv_nonce=payload.iv_counter_nonce + ) + + response_payload = encrypt.EncryptResponsePayload( + unique_identifier, + result.get('cipher_text'), + result.get('iv_nonce') + ) + return response_payload + @_kmip_version_supported('1.2') def _process_mac(self, payload): self._logger.info("Processing operation: MAC") diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index ad2b2bc..81708e7 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -45,6 +45,7 @@ from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes @@ -893,6 +894,7 @@ class TestKmipEngine(testtools.TestCase): e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() e._process_discover_versions = mock.MagicMock() + e._process_encrypt = mock.MagicMock() e._process_operation(enums.Operation.CREATE, None) e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) @@ -904,6 +906,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) + e._process_operation(enums.Operation.ENCRYPT, None) e._process_create.assert_called_with(None) e._process_create_key_pair.assert_called_with(None) @@ -915,6 +918,7 @@ class TestKmipEngine(testtools.TestCase): e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) e._process_discover_versions.assert_called_with(None) + e._process_encrypt.assert_called_with(None) def test_unsupported_operation(self): """ @@ -5004,7 +5008,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(12, len(result.operations)) + self.assertEqual(13, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -5050,9 +5054,13 @@ class TestKmipEngine(testtools.TestCase): result.operations[10].value ) self.assertEqual( - enums.Operation.MAC, + enums.Operation.ENCRYPT, result.operations[11].value ) + self.assertEqual( + enums.Operation.MAC, + result.operations[12].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -5129,6 +5137,352 @@ class TestKmipEngine(testtools.TestCase): ) self.assertEqual([], result.protocol_versions) + def test_encrypt(self): + """ + Test that an Encrypt request can be processed correctly. + + The test vectors used here come from Eric Young's test set for + Blowfish, via https://www.di-mgt.com.au/cryptopad.html. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + encryption_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.TRIPLE_DES, + 128, + ( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + [enums.CryptographicUsageMask.ENCRYPT] + ) + encryption_key.state = enums.State.ACTIVE + + e._data_session.add(encryption_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(encryption_key.unique_identifier) + cryptographic_parameters = attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH + ) + data = ( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ) + iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10' + + payload = encrypt.EncryptRequestPayload( + unique_identifier, + cryptographic_parameters, + data, + iv_counter_nonce + ) + + response_payload = e._process_encrypt(payload) + + e._logger.info.assert_any_call("Processing operation: Encrypt") + self.assertEqual( + unique_identifier, + response_payload.unique_identifier + ) + self.assertEqual( + ( + b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6' + b'\x05\xB1\x56\xE2\x74\x03\x97\x93' + b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9' + b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B' + ), + response_payload.data + ) + self.assertIsNone(response_payload.iv_counter_nonce) + + def test_encrypt_no_iv_counter_nonce(self): + """ + Test that an Encrypt request can be processed correctly when a + specific IV/counter/nonce is not specified. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + encryption_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.TRIPLE_DES, + 128, + ( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + [enums.CryptographicUsageMask.ENCRYPT] + ) + encryption_key.state = enums.State.ACTIVE + + e._data_session.add(encryption_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(encryption_key.unique_identifier) + cryptographic_parameters = attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH + ) + data = ( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ) + iv_counter_nonce = None + + payload = encrypt.EncryptRequestPayload( + unique_identifier, + cryptographic_parameters, + data, + iv_counter_nonce + ) + + response_payload = e._process_encrypt(payload) + + e._logger.info.assert_any_call("Processing operation: Encrypt") + self.assertEqual( + unique_identifier, + response_payload.unique_identifier + ) + self.assertIsNotNone(response_payload.data) + self.assertIsNotNone(response_payload.iv_counter_nonce) + + def test_encrypt_no_cryptographic_parameters(self): + """ + Test that the right error is thrown when cryptographic parameters + are not provided with an Encrypt request. + + Note: once the cryptographic parameters can be obtained from the + encryption key's attributes, this test should be updated to + reflect that. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + encryption_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.TRIPLE_DES, + 128, + ( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + [enums.CryptographicUsageMask.ENCRYPT] + ) + encryption_key.state = enums.State.ACTIVE + + e._data_session.add(encryption_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(encryption_key.unique_identifier) + cryptographic_parameters = None + data = ( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ) + iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10' + + payload = encrypt.EncryptRequestPayload( + unique_identifier, + cryptographic_parameters, + data, + iv_counter_nonce + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic parameters must be specified.", + e._process_encrypt, + *args + ) + + def test_encrypt_invalid_encryption_key(self): + """ + Test that the right error is thrown when an invalid encryption key + is specified with an Encrypt request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + encryption_key = pie_objects.OpaqueObject( + b'\x01\x02\x03\x04', + enums.OpaqueDataType.NONE + ) + + e._data_session.add(encryption_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(encryption_key.unique_identifier) + cryptographic_parameters = attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH + ) + data = ( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ) + iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10' + + payload = encrypt.EncryptRequestPayload( + unique_identifier, + cryptographic_parameters, + data, + iv_counter_nonce + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The requested encryption key is not a symmetric key. " + "Only symmetric encryption is currently supported.", + e._process_encrypt, + *args + ) + + def test_encrypt_inactive_encryption_key(self): + """ + Test that the right error is thrown when an inactive encryption key + is specified with an Encrypt request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + encryption_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.TRIPLE_DES, + 128, + ( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + [enums.CryptographicUsageMask.ENCRYPT] + ) + + e._data_session.add(encryption_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(encryption_key.unique_identifier) + cryptographic_parameters = attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH + ) + data = ( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ) + iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10' + + payload = encrypt.EncryptRequestPayload( + unique_identifier, + cryptographic_parameters, + data, + iv_counter_nonce + ) + + args = (payload,) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The encryption key must be in the Active state to be used " + "for encryption.", + e._process_encrypt, + *args + ) + + def test_encrypt_non_encryption_key(self): + """ + Test that the right error is thrown when a non-encryption key + is specified with an Encrypt request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + encryption_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.TRIPLE_DES, + 128, + ( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + [enums.CryptographicUsageMask.DECRYPT] + ) + encryption_key.state = enums.State.ACTIVE + + e._data_session.add(encryption_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(encryption_key.unique_identifier) + cryptographic_parameters = attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH + ) + data = ( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ) + iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10' + + payload = encrypt.EncryptRequestPayload( + unique_identifier, + cryptographic_parameters, + data, + iv_counter_nonce + ) + + args = (payload,) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The Encrypt bit must be set in the encryption key's " + "cryptographic usage mask.", + e._process_encrypt, + *args + ) + def test_mac(self): """ Test that a MAC request can be processed correctly.