diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 68f0e21..2e20ce7 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -55,6 +55,7 @@ from kmip.core.messages.payloads import register from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import sign +from kmip.core.messages.payloads import signature_verify from kmip.core import misc @@ -990,6 +991,8 @@ class KmipEngine(object): return self._process_encrypt(payload) elif operation == enums.Operation.DECRYPT: return self._process_decrypt(payload) + elif operation == enums.Operation.SIGNATURE_VERIFY: + return self._process_signature_verify(payload) elif operation == enums.Operation.MAC: return self._process_mac(payload) elif operation == enums.Operation.SIGN: @@ -1946,6 +1949,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.ENCRYPT), contents.Operation(enums.Operation.DECRYPT), contents.Operation(enums.Operation.SIGN), + contents.Operation(enums.Operation.SIGNATURE_VERIFY), contents.Operation(enums.Operation.MAC) ]) @@ -2117,6 +2121,74 @@ class KmipEngine(object): ) return response_payload + @_kmip_version_supported('1.2') + def _process_signature_verify(self, payload): + self._logger.info("Processing operation: Signature Verify") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + # The KMIP spec does not indicate that the SignatureVerify operation + # should have it's own operation policy entry. Rather, the + # cryptographic usage mask should be used to determine if the object + # can be used to verify signatures (see below). + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.GET + ) + + parameters = payload.cryptographic_parameters + if parameters is None: + # TODO (peter-hamilton): Pull the cryptographic parameters from + # the attributes associated with the signing key. + raise exceptions.InvalidField( + "The cryptographic parameters must be specified." + ) + + # TODO (peter-hamilton): Check the usage limitations for the key to + # confirm that it can be used for this operation. + + if managed_object._object_type != enums.ObjectType.PUBLIC_KEY: + raise exceptions.PermissionDenied( + "The requested signing key is not a public key. A public key " + "must be specified." + ) + + if managed_object.state != enums.State.ACTIVE: + raise exceptions.PermissionDenied( + "The signing key must be in the Active state to be used for " + "signature verification." + ) + + masks = managed_object.cryptographic_usage_masks + if enums.CryptographicUsageMask.VERIFY not in masks: + raise exceptions.PermissionDenied( + "The Verify bit must be set in the signing key's " + "cryptographic usage mask." + ) + + result = self._cryptography_engine.verify_signature( + signing_key=managed_object.value, + message=payload.data, + signature=payload.signature_data, + padding_method=parameters.padding_method, + signing_algorithm=parameters.cryptographic_algorithm, + hashing_algorithm=parameters.hashing_algorithm, + digital_signature_algorithm=parameters.digital_signature_algorithm + ) + + if result: + validity = enums.ValidityIndicator.VALID + else: + validity = enums.ValidityIndicator.INVALID + + response_payload = signature_verify.SignatureVerifyResponsePayload( + unique_identifier=unique_identifier, + validity_indicator=validity + ) + return response_payload + @_kmip_version_supported('1.2') def _process_mac(self, payload): self._logger.info("Processing operation: MAC") diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 382ef54..48154bc 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -56,6 +56,7 @@ from kmip.core.messages.payloads import register from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import sign +from kmip.core.messages.payloads import signature_verify from kmip.pie import objects as pie_objects from kmip.pie import sqltypes @@ -902,6 +903,7 @@ class TestKmipEngine(testtools.TestCase): e._process_discover_versions = mock.MagicMock() e._process_encrypt = mock.MagicMock() e._process_decrypt = mock.MagicMock() + e._process_signature_verify = mock.MagicMock() e._process_mac = mock.MagicMock() e._process_sign = mock.MagicMock() @@ -920,6 +922,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.ENCRYPT, None) e._process_operation(enums.Operation.DECRYPT, None) e._process_operation(enums.Operation.SIGN, None) + e._process_operation(enums.Operation.SIGNATURE_VERIFY, None) e._process_operation(enums.Operation.MAC, None) e._process_create.assert_called_with(None) @@ -936,6 +939,7 @@ class TestKmipEngine(testtools.TestCase): e._process_discover_versions.assert_called_with(None) e._process_encrypt.assert_called_with(None) e._process_decrypt.assert_called_with(None) + e._process_signature_verify.assert_called_with(None) e._process_mac.assert_called_with(None) def test_unsupported_operation(self): @@ -6520,7 +6524,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(16, len(result.operations)) + self.assertEqual(17, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -6582,9 +6586,13 @@ class TestKmipEngine(testtools.TestCase): result.operations[14].value ) self.assertEqual( - enums.Operation.MAC, + enums.Operation.SIGNATURE_VERIFY, result.operations[15].value ) + self.assertEqual( + enums.Operation.MAC, + result.operations[16].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -7294,6 +7302,352 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_signature_verify(self): + """ + Test that a SignatureVerify request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN, + enums.CryptographicUsageMask.VERIFY + ] + ) + signing_key.state = enums.State.ACTIVE + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Test a valid signature + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=( + b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b' + b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1' + b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72' + b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9' + b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6' + b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28' + b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33' + b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca' + b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24' + b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2' + b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4' + ), + signature_data=( + b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b' + b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a' + b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9' + b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee' + b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8' + b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b' + b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f' + b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b' + b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\xf8' + ) + ) + + response_payload = e._process_signature_verify(payload) + + e._logger.info.assert_any_call( + "Processing operation: Signature Verify" + ) + self.assertEqual( + unique_identifier, + response_payload.unique_identifier + ) + self.assertEqual( + enums.ValidityIndicator.VALID, + response_payload.validity_indicator + ) + + # Test an invalid signature + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=( + b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b' + b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1' + b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72' + b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9' + b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6' + b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28' + b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33' + b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca' + b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24' + b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2' + b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4' + ), + signature_data=( + b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b' + b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a' + b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9' + b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee' + b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8' + b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b' + b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f' + b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b' + b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\x00' + ) + ) + + response_payload = e._process_signature_verify(payload) + + self.assertEqual( + unique_identifier, + response_payload.unique_identifier + ) + self.assertEqual( + enums.ValidityIndicator.INVALID, + response_payload.validity_indicator + ) + + def test_signature_verify_no_cryptographic_parameters(self): + """ + Test that the right error is thrown when cryptographic parameters + are not provided with a SignatureVerify request. + + Note: once the cryptographic parameters can be obtained from the + encryption key's attributes, this test should be updated to + reflect that. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN, + enums.CryptographicUsageMask.VERIFY + ] + ) + signing_key.state = enums.State.ACTIVE + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + data=b'', + signature_data=b'' + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic parameters must be specified.", + e._process_signature_verify, + *args + ) + + def test_signature_verify_invalid_signing_key(self): + """ + Test that the right error is thrown when an invalid signing key + is specified with a SignatureVerify request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.OpaqueObject( + b'\x01\x02\x03\x04', + enums.OpaqueDataType.NONE + ) + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=b'', + signature_data=b'' + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The requested signing key is not a public key. A public key must " + "be specified.", + e._process_signature_verify, + *args + ) + + def test_signature_verify_inactive_signing_key(self): + """ + Test that the right error is thrown when an inactive signing key + is specified with a SignatureVerify request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN, + enums.CryptographicUsageMask.VERIFY + ] + ) + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=b'', + signature_data=b'' + ) + + args = (payload,) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The signing key must be in the Active state to be used for " + "signature verification.", + e._process_signature_verify, + *args + ) + + def test_signature_verify_non_verification_key(self): + """ + Test that the right error is thrown when a non-verification key + is specified with a SignatureVerify request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + signing_key = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 1120, + ( + b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35' + b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce' + b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e' + b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3' + b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4' + b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd' + b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9' + b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60' + b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3' + b'\x02\x03\x01\x00\x01' + ), + masks=[ + enums.CryptographicUsageMask.SIGN + ] + ) + signing_key.state = enums.State.ACTIVE + + e._data_session.add(signing_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + unique_identifier = str(signing_key.unique_identifier) + payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=attributes.CryptographicParameters( + padding_method=enums.PaddingMethod.PSS, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA1_WITH_RSA_ENCRYPTION + ), + data=b'', + signature_data=b'' + ) + + args = (payload,) + self.assertRaisesRegexp( + exceptions.PermissionDenied, + "The Verify bit must be set in the signing key's cryptographic " + "usage mask.", + e._process_signature_verify, + *args + ) + def test_mac(self): """ Test that a MAC request can be processed correctly.