Merge pull request #327 from OpenKMIP/feat/add-signature-verify-to-server

Add SignatureVerify support to the server
This commit is contained in:
Peter Hamilton 2017-08-29 16:33:48 -04:00 committed by GitHub
commit f565fbf93a
2 changed files with 428 additions and 2 deletions

View File

@ -55,6 +55,7 @@ from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import signature_verify
from kmip.core import misc
@ -990,6 +991,8 @@ class KmipEngine(object):
return self._process_encrypt(payload)
elif operation == enums.Operation.DECRYPT:
return self._process_decrypt(payload)
elif operation == enums.Operation.SIGNATURE_VERIFY:
return self._process_signature_verify(payload)
elif operation == enums.Operation.MAC:
return self._process_mac(payload)
elif operation == enums.Operation.SIGN:
@ -1946,6 +1949,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.ENCRYPT),
contents.Operation(enums.Operation.DECRYPT),
contents.Operation(enums.Operation.SIGN),
contents.Operation(enums.Operation.SIGNATURE_VERIFY),
contents.Operation(enums.Operation.MAC)
])
@ -2117,6 +2121,74 @@ class KmipEngine(object):
)
return response_payload
@_kmip_version_supported('1.2')
def _process_signature_verify(self, payload):
self._logger.info("Processing operation: Signature Verify")
unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier
# The KMIP spec does not indicate that the SignatureVerify operation
# should have it's own operation policy entry. Rather, the
# cryptographic usage mask should be used to determine if the object
# can be used to verify signatures (see below).
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
parameters = payload.cryptographic_parameters
if parameters is None:
# TODO (peter-hamilton): Pull the cryptographic parameters from
# the attributes associated with the signing key.
raise exceptions.InvalidField(
"The cryptographic parameters must be specified."
)
# TODO (peter-hamilton): Check the usage limitations for the key to
# confirm that it can be used for this operation.
if managed_object._object_type != enums.ObjectType.PUBLIC_KEY:
raise exceptions.PermissionDenied(
"The requested signing key is not a public key. A public key "
"must be specified."
)
if managed_object.state != enums.State.ACTIVE:
raise exceptions.PermissionDenied(
"The signing key must be in the Active state to be used for "
"signature verification."
)
masks = managed_object.cryptographic_usage_masks
if enums.CryptographicUsageMask.VERIFY not in masks:
raise exceptions.PermissionDenied(
"The Verify bit must be set in the signing key's "
"cryptographic usage mask."
)
result = self._cryptography_engine.verify_signature(
signing_key=managed_object.value,
message=payload.data,
signature=payload.signature_data,
padding_method=parameters.padding_method,
signing_algorithm=parameters.cryptographic_algorithm,
hashing_algorithm=parameters.hashing_algorithm,
digital_signature_algorithm=parameters.digital_signature_algorithm
)
if result:
validity = enums.ValidityIndicator.VALID
else:
validity = enums.ValidityIndicator.INVALID
response_payload = signature_verify.SignatureVerifyResponsePayload(
unique_identifier=unique_identifier,
validity_indicator=validity
)
return response_payload
@_kmip_version_supported('1.2')
def _process_mac(self, payload):
self._logger.info("Processing operation: MAC")

View File

@ -56,6 +56,7 @@ from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import signature_verify
from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes
@ -902,6 +903,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_discover_versions = mock.MagicMock()
e._process_encrypt = mock.MagicMock()
e._process_decrypt = mock.MagicMock()
e._process_signature_verify = mock.MagicMock()
e._process_mac = mock.MagicMock()
e._process_sign = mock.MagicMock()
@ -920,6 +922,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_operation(enums.Operation.ENCRYPT, None)
e._process_operation(enums.Operation.DECRYPT, None)
e._process_operation(enums.Operation.SIGN, None)
e._process_operation(enums.Operation.SIGNATURE_VERIFY, None)
e._process_operation(enums.Operation.MAC, None)
e._process_create.assert_called_with(None)
@ -936,6 +939,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_discover_versions.assert_called_with(None)
e._process_encrypt.assert_called_with(None)
e._process_decrypt.assert_called_with(None)
e._process_signature_verify.assert_called_with(None)
e._process_mac.assert_called_with(None)
def test_unsupported_operation(self):
@ -6520,7 +6524,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(16, len(result.operations))
self.assertEqual(17, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -6582,9 +6586,13 @@ class TestKmipEngine(testtools.TestCase):
result.operations[14].value
)
self.assertEqual(
enums.Operation.MAC,
enums.Operation.SIGNATURE_VERIFY,
result.operations[15].value
)
self.assertEqual(
enums.Operation.MAC,
result.operations[16].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -7294,6 +7302,352 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_signature_verify(self):
"""
Test that a SignatureVerify request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN,
enums.CryptographicUsageMask.VERIFY
]
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
# Test a valid signature
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b'
b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1'
b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72'
b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9'
b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6'
b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28'
b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33'
b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca'
b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24'
b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2'
b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4'
),
signature_data=(
b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b'
b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a'
b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9'
b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee'
b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8'
b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b'
b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f'
b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b'
b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\xf8'
)
)
response_payload = e._process_signature_verify(payload)
e._logger.info.assert_any_call(
"Processing operation: Signature Verify"
)
self.assertEqual(
unique_identifier,
response_payload.unique_identifier
)
self.assertEqual(
enums.ValidityIndicator.VALID,
response_payload.validity_indicator
)
# Test an invalid signature
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b'
b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1'
b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72'
b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9'
b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6'
b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28'
b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33'
b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca'
b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24'
b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2'
b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4'
),
signature_data=(
b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b'
b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a'
b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9'
b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee'
b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8'
b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b'
b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f'
b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b'
b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\x00'
)
)
response_payload = e._process_signature_verify(payload)
self.assertEqual(
unique_identifier,
response_payload.unique_identifier
)
self.assertEqual(
enums.ValidityIndicator.INVALID,
response_payload.validity_indicator
)
def test_signature_verify_no_cryptographic_parameters(self):
"""
Test that the right error is thrown when cryptographic parameters
are not provided with a SignatureVerify request.
Note: once the cryptographic parameters can be obtained from the
encryption key's attributes, this test should be updated to
reflect that.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN,
enums.CryptographicUsageMask.VERIFY
]
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
data=b'',
signature_data=b''
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic parameters must be specified.",
e._process_signature_verify,
*args
)
def test_signature_verify_invalid_signing_key(self):
"""
Test that the right error is thrown when an invalid signing key
is specified with a SignatureVerify request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.OpaqueObject(
b'\x01\x02\x03\x04',
enums.OpaqueDataType.NONE
)
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=b'',
signature_data=b''
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The requested signing key is not a public key. A public key must "
"be specified.",
e._process_signature_verify,
*args
)
def test_signature_verify_inactive_signing_key(self):
"""
Test that the right error is thrown when an inactive signing key
is specified with a SignatureVerify request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN,
enums.CryptographicUsageMask.VERIFY
]
)
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=b'',
signature_data=b''
)
args = (payload,)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The signing key must be in the Active state to be used for "
"signature verification.",
e._process_signature_verify,
*args
)
def test_signature_verify_non_verification_key(self):
"""
Test that the right error is thrown when a non-verification key
is specified with a SignatureVerify request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN
]
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=b'',
signature_data=b''
)
args = (payload,)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The Verify bit must be set in the signing key's cryptographic "
"usage mask.",
e._process_signature_verify,
*args
)
def test_mac(self):
"""
Test that a MAC request can be processed correctly.