Add SignatureVerify support to the server

This change adds the SignatureVerify operation to the server. Unit
tests covering the additions are included. The Query operation has
been updated to reflect this addition.
This commit is contained in:
Peter Hamilton 2017-08-25 14:28:35 -04:00
parent 32cc84acd3
commit fc7224e20d
2 changed files with 428 additions and 2 deletions

View File

@ -55,6 +55,7 @@ from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import signature_verify
from kmip.core import misc
@ -990,6 +991,8 @@ class KmipEngine(object):
return self._process_encrypt(payload)
elif operation == enums.Operation.DECRYPT:
return self._process_decrypt(payload)
elif operation == enums.Operation.SIGNATURE_VERIFY:
return self._process_signature_verify(payload)
elif operation == enums.Operation.MAC:
return self._process_mac(payload)
elif operation == enums.Operation.SIGN:
@ -1946,6 +1949,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.ENCRYPT),
contents.Operation(enums.Operation.DECRYPT),
contents.Operation(enums.Operation.SIGN),
contents.Operation(enums.Operation.SIGNATURE_VERIFY),
contents.Operation(enums.Operation.MAC)
])
@ -2117,6 +2121,74 @@ class KmipEngine(object):
)
return response_payload
@_kmip_version_supported('1.2')
def _process_signature_verify(self, payload):
self._logger.info("Processing operation: Signature Verify")
unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier
# The KMIP spec does not indicate that the SignatureVerify operation
# should have it's own operation policy entry. Rather, the
# cryptographic usage mask should be used to determine if the object
# can be used to verify signatures (see below).
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
parameters = payload.cryptographic_parameters
if parameters is None:
# TODO (peter-hamilton): Pull the cryptographic parameters from
# the attributes associated with the signing key.
raise exceptions.InvalidField(
"The cryptographic parameters must be specified."
)
# TODO (peter-hamilton): Check the usage limitations for the key to
# confirm that it can be used for this operation.
if managed_object._object_type != enums.ObjectType.PUBLIC_KEY:
raise exceptions.PermissionDenied(
"The requested signing key is not a public key. A public key "
"must be specified."
)
if managed_object.state != enums.State.ACTIVE:
raise exceptions.PermissionDenied(
"The signing key must be in the Active state to be used for "
"signature verification."
)
masks = managed_object.cryptographic_usage_masks
if enums.CryptographicUsageMask.VERIFY not in masks:
raise exceptions.PermissionDenied(
"The Verify bit must be set in the signing key's "
"cryptographic usage mask."
)
result = self._cryptography_engine.verify_signature(
signing_key=managed_object.value,
message=payload.data,
signature=payload.signature_data,
padding_method=parameters.padding_method,
signing_algorithm=parameters.cryptographic_algorithm,
hashing_algorithm=parameters.hashing_algorithm,
digital_signature_algorithm=parameters.digital_signature_algorithm
)
if result:
validity = enums.ValidityIndicator.VALID
else:
validity = enums.ValidityIndicator.INVALID
response_payload = signature_verify.SignatureVerifyResponsePayload(
unique_identifier=unique_identifier,
validity_indicator=validity
)
return response_payload
@_kmip_version_supported('1.2')
def _process_mac(self, payload):
self._logger.info("Processing operation: MAC")

View File

@ -56,6 +56,7 @@ from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import signature_verify
from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes
@ -902,6 +903,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_discover_versions = mock.MagicMock()
e._process_encrypt = mock.MagicMock()
e._process_decrypt = mock.MagicMock()
e._process_signature_verify = mock.MagicMock()
e._process_mac = mock.MagicMock()
e._process_sign = mock.MagicMock()
@ -920,6 +922,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_operation(enums.Operation.ENCRYPT, None)
e._process_operation(enums.Operation.DECRYPT, None)
e._process_operation(enums.Operation.SIGN, None)
e._process_operation(enums.Operation.SIGNATURE_VERIFY, None)
e._process_operation(enums.Operation.MAC, None)
e._process_create.assert_called_with(None)
@ -936,6 +939,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_discover_versions.assert_called_with(None)
e._process_encrypt.assert_called_with(None)
e._process_decrypt.assert_called_with(None)
e._process_signature_verify.assert_called_with(None)
e._process_mac.assert_called_with(None)
def test_unsupported_operation(self):
@ -6520,7 +6524,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(16, len(result.operations))
self.assertEqual(17, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -6582,9 +6586,13 @@ class TestKmipEngine(testtools.TestCase):
result.operations[14].value
)
self.assertEqual(
enums.Operation.MAC,
enums.Operation.SIGNATURE_VERIFY,
result.operations[15].value
)
self.assertEqual(
enums.Operation.MAC,
result.operations[16].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -7294,6 +7302,352 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_signature_verify(self):
"""
Test that a SignatureVerify request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN,
enums.CryptographicUsageMask.VERIFY
]
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
# Test a valid signature
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b'
b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1'
b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72'
b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9'
b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6'
b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28'
b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33'
b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca'
b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24'
b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2'
b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4'
),
signature_data=(
b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b'
b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a'
b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9'
b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee'
b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8'
b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b'
b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f'
b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b'
b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\xf8'
)
)
response_payload = e._process_signature_verify(payload)
e._logger.info.assert_any_call(
"Processing operation: Signature Verify"
)
self.assertEqual(
unique_identifier,
response_payload.unique_identifier
)
self.assertEqual(
enums.ValidityIndicator.VALID,
response_payload.validity_indicator
)
# Test an invalid signature
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\xe1\xc0\xf9\x8d\x53\xf8\xf8\xb1\x41\x90\x57\xd5\xb9\xb1\x0b'
b'\x07\xfe\xea\xec\x32\xc0\x46\x3a\x4d\x68\x38\x2f\x53\x1b\xa1'
b'\xd6\xcf\xe4\xed\x38\xa2\x69\x4a\x34\xb9\xc8\x05\xad\xf0\x72'
b'\xff\xbc\xeb\xe2\x1d\x8d\x4b\x5c\x0e\x8c\x33\x45\x2d\xd8\xf9'
b'\xc9\xbf\x45\xd1\xe6\x33\x75\x11\x33\x58\x82\x29\xd2\x93\xc6'
b'\x49\x6b\x7c\x98\x3c\x2c\x72\xbd\x21\xd3\x39\x27\x2d\x78\x28'
b'\xb0\xd0\x9d\x01\x0b\xba\xd3\x18\xd9\x98\xf7\x04\x79\x67\x33'
b'\x8a\xce\xfd\x01\xe8\x74\xac\xe5\xf8\x6d\x2a\x60\xf3\xb3\xca'
b'\xe1\x3f\xc5\xc6\x65\x08\xcf\xb7\x23\x78\xfd\xd6\xc8\xde\x24'
b'\x97\x65\x10\x3c\xe8\xfe\x7c\xd3\x3a\xd0\xef\x16\x86\xfe\xb2'
b'\x5e\x6a\x35\xfb\x64\xe0\x96\xa4'
),
signature_data=(
b'\x01\xf6\xe5\xff\x04\x22\x1a\xdc\x6c\x2f\x22\xa7\x61\x05\x3b'
b'\xc4\x73\x27\x65\xdd\xdc\x3f\x76\x56\xd0\xd1\x22\xad\x3b\x8a'
b'\x4e\x4f\x8f\xe5\x5b\xd0\xc0\x9e\xb1\x07\x80\xa1\x39\xcd\xa9'
b'\x32\x34\xef\x98\x8f\xe2\x50\x20\x1e\xb2\xfe\xbd\x08\xb6\xee'
b'\x85\xd7\x0d\x16\x05\xa5\xba\x56\x85\x21\x52\x99\xf0\x74\xc8'
b'\x0b\xaf\xf8\x1e\x2c\xa3\x10\x7d\xa9\x17\x5c\x2f\x5a\x7c\x6b'
b'\x60\xea\xa2\x8a\x75\x8c\xa9\x34\xf2\xff\x16\x98\x8f\xe8\x5f'
b'\xf8\x41\x57\xd9\x51\x44\x8a\x85\xec\x1e\xd1\x71\xf9\xef\x8b'
b'\xb8\xa1\x0c\xfa\x14\x7b\x7e\x00'
)
)
response_payload = e._process_signature_verify(payload)
self.assertEqual(
unique_identifier,
response_payload.unique_identifier
)
self.assertEqual(
enums.ValidityIndicator.INVALID,
response_payload.validity_indicator
)
def test_signature_verify_no_cryptographic_parameters(self):
"""
Test that the right error is thrown when cryptographic parameters
are not provided with a SignatureVerify request.
Note: once the cryptographic parameters can be obtained from the
encryption key's attributes, this test should be updated to
reflect that.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN,
enums.CryptographicUsageMask.VERIFY
]
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
data=b'',
signature_data=b''
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic parameters must be specified.",
e._process_signature_verify,
*args
)
def test_signature_verify_invalid_signing_key(self):
"""
Test that the right error is thrown when an invalid signing key
is specified with a SignatureVerify request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.OpaqueObject(
b'\x01\x02\x03\x04',
enums.OpaqueDataType.NONE
)
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=b'',
signature_data=b''
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The requested signing key is not a public key. A public key must "
"be specified.",
e._process_signature_verify,
*args
)
def test_signature_verify_inactive_signing_key(self):
"""
Test that the right error is thrown when an inactive signing key
is specified with a SignatureVerify request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN,
enums.CryptographicUsageMask.VERIFY
]
)
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=b'',
signature_data=b''
)
args = (payload,)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The signing key must be in the Active state to be used for "
"signature verification.",
e._process_signature_verify,
*args
)
def test_signature_verify_non_verification_key(self):
"""
Test that the right error is thrown when a non-verification key
is specified with a SignatureVerify request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PublicKey(
enums.CryptographicAlgorithm.RSA,
1120,
(
b'\x30\x81\x89\x02\x81\x81\x00\xac\x13\xd9\xfd\xae\x7b\x73\x35'
b'\xb6\x9c\xd9\x85\x67\xe9\x64\x7d\x99\xbf\x37\x3a\x9e\x05\xce'
b'\x34\x35\xd6\x64\x65\xf3\x28\xb7\xf7\x33\x4b\x79\x2a\xee\x7e'
b'\xfa\x04\x4e\xbc\x4c\x7a\x30\xb2\x1a\x5d\x7a\x89\xcd\xb3\xa3'
b'\x0d\xfc\xd9\xfe\xe9\x99\x5e\x09\x41\x5e\xdc\x0b\xf9\xe5\xb4'
b'\xc3\xf7\x4f\xf5\x3f\xb4\xd2\x94\x41\xbf\x1b\x7e\xd6\xcb\xdd'
b'\x4a\x47\xf9\x25\x22\x69\xe1\x64\x6f\x6c\x1a\xee\x05\x14\xe9'
b'\x3f\x6c\xb9\xdf\x71\xd0\x6c\x06\x0a\x21\x04\xb4\x7b\x72\x60'
b'\xac\x37\xc1\x06\x86\x1d\xc7\x8c\xa5\xa2\x5f\xaa\x9c\xb2\xe3'
b'\x02\x03\x01\x00\x01'
),
masks=[
enums.CryptographicUsageMask.SIGN
]
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=b'',
signature_data=b''
)
args = (payload,)
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The Verify bit must be set in the signing key's cryptographic "
"usage mask.",
e._process_signature_verify,
*args
)
def test_mac(self):
"""
Test that a MAC request can be processed correctly.