mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-07-07 06:04:21 +02:00
Merge pull request #296 from OpenKMIP/feat/add-decrypt-to-server
Add Decrypt support to the server
This commit is contained in:
commit
815cf9bc2f
@ -554,7 +554,8 @@ class CryptographyEngine(api.CryptographicEngine):
|
|||||||
plain_text = self._handle_symmetric_padding(
|
plain_text = self._handle_symmetric_padding(
|
||||||
self._symmetric_key_algorithms.get(decryption_algorithm),
|
self._symmetric_key_algorithms.get(decryption_algorithm),
|
||||||
plain_text,
|
plain_text,
|
||||||
padding_method
|
padding_method,
|
||||||
|
undo_padding=True
|
||||||
)
|
)
|
||||||
|
|
||||||
return plain_text
|
return plain_text
|
||||||
|
@ -42,6 +42,7 @@ from kmip.core.messages.payloads import activate
|
|||||||
from kmip.core.messages.payloads import revoke
|
from kmip.core.messages.payloads import revoke
|
||||||
from kmip.core.messages.payloads import create
|
from kmip.core.messages.payloads import create
|
||||||
from kmip.core.messages.payloads import create_key_pair
|
from kmip.core.messages.payloads import create_key_pair
|
||||||
|
from kmip.core.messages.payloads import decrypt
|
||||||
from kmip.core.messages.payloads import derive_key
|
from kmip.core.messages.payloads import derive_key
|
||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
from kmip.core.messages.payloads import discover_versions
|
from kmip.core.messages.payloads import discover_versions
|
||||||
@ -986,6 +987,8 @@ class KmipEngine(object):
|
|||||||
return self._process_discover_versions(payload)
|
return self._process_discover_versions(payload)
|
||||||
elif operation == enums.Operation.ENCRYPT:
|
elif operation == enums.Operation.ENCRYPT:
|
||||||
return self._process_encrypt(payload)
|
return self._process_encrypt(payload)
|
||||||
|
elif operation == enums.Operation.DECRYPT:
|
||||||
|
return self._process_decrypt(payload)
|
||||||
elif operation == enums.Operation.MAC:
|
elif operation == enums.Operation.MAC:
|
||||||
return self._process_mac(payload)
|
return self._process_mac(payload)
|
||||||
else:
|
else:
|
||||||
@ -1936,6 +1939,7 @@ class KmipEngine(object):
|
|||||||
if self._protocol_version >= contents.ProtocolVersion.create(1, 2):
|
if self._protocol_version >= contents.ProtocolVersion.create(1, 2):
|
||||||
operations.extend([
|
operations.extend([
|
||||||
contents.Operation(enums.Operation.ENCRYPT),
|
contents.Operation(enums.Operation.ENCRYPT),
|
||||||
|
contents.Operation(enums.Operation.DECRYPT),
|
||||||
contents.Operation(enums.Operation.MAC)
|
contents.Operation(enums.Operation.MAC)
|
||||||
])
|
])
|
||||||
|
|
||||||
@ -2045,6 +2049,68 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
return response_payload
|
return response_payload
|
||||||
|
|
||||||
|
@_kmip_version_supported('1.2')
|
||||||
|
def _process_decrypt(self, payload):
|
||||||
|
self._logger.info("Processing operation: Decrypt")
|
||||||
|
|
||||||
|
unique_identifier = self._id_placeholder
|
||||||
|
if payload.unique_identifier:
|
||||||
|
unique_identifier = payload.unique_identifier
|
||||||
|
|
||||||
|
# The KMIP spec does not indicate that the Decrypt operation should
|
||||||
|
# have it's own operation policy entry. Rather, the cryptographic
|
||||||
|
# usage mask should be used to determine if the object can be used
|
||||||
|
# to decrypt data (see below).
|
||||||
|
managed_object = self._get_object_with_access_controls(
|
||||||
|
unique_identifier,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
|
||||||
|
cryptographic_parameters = payload.cryptographic_parameters
|
||||||
|
if cryptographic_parameters is None:
|
||||||
|
# TODO (peter-hamilton): Pull the cryptographic parameters from
|
||||||
|
# the attributes associated with the decryption key.
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The cryptographic parameters must be specified."
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO (peter-hamilton): Check the usage limitations for the key to
|
||||||
|
# confirm that it can be used for this operation.
|
||||||
|
|
||||||
|
if managed_object._object_type != enums.ObjectType.SYMMETRIC_KEY:
|
||||||
|
raise exceptions.PermissionDenied(
|
||||||
|
"The requested decryption key is not a symmetric key. "
|
||||||
|
"Only symmetric decryption is currently supported."
|
||||||
|
)
|
||||||
|
|
||||||
|
if managed_object.state != enums.State.ACTIVE:
|
||||||
|
raise exceptions.PermissionDenied(
|
||||||
|
"The decryption key must be in the Active state to be used "
|
||||||
|
"for decryption."
|
||||||
|
)
|
||||||
|
|
||||||
|
if enums.CryptographicUsageMask.DECRYPT not in \
|
||||||
|
managed_object.cryptographic_usage_masks:
|
||||||
|
raise exceptions.PermissionDenied(
|
||||||
|
"The Decrypt bit must be set in the decryption key's "
|
||||||
|
"cryptographic usage mask."
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self._cryptography_engine.decrypt(
|
||||||
|
cryptographic_parameters.cryptographic_algorithm,
|
||||||
|
managed_object.value,
|
||||||
|
payload.data,
|
||||||
|
cipher_mode=cryptographic_parameters.block_cipher_mode,
|
||||||
|
padding_method=cryptographic_parameters.padding_method,
|
||||||
|
iv_nonce=payload.iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = decrypt.DecryptResponsePayload(
|
||||||
|
unique_identifier,
|
||||||
|
result
|
||||||
|
)
|
||||||
|
return response_payload
|
||||||
|
|
||||||
@_kmip_version_supported('1.2')
|
@_kmip_version_supported('1.2')
|
||||||
def _process_mac(self, payload):
|
def _process_mac(self, payload):
|
||||||
self._logger.info("Processing operation: MAC")
|
self._logger.info("Processing operation: MAC")
|
||||||
|
@ -945,7 +945,8 @@ def test_decrypt(encrypt_parameters):
|
|||||||
encrypt_parameters.get('algorithm')
|
encrypt_parameters.get('algorithm')
|
||||||
),
|
),
|
||||||
encrypt_parameters.get('plain_text'),
|
encrypt_parameters.get('plain_text'),
|
||||||
None
|
None,
|
||||||
|
undo_padding=True
|
||||||
)
|
)
|
||||||
|
|
||||||
assert encrypt_parameters.get('plain_text') == result
|
assert encrypt_parameters.get('plain_text') == result
|
||||||
|
@ -43,6 +43,7 @@ from kmip.core.messages.payloads import activate
|
|||||||
from kmip.core.messages.payloads import revoke
|
from kmip.core.messages.payloads import revoke
|
||||||
from kmip.core.messages.payloads import create
|
from kmip.core.messages.payloads import create
|
||||||
from kmip.core.messages.payloads import create_key_pair
|
from kmip.core.messages.payloads import create_key_pair
|
||||||
|
from kmip.core.messages.payloads import decrypt
|
||||||
from kmip.core.messages.payloads import derive_key
|
from kmip.core.messages.payloads import derive_key
|
||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
from kmip.core.messages.payloads import discover_versions
|
from kmip.core.messages.payloads import discover_versions
|
||||||
@ -899,6 +900,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
e._process_query = mock.MagicMock()
|
e._process_query = mock.MagicMock()
|
||||||
e._process_discover_versions = mock.MagicMock()
|
e._process_discover_versions = mock.MagicMock()
|
||||||
e._process_encrypt = mock.MagicMock()
|
e._process_encrypt = mock.MagicMock()
|
||||||
|
e._process_decrypt = mock.MagicMock()
|
||||||
e._process_mac = mock.MagicMock()
|
e._process_mac = mock.MagicMock()
|
||||||
|
|
||||||
e._process_operation(enums.Operation.CREATE, None)
|
e._process_operation(enums.Operation.CREATE, None)
|
||||||
@ -914,6 +916,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
e._process_operation(enums.Operation.QUERY, None)
|
e._process_operation(enums.Operation.QUERY, None)
|
||||||
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
||||||
e._process_operation(enums.Operation.ENCRYPT, None)
|
e._process_operation(enums.Operation.ENCRYPT, None)
|
||||||
|
e._process_operation(enums.Operation.DECRYPT, None)
|
||||||
e._process_operation(enums.Operation.MAC, None)
|
e._process_operation(enums.Operation.MAC, None)
|
||||||
|
|
||||||
e._process_create.assert_called_with(None)
|
e._process_create.assert_called_with(None)
|
||||||
@ -929,6 +932,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
e._process_query.assert_called_with(None)
|
e._process_query.assert_called_with(None)
|
||||||
e._process_discover_versions.assert_called_with(None)
|
e._process_discover_versions.assert_called_with(None)
|
||||||
e._process_encrypt.assert_called_with(None)
|
e._process_encrypt.assert_called_with(None)
|
||||||
|
e._process_decrypt.assert_called_with(None)
|
||||||
e._process_mac.assert_called_with(None)
|
e._process_mac.assert_called_with(None)
|
||||||
|
|
||||||
def test_unsupported_operation(self):
|
def test_unsupported_operation(self):
|
||||||
@ -6421,7 +6425,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||||
self.assertIsNotNone(result.operations)
|
self.assertIsNotNone(result.operations)
|
||||||
self.assertEqual(14, len(result.operations))
|
self.assertEqual(15, len(result.operations))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.CREATE,
|
enums.Operation.CREATE,
|
||||||
result.operations[0].value
|
result.operations[0].value
|
||||||
@ -6475,9 +6479,13 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
result.operations[12].value
|
result.operations[12].value
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.MAC,
|
enums.Operation.DECRYPT,
|
||||||
result.operations[13].value
|
result.operations[13].value
|
||||||
)
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
enums.Operation.MAC,
|
||||||
|
result.operations[14].value
|
||||||
|
)
|
||||||
self.assertEqual(list(), result.object_types)
|
self.assertEqual(list(), result.object_types)
|
||||||
self.assertIsNotNone(result.vendor_identification)
|
self.assertIsNotNone(result.vendor_identification)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -6900,6 +6908,293 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
*args
|
*args
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_decrypt(self):
|
||||||
|
"""
|
||||||
|
Test that an Decrypt request can be processed correctly.
|
||||||
|
|
||||||
|
The test vectors used here come from Eric Young's test set for
|
||||||
|
Blowfish, via https://www.di-mgt.com.au/cryptopad.html.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._cryptography_engine.logger = mock.MagicMock()
|
||||||
|
|
||||||
|
decryption_key = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||||
|
128,
|
||||||
|
(
|
||||||
|
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||||
|
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||||
|
),
|
||||||
|
[enums.CryptographicUsageMask.DECRYPT]
|
||||||
|
)
|
||||||
|
decryption_key.state = enums.State.ACTIVE
|
||||||
|
|
||||||
|
e._data_session.add(decryption_key)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
unique_identifier = str(decryption_key.unique_identifier)
|
||||||
|
cryptographic_parameters = attributes.CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
)
|
||||||
|
data = (
|
||||||
|
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
|
||||||
|
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
|
||||||
|
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
|
||||||
|
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
|
||||||
|
)
|
||||||
|
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
|
||||||
|
payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier,
|
||||||
|
cryptographic_parameters,
|
||||||
|
data,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_decrypt(payload)
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call("Processing operation: Decrypt")
|
||||||
|
self.assertEqual(
|
||||||
|
unique_identifier,
|
||||||
|
response_payload.unique_identifier
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
(
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
),
|
||||||
|
response_payload.data
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_decrypt_no_cryptographic_parameters(self):
|
||||||
|
"""
|
||||||
|
Test that the right error is thrown when cryptographic parameters
|
||||||
|
are not provided with a Decrypt request.
|
||||||
|
|
||||||
|
Note: once the cryptographic parameters can be obtained from the
|
||||||
|
encryption key's attributes, this test should be updated to
|
||||||
|
reflect that.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._cryptography_engine.logger = mock.MagicMock()
|
||||||
|
|
||||||
|
decryption_key = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||||
|
128,
|
||||||
|
(
|
||||||
|
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||||
|
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||||
|
),
|
||||||
|
[enums.CryptographicUsageMask.DECRYPT]
|
||||||
|
)
|
||||||
|
decryption_key.state = enums.State.ACTIVE
|
||||||
|
|
||||||
|
e._data_session.add(decryption_key)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
unique_identifier = str(decryption_key.unique_identifier)
|
||||||
|
cryptographic_parameters = None
|
||||||
|
data = (
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
)
|
||||||
|
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
|
||||||
|
payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier,
|
||||||
|
cryptographic_parameters,
|
||||||
|
data,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
"The cryptographic parameters must be specified.",
|
||||||
|
e._process_decrypt,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_decrypt_invalid_decryption_key(self):
|
||||||
|
"""
|
||||||
|
Test that the right error is thrown when an invalid decryption key
|
||||||
|
is specified with a Decrypt request.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._cryptography_engine.logger = mock.MagicMock()
|
||||||
|
|
||||||
|
decryption_key = pie_objects.OpaqueObject(
|
||||||
|
b'\x01\x02\x03\x04',
|
||||||
|
enums.OpaqueDataType.NONE
|
||||||
|
)
|
||||||
|
|
||||||
|
e._data_session.add(decryption_key)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
unique_identifier = str(decryption_key.unique_identifier)
|
||||||
|
cryptographic_parameters = attributes.CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
)
|
||||||
|
data = (
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
)
|
||||||
|
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
|
||||||
|
payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier,
|
||||||
|
cryptographic_parameters,
|
||||||
|
data,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.PermissionDenied,
|
||||||
|
"The requested decryption key is not a symmetric key. "
|
||||||
|
"Only symmetric decryption is currently supported.",
|
||||||
|
e._process_decrypt,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_decrypt_inactive_decryption_key(self):
|
||||||
|
"""
|
||||||
|
Test that the right error is thrown when an inactive decryption key
|
||||||
|
is specified with a Decrypt request.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._cryptography_engine.logger = mock.MagicMock()
|
||||||
|
|
||||||
|
decryption_key = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||||
|
128,
|
||||||
|
(
|
||||||
|
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||||
|
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||||
|
),
|
||||||
|
[enums.CryptographicUsageMask.DECRYPT]
|
||||||
|
)
|
||||||
|
|
||||||
|
e._data_session.add(decryption_key)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
unique_identifier = str(decryption_key.unique_identifier)
|
||||||
|
cryptographic_parameters = attributes.CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
)
|
||||||
|
data = (
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
)
|
||||||
|
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
|
||||||
|
payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier,
|
||||||
|
cryptographic_parameters,
|
||||||
|
data,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.PermissionDenied,
|
||||||
|
"The decryption key must be in the Active state to be used "
|
||||||
|
"for decryption.",
|
||||||
|
e._process_decrypt,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_decrypt_non_decryption_key(self):
|
||||||
|
"""
|
||||||
|
Test that the right error is thrown when a non-decryption key
|
||||||
|
is specified with a Decrypt request.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._cryptography_engine.logger = mock.MagicMock()
|
||||||
|
|
||||||
|
decryption_key = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||||
|
128,
|
||||||
|
(
|
||||||
|
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||||
|
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||||
|
),
|
||||||
|
[enums.CryptographicUsageMask.ENCRYPT]
|
||||||
|
)
|
||||||
|
decryption_key.state = enums.State.ACTIVE
|
||||||
|
|
||||||
|
e._data_session.add(decryption_key)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
unique_identifier = str(decryption_key.unique_identifier)
|
||||||
|
cryptographic_parameters = attributes.CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
)
|
||||||
|
data = (
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
)
|
||||||
|
iv_counter_nonce = b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
|
||||||
|
payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier,
|
||||||
|
cryptographic_parameters,
|
||||||
|
data,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload,)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.PermissionDenied,
|
||||||
|
"The Decrypt bit must be set in the decryption key's "
|
||||||
|
"cryptographic usage mask.",
|
||||||
|
e._process_decrypt,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
def test_mac(self):
|
def test_mac(self):
|
||||||
"""
|
"""
|
||||||
Test that a MAC request can be processed correctly.
|
Test that a MAC request can be processed correctly.
|
||||||
@ -7615,3 +7910,213 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
|
|
||||||
e._data_session.commit()
|
e._data_session.commit()
|
||||||
e._data_store_session_factory()
|
e._data_store_session_factory()
|
||||||
|
|
||||||
|
def test_register_activate_encrypt_decrypt_revoke_destroy(self):
|
||||||
|
"""
|
||||||
|
Test that a symmetric key can be registered with the server,
|
||||||
|
activated, used for encryption and decryption, revoked, and finally
|
||||||
|
destroyed without error.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
attribute_factory = factory.AttributeFactory()
|
||||||
|
|
||||||
|
# Build a SymmetricKey for registration.
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||||
|
template_attribute = objects.TemplateAttribute(
|
||||||
|
attributes=[
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'Test Symmetric Key',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||||
|
enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||||
|
128
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[
|
||||||
|
enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT
|
||||||
|
]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
key_bytes = (
|
||||||
|
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||||
|
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||||
|
)
|
||||||
|
secret = secrets.SymmetricKey(
|
||||||
|
key_block=objects.KeyBlock(
|
||||||
|
key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||||
|
key_value=objects.KeyValue(
|
||||||
|
key_material=objects.KeyMaterial(key_bytes)
|
||||||
|
),
|
||||||
|
cryptographic_algorithm=attributes.CryptographicAlgorithm(
|
||||||
|
enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
),
|
||||||
|
cryptographic_length=attributes.CryptographicLength(128)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Register the symmetric key with the corresponding attributes
|
||||||
|
payload = register.RegisterRequestPayload(
|
||||||
|
object_type=object_type,
|
||||||
|
template_attribute=template_attribute,
|
||||||
|
secret=secret
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_register(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Register"
|
||||||
|
)
|
||||||
|
|
||||||
|
uuid = response_payload.unique_identifier.value
|
||||||
|
self.assertEqual('1', uuid)
|
||||||
|
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
|
# Activate the symmetric key
|
||||||
|
payload = activate.ActivateRequestPayload(
|
||||||
|
attributes.UniqueIdentifier(uuid)
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_activate(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Activate"
|
||||||
|
)
|
||||||
|
|
||||||
|
activated_uuid = response_payload.unique_identifier.value
|
||||||
|
self.assertEqual(uuid, activated_uuid)
|
||||||
|
|
||||||
|
# Encrypt some data using the symmetric key
|
||||||
|
payload = encrypt.EncryptRequestPayload(
|
||||||
|
unique_identifier=uuid,
|
||||||
|
cryptographic_parameters=attributes.CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
),
|
||||||
|
data=(
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
),
|
||||||
|
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_encrypt(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Encrypt"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
uuid,
|
||||||
|
response_payload.unique_identifier
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
(
|
||||||
|
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
|
||||||
|
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
|
||||||
|
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
|
||||||
|
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
|
||||||
|
),
|
||||||
|
response_payload.data
|
||||||
|
)
|
||||||
|
|
||||||
|
# Decrypt the encrypted data using the symmetric key
|
||||||
|
payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier=uuid,
|
||||||
|
cryptographic_parameters=attributes.CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
),
|
||||||
|
data=response_payload.data,
|
||||||
|
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_decrypt(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Decrypt"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
uuid,
|
||||||
|
response_payload.unique_identifier
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
(
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
),
|
||||||
|
response_payload.data
|
||||||
|
)
|
||||||
|
|
||||||
|
# Revoke the activated symmetric key to prepare it for deletion
|
||||||
|
payload = revoke.RevokeRequestPayload(
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(uuid)
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_revoke(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Revoke"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(uuid, response_payload.unique_identifier.value)
|
||||||
|
|
||||||
|
# Destroy the symmetric key and verify it cannot be accessed again
|
||||||
|
payload = destroy.DestroyRequestPayload(
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(uuid)
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_destroy(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Destroy"
|
||||||
|
)
|
||||||
|
self.assertEqual(str(uuid), response_payload.unique_identifier.value)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = "Could not locate object: {0}".format(uuid)
|
||||||
|
six.assertRaisesRegex(
|
||||||
|
self,
|
||||||
|
exceptions.ItemNotFound,
|
||||||
|
regex,
|
||||||
|
e._process_destroy,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_store_session_factory()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user