From fc7224e20d519c8cd5550b33fc4a771745697175 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 25 Aug 2017 14:28:35 -0400 Subject: [PATCH] Add SignatureVerify support to the server This change adds the SignatureVerify operation to the server. Unit tests covering the additions are included. The Query operation has been updated to reflect this addition. --- kmip/services/server/engine.py | 72 ++++ .../tests/unit/services/server/test_engine.py | 358 +++++++++++++++++- 2 files changed, 428 insertions(+), 2 deletions(-) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 68f0e21..2e20ce7 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -55,6 +55,7 @@ from kmip.core.messages.payloads import register from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import sign +from kmip.core.messages.payloads import signature_verify from kmip.core import misc @@ -990,6 +991,8 @@ class KmipEngine(object): return self._process_encrypt(payload) elif operation == enums.Operation.DECRYPT: return self._process_decrypt(payload) + elif operation == enums.Operation.SIGNATURE_VERIFY: + return self._process_signature_verify(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) elif operation == enums.Operation.SIGN: @@ -1946,6 +1949,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.ENCRYPT), contents.Operation(enums.Operation.DECRYPT), contents.Operation(enums.Operation.SIGN), + contents.Operation(enums.Operation.SIGNATURE_VERIFY), contents.Operation(enums.Operation.MAC) ]) @@ -2117,6 +2121,74 @@ class KmipEngine(object): ) return response_payload + @_kmip_version_supported('1.2') + def _process_signature_verify(self, payload): + self._logger.info("Processing operation: Signature Verify") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + # The KMIP spec does not indicate that the SignatureVerify operation + # should have it's own operation policy entry. Rather, the + # cryptographic usage mask should be used to determine if the object + # can be used to verify signatures (see below). + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.GET + ) + + parameters = payload.cryptographic_parameters + if parameters is None: + # TODO (peter-hamilton): Pull the cryptographic parameters from + # the attributes associated with the signing key. + raise exceptions.InvalidField( + "The cryptographic parameters must be specified." + ) + + # TODO (peter-hamilton): Check the usage limitations for the key to + # confirm that it can be used for this operation. + + if managed_object._object_type != enums.ObjectType.PUBLIC_KEY: + raise exceptions.PermissionDenied( + "The requested signing key is not a public key. A public key " + "must be specified." + ) + + if managed_object.state != enums.State.ACTIVE: + raise exceptions.PermissionDenied( + "The signing key must be in the Active state to be used for " + "signature verification." + ) + + masks = managed_object.cryptographic_usage_masks + if enums.CryptographicUsageMask.VERIFY not in masks: + raise exceptions.PermissionDenied( + "The Verify bit must be set in the signing key's " + "cryptographic usage mask." + ) + + result = self._cryptography_engine.verify_signature( + signing_key=managed_object.value, + message=payload.data, + signature=payload.signature_data, + padding_method=parameters.padding_method, + signing_algorithm=parameters.cryptographic_algorithm, + hashing_algorithm=parameters.hashing_algorithm, + digital_signature_algorithm=parameters.digital_signature_algorithm + ) + + if result: + validity = enums.ValidityIndicator.VALID + else: + validity = enums.ValidityIndicator.INVALID + + response_payload = signature_verify.SignatureVerifyResponsePayload( + unique_identifier=unique_identifier, + validity_indicator=validity + ) + return response_payload + @_kmip_version_supported('1.2') def _process_mac(self, payload): self._logger.info("Processing operation: MAC") diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 382ef54..48154bc 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -56,6 +56,7 @@ from kmip.core.messages.payloads import register from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import sign +from kmip.core.messages.payloads import signature_verify from kmip.pie import objects as pie_objects from kmip.pie import sqltypes @@ -902,6 +903,7 @@ class TestKmipEngine(testtools.TestCase): e._process_discover_versions = mock.MagicMock() e._process_encrypt = mock.MagicMock() e._process_decrypt = mock.MagicMock() + e._process_signature_verify = mock.MagicMock() e._process_mac = mock.MagicMock() e._process_sign = mock.MagicMock() @@ -920,6 +922,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.ENCRYPT, None) e._process_operation(enums.Operation.DECRYPT, None) e._process_operation(enums.Operation.SIGN, None) + e._process_operation(enums.Operation.SIGNATURE_VERIFY, None) e._process_operation(enums.Operation.MAC, None) e._process_create.assert_called_with(None) @@ -936,6 +939,7 @@ class TestKmipEngine(testtools.TestCase): e._process_discover_versions.assert_called_with(None) e._process_encrypt.assert_called_with(None) e._process_decrypt.assert_called_with(None) + e._process_signature_verify.assert_called_with(None) e._process_mac.assert_called_with(None) def test_unsupported_operation(self): @@ -6520,7 +6524,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(16, len(result.operations)) + self.assertEqual(17, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -6582,9 +6586,13 @@ class TestKmipEngine(testtools.TestCase): result.operations[14].value ) self.assertEqual( - enums.Operation.MAC, + enums.Operation.SIGNATURE_VERIFY, result.operations[15].value ) + self.assertEqual( + enums.Operation.MAC, + result.operations[16].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -7294,6 +7302,352 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_signature_verify(self): + """ + Test that a SignatureVerify request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN, + enums.CryptographicUsageMask.VERIFY + ] + ) + signing_key.state = enums.State.ACTIVE + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Test a valid signature + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=( + b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b' + b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1' + b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72' + b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9' + b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6' + b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28' + b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33' + b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca' + b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24' + b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2' + b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4' + ), + signature_data=( + b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b' + b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a' + b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9' + b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee' + b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8' + b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b' + b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f' + b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b' + b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\xf8' + ) + ) + + response_payload = e._process_signature_verify(payload) + + e._logger.info.assert_any_call( + "Processing operation: Signature Verify" + ) + self.assertEqual( + unique_identifier, + response_payload.unique_identifier + ) + self.assertEqual( + enums.ValidityIndicator.VALID, + response_payload.validity_indicator + ) + + # Test an invalid signature + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=( + b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b' + b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1' + b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72' + b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9' + b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6' + b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28' + b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33' + b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca' + b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24' + b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2' + b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4' + ), + signature_data=( + b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b' + b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a' + b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9' + b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee' + b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8' + b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b' + b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f' + b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b' + b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\x00' + ) + ) + + response_payload = e._process_signature_verify(payload) + + self.assertEqual( + unique_identifier, + response_payload.unique_identifier + ) + self.assertEqual( + enums.ValidityIndicator.INVALID, + response_payload.validity_indicator + ) + + def test_signature_verify_no_cryptographic_parameters(self): + """ + Test that the right error is thrown when cryptographic parameters + are not provided with a SignatureVerify request. + + Note: once the cryptographic parameters can be obtained from the + encryption key's attributes, this test should be updated to + reflect that. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN, + enums.CryptographicUsageMask.VERIFY + ] + ) + signing_key.state = enums.State.ACTIVE + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + data=b'', + signature_data=b'' + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic parameters must be specified.", + e._process_signature_verify, + *args + ) + + def test_signature_verify_invalid_signing_key(self): + """ + Test that the right error is thrown when an invalid signing key + is specified with a SignatureVerify request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.OpaqueObject( + b'\x01\x02\x03\x04', + enums.OpaqueDataType.NONE + ) + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=b'', + signature_data=b'' + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The requested signing key is not a public key. A public key must " + "be specified.", + e._process_signature_verify, + *args + ) + + def test_signature_verify_inactive_signing_key(self): + """ + Test that the right error is thrown when an inactive signing key + is specified with a SignatureVerify request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN, + enums.CryptographicUsageMask.VERIFY + ] + ) + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=b'', + signature_data=b'' + ) + + args = (payload,) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The signing key must be in the Active state to be used for " + "signature verification.", + e._process_signature_verify, + *args + ) + + def test_signature_verify_non_verification_key(self): + """ + Test that the right error is thrown when a non-verification key + is specified with a SignatureVerify request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN + ] + ) + signing_key.state = enums.State.ACTIVE + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=b'', + signature_data=b'' + ) + + args = (payload,) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The Verify bit must be set in the signing key's cryptographic " + "usage mask.", + e._process_signature_verify, + *args + ) + def test_mac(self): """ Test that a MAC request can be processed correctly.