This change adds Sign operation support to the server.

This commit is contained in:
Dane Fichter 2017-08-28 08:27:53 -04:00
parent 10121b1d63
commit d48b590c35
2 changed files with 389 additions and 2 deletions

View File

@ -54,6 +54,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import sign
from kmip.core import misc
@ -991,6 +992,8 @@ class KmipEngine(object):
return self._process_decrypt(payload)
elif operation == enums.Operation.MAC:
return self._process_mac(payload)
elif operation == enums.Operation.SIGN:
return self._process_sign(payload)
else:
raise exceptions.OperationNotSupported(
"{0} operation is not supported by the server.".format(
@ -1942,6 +1945,7 @@ class KmipEngine(object):
operations.extend([
contents.Operation(enums.Operation.ENCRYPT),
contents.Operation(enums.Operation.DECRYPT),
contents.Operation(enums.Operation.SIGN),
contents.Operation(enums.Operation.MAC)
])
@ -2185,3 +2189,59 @@ class KmipEngine(object):
)
return response_payload
@_kmip_version_supported('1.2')
def _process_sign(self, payload):
self._logger.info("Processing operation: Sign")
unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
parameters = payload.cryptographic_parameters
if parameters is None:
# TODO (dane-fichter): Pull the cryptographic parameters from
# the managed object with lowest attribute index.
raise exceptions.InvalidField(
"The cryptographic parameters must be specified."
)
if managed_object._object_type != enums.ObjectType.PRIVATE_KEY:
raise exceptions.PermissionDenied(
"The requested signing key is not a private key. A private "
"key must be specified."
)
if managed_object.state != enums.State.ACTIVE:
raise exceptions.PermissionDenied(
"The signing key must be in the Active state to be used for "
"signing."
)
masks = managed_object.cryptographic_usage_masks
if enums.CryptographicUsageMask.SIGN not in masks:
raise exceptions.PermissionDenied(
"The Sign bit must be set in the signing key's "
"cryptographic usage mask."
)
result = self._cryptography_engine.sign(
digital_signature_algorithm=parameters.digital_signature_algorithm,
crypto_alg=parameters.cryptographic_algorithm,
hash_algorithm=parameters.hashing_algorithm,
padding=parameters.padding_method,
signing_key=managed_object.value,
data=payload.data
)
response_payload = sign.SignResponsePayload(
unique_identifier=unique_identifier,
signature_data=result
)
return response_payload

View File

@ -55,6 +55,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import sign
from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes
@ -902,6 +903,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_encrypt = mock.MagicMock()
e._process_decrypt = mock.MagicMock()
e._process_mac = mock.MagicMock()
e._process_sign = mock.MagicMock()
e._process_operation(enums.Operation.CREATE, None)
e._process_operation(enums.Operation.CREATE_KEY_PAIR, None)
@ -917,6 +919,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
e._process_operation(enums.Operation.ENCRYPT, None)
e._process_operation(enums.Operation.DECRYPT, None)
e._process_operation(enums.Operation.SIGN, None)
e._process_operation(enums.Operation.MAC, None)
e._process_create.assert_called_with(None)
@ -6517,7 +6520,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(15, len(result.operations))
self.assertEqual(16, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -6575,9 +6578,13 @@ class TestKmipEngine(testtools.TestCase):
result.operations[13].value
)
self.assertEqual(
enums.Operation.MAC,
enums.Operation.SIGN,
result.operations[14].value
)
self.assertEqual(
enums.Operation.MAC,
result.operations[15].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -8212,3 +8219,323 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
e._data_store_session_factory()
def test_sign(self):
"""
Test that a Sign request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PrivateKey(
enums.CryptographicAlgorithm.RSA,
1024,
(b'3082025e02010002818100aebac1b9a174315d27cc3c201e215789'
b'4372d6450d4cf80ce0ebcf5169519b9e8550036f4abe0fe4f94fbf'
b'9cca606f39743365499611ba3f25a9a47158ba05214b655f4258a4'
b'c29516becaa583f2d26650696ad6fc03d5b47d3aba9c5479fdb047'
b'7d29513399cb19283ccdc28dbb23b7c7eee4b35dc940daca0055dc'
b'd28f503b02030100010281810092890942d6c68d47a4c2c181e602'
b'ec58af7a357c7fa5173a25bf5d84d7209bb41bf5788bf350e61f8f'
b'7e7421d80f7bf7e11de14a0f531ab12eb2d0b84642eb5d181170c2'
b'c58aabbd6754842fafee57fef2f545d09fdc664902e55baced5a3c'
b'6d26f3465859d33a33a555537daf2263aaef28354c8b53513145a7'
b'e228824dabb1024100d3aa237e8942b93d56a681254c27be1f4a49'
b'6ca4a87fc0604b0cff8f980e742d2bbb91b88a247b6ebbed01458c'
b'4afdb68c0f8c6d4a37e028c5fcb3a6a39ca64f024100d354168c61'
b'9c836e8597fef50193a6f42607952a1c87ebae91db5043b8855072'
b'b4e92af5dcedb2148773dfbd217bafc8dc9da8ae8e757e7248c1e5'
b'13a144685502410090fda214c2b7b726825dca679f3436333ef2ee'
b'fe180272e84360e30b1d11019a13b4080d0e6c1135787bd07c30af'
b'09feeb10979421dc06ac477b6420c940bc570240164de8b7565213'
b'9925a67e3553be46bfbc07ced98bfb5887ab434f7c664c43ca6787'
b'b88e0c8c55e04ecf8f0cc22cf0c7ad69427571f9baa7cb4013b277'
b'b1e5a5024100cae150f5fa559b2e2c39444e0f5c651034092ac97b'
b'ac10d528dd15dfda254cb06bef41e39881f7e7496910b4655659dc'
b'842d30b9ae2759f3c2cd41c79a3684ec'),
enums.KeyFormatType.RAW,
masks=[enums.CryptographicUsageMask.SIGN],
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e.data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = sign.SignRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
)
response_payload = e._process_sign(payload)
e._logger.info.assert_any_call(
"Processing operation: Sign"
)
self.assertEqual(
unique_identifier,
response_payload.unique_identifier
)
def test_sign_no_cryptographic_parameters(self):
"""
Test that the right error is thrown when cryptographic parameters
are not provided with a SignatureVerify request.
TODO (dane-fichter): update this test once cryptographic
parameters can be fetched using the key's attributes.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PrivateKey(
enums.CryptographicAlgorithm.RSA,
1024,
(b'3082025e02010002818100aebac1b9a174315d27cc3c201e215789'
b'4372d6450d4cf80ce0ebcf5169519b9e8550036f4abe0fe4f94fbf'
b'9cca606f39743365499611ba3f25a9a47158ba05214b655f4258a4'
b'c29516becaa583f2d26650696ad6fc03d5b47d3aba9c5479fdb047'
b'7d29513399cb19283ccdc28dbb23b7c7eee4b35dc940daca0055dc'
b'd28f503b02030100010281810092890942d6c68d47a4c2c181e602'
b'ec58af7a357c7fa5173a25bf5d84d7209bb41bf5788bf350e61f8f'
b'7e7421d80f7bf7e11de14a0f531ab12eb2d0b84642eb5d181170c2'
b'c58aabbd6754842fafee57fef2f545d09fdc664902e55baced5a3c'
b'6d26f3465859d33a33a555537daf2263aaef28354c8b53513145a7'
b'e228824dabb1024100d3aa237e8942b93d56a681254c27be1f4a49'
b'6ca4a87fc0604b0cff8f980e742d2bbb91b88a247b6ebbed01458c'
b'4afdb68c0f8c6d4a37e028c5fcb3a6a39ca64f024100d354168c61'
b'9c836e8597fef50193a6f42607952a1c87ebae91db5043b8855072'
b'b4e92af5dcedb2148773dfbd217bafc8dc9da8ae8e757e7248c1e5'
b'13a144685502410090fda214c2b7b726825dca679f3436333ef2ee'
b'fe180272e84360e30b1d11019a13b4080d0e6c1135787bd07c30af'
b'09feeb10979421dc06ac477b6420c940bc570240164de8b7565213'
b'9925a67e3553be46bfbc07ced98bfb5887ab434f7c664c43ca6787'
b'b88e0c8c55e04ecf8f0cc22cf0c7ad69427571f9baa7cb4013b277'
b'b1e5a5024100cae150f5fa559b2e2c39444e0f5c651034092ac97b'
b'ac10d528dd15dfda254cb06bef41e39881f7e7496910b4655659dc'
b'842d30b9ae2759f3c2cd41c79a3684ec'),
enums.KeyFormatType.RAW,
masks=[enums.CryptographicUsageMask.SIGN],
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e.data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = sign.SignRequestPayload(
unique_identifier=unique_identifier,
data=b'',
)
args = (payload,)
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic parameters must be specified.",
e._process_sign,
*args
)
def test_sign_invalid_signing_key(self):
"""
Test that the right error is thrown when an invalid signing key
is specified with a Sign request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.OpaqueObject(
b'\x01\x02\x03\x04',
enums.OpaqueDataType.NONE
)
e._data_session.add(signing_key)
e._data_session.commit()
e.data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = sign.SignRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The requested signing key is not a private key. "
"A private key must be specified.",
e._process_sign,
*args
)
def test_sign_inactive_signing_key(self):
"""
Test that the right error is thrown when an inactive signing key
is specified in a Sign request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PrivateKey(
enums.CryptographicAlgorithm.RSA,
1024,
(b'3082025e02010002818100aebac1b9a174315d27cc3c201e215789'
b'4372d6450d4cf80ce0ebcf5169519b9e8550036f4abe0fe4f94fbf'
b'9cca606f39743365499611ba3f25a9a47158ba05214b655f4258a4'
b'c29516becaa583f2d26650696ad6fc03d5b47d3aba9c5479fdb047'
b'7d29513399cb19283ccdc28dbb23b7c7eee4b35dc940daca0055dc'
b'd28f503b02030100010281810092890942d6c68d47a4c2c181e602'
b'ec58af7a357c7fa5173a25bf5d84d7209bb41bf5788bf350e61f8f'
b'7e7421d80f7bf7e11de14a0f531ab12eb2d0b84642eb5d181170c2'
b'c58aabbd6754842fafee57fef2f545d09fdc664902e55baced5a3c'
b'6d26f3465859d33a33a555537daf2263aaef28354c8b53513145a7'
b'e228824dabb1024100d3aa237e8942b93d56a681254c27be1f4a49'
b'6ca4a87fc0604b0cff8f980e742d2bbb91b88a247b6ebbed01458c'
b'4afdb68c0f8c6d4a37e028c5fcb3a6a39ca64f024100d354168c61'
b'9c836e8597fef50193a6f42607952a1c87ebae91db5043b8855072'
b'b4e92af5dcedb2148773dfbd217bafc8dc9da8ae8e757e7248c1e5'
b'13a144685502410090fda214c2b7b726825dca679f3436333ef2ee'
b'fe180272e84360e30b1d11019a13b4080d0e6c1135787bd07c30af'
b'09feeb10979421dc06ac477b6420c940bc570240164de8b7565213'
b'9925a67e3553be46bfbc07ced98bfb5887ab434f7c664c43ca6787'
b'b88e0c8c55e04ecf8f0cc22cf0c7ad69427571f9baa7cb4013b277'
b'b1e5a5024100cae150f5fa559b2e2c39444e0f5c651034092ac97b'
b'ac10d528dd15dfda254cb06bef41e39881f7e7496910b4655659dc'
b'842d30b9ae2759f3c2cd41c79a3684ec'),
enums.KeyFormatType.RAW,
masks=[enums.CryptographicUsageMask.SIGN],
)
e._data_session.add(signing_key)
e._data_session.commit()
e.data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = sign.SignRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The signing key must be in the Active state to be "
"used for signing.",
e._process_sign,
*args
)
def test_sign_non_signing_key(self):
"""
Test that the right error is thrown when a non-signing key
is specified in a Sign request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
signing_key = pie_objects.PrivateKey(
enums.CryptographicAlgorithm.RSA,
1024,
(b'3082025e02010002818100aebac1b9a174315d27cc3c201e215789'
b'4372d6450d4cf80ce0ebcf5169519b9e8550036f4abe0fe4f94fbf'
b'9cca606f39743365499611ba3f25a9a47158ba05214b655f4258a4'
b'c29516becaa583f2d26650696ad6fc03d5b47d3aba9c5479fdb047'
b'7d29513399cb19283ccdc28dbb23b7c7eee4b35dc940daca0055dc'
b'd28f503b02030100010281810092890942d6c68d47a4c2c181e602'
b'ec58af7a357c7fa5173a25bf5d84d7209bb41bf5788bf350e61f8f'
b'7e7421d80f7bf7e11de14a0f531ab12eb2d0b84642eb5d181170c2'
b'c58aabbd6754842fafee57fef2f545d09fdc664902e55baced5a3c'
b'6d26f3465859d33a33a555537daf2263aaef28354c8b53513145a7'
b'e228824dabb1024100d3aa237e8942b93d56a681254c27be1f4a49'
b'6ca4a87fc0604b0cff8f980e742d2bbb91b88a247b6ebbed01458c'
b'4afdb68c0f8c6d4a37e028c5fcb3a6a39ca64f024100d354168c61'
b'9c836e8597fef50193a6f42607952a1c87ebae91db5043b8855072'
b'b4e92af5dcedb2148773dfbd217bafc8dc9da8ae8e757e7248c1e5'
b'13a144685502410090fda214c2b7b726825dca679f3436333ef2ee'
b'fe180272e84360e30b1d11019a13b4080d0e6c1135787bd07c30af'
b'09feeb10979421dc06ac477b6420c940bc570240164de8b7565213'
b'9925a67e3553be46bfbc07ced98bfb5887ab434f7c664c43ca6787'
b'b88e0c8c55e04ecf8f0cc22cf0c7ad69427571f9baa7cb4013b277'
b'b1e5a5024100cae150f5fa559b2e2c39444e0f5c651034092ac97b'
b'ac10d528dd15dfda254cb06bef41e39881f7e7496910b4655659dc'
b'842d30b9ae2759f3c2cd41c79a3684ec'),
enums.KeyFormatType.RAW,
masks=[enums.CryptographicUsageMask.VERIFY],
)
signing_key.state = enums.State.ACTIVE
e._data_session.add(signing_key)
e._data_session.commit()
e.data_session = e._data_store_session_factory()
unique_identifier = str(signing_key.unique_identifier)
payload = sign.SignRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=attributes.CryptographicParameters(
padding_method=enums.PaddingMethod.PSS,
digital_signature_algorithm=enums.DigitalSignatureAlgorithm.
SHA1_WITH_RSA_ENCRYPTION
),
data=(
b'\x01\x02\x03\x04\x05\x06\x07\x08'
b'\x09\x10\x11\x12\x13\x14\x15\x16'
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.PermissionDenied,
"The Sign bit must be set in the signing key's cryptographic "
"usage mask.",
e._process_sign,
*args
)