diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 3f72182..1ffe243 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1473,6 +1473,8 @@ class KmipEngine(object): "The specified length exceeds the output of the derivation " "method." ) + if len(derived_data) > derivation_length: + derived_data = derived_data[:derivation_length] if payload.object_type == enums.ObjectType.SYMMETRIC_KEY: managed_object = objects.SymmetricKey( diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 0c40711..d706133 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -3656,6 +3656,98 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual(enums.SecretDataType.SEED, managed_object.data_type) self.assertIsNotNone(managed_object.initial_date) + def test_derive_key_truncation(self): + """ + Test that a derived key is properly truncated after it is generated if + needed. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.BLOWFISH, + length=128, + value=( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + # Derive a SymmetricKey object. + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.ENCRYPT, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + cryptographic_algorithm=enums.CryptographicAlgorithm. + BLOWFISH + ), + derivation_data=( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ), + initialization_vector=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10' + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 128 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + ) + + response_payload = e._process_derive_key(payload) + + e._logger.info.assert_any_call("Processing operation: DeriveKey") + e._logger.info.assert_any_call( + "Object 1 will be used as the keying material for the derivation " + "process." + ) + e._logger.info.assert_any_call("Created a SymmetricKey with ID: 2") + + self.assertEqual("2", response_payload.unique_identifier) + + managed_object = e._data_session.query( + pie_objects.SymmetricKey + ).filter( + pie_objects.SymmetricKey.unique_identifier == 2 + ).one() + + self.assertEqual( + ( + b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6' + b'\x05\xB1\x56\xE2\x74\x03\x97\x93' + ), + managed_object.value + ) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + managed_object.cryptographic_algorithm + ) + self.assertEqual(128, managed_object.cryptographic_length) + self.assertIsNotNone(managed_object.initial_date) + def test_derive_key_invalid_derivation_type(self): """ Test that the right error is thrown when an invalid derivation type