Add MAC operation support for server

This commit is contained in:
Hao Shen 2017-02-01 15:30:05 -08:00
parent 84fe76e15d
commit ac1206082c
2 changed files with 210 additions and 0 deletions

View File

@ -30,6 +30,8 @@ from kmip.core import attributes
from kmip.core import enums from kmip.core import enums
from kmip.core import exceptions from kmip.core import exceptions
from kmip.core.objects import MACData
from kmip.core.factories import attributes as attribute_factory from kmip.core.factories import attributes as attribute_factory
from kmip.core.factories import secrets from kmip.core.factories import secrets
@ -46,6 +48,7 @@ from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads import query from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.core import misc from kmip.core import misc
@ -949,6 +952,8 @@ class KmipEngine(object):
return self._process_query(payload) return self._process_query(payload)
elif operation == enums.Operation.DISCOVER_VERSIONS: elif operation == enums.Operation.DISCOVER_VERSIONS:
return self._process_discover_versions(payload) return self._process_discover_versions(payload)
elif operation == enums.Operation.MAC:
return self._process_mac(payload)
else: else:
raise exceptions.OperationNotSupported( raise exceptions.OperationNotSupported(
"{0} operation is not supported by the server.".format( "{0} operation is not supported by the server.".format(
@ -1556,3 +1561,64 @@ class KmipEngine(object):
) )
return response_payload return response_payload
@_kmip_version_supported('1.2')
def _process_mac(self, payload):
self._logger.info("Processing operation: MAC")
unique_identifier = self._id_placeholder
if payload.unique_identifier:
unique_identifier = payload.unique_identifier.value
# TODO: Currently use the GET operation policy here to ensure only a
# user with read access to the secret can use it to compute a MAC
# value. However, the MAC operation's access should be controlled by
# cryptographic usage mask instead of operation policy.
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
algorithm = None
if (payload.cryptographic_parameters and
payload.cryptographic_parameters.cryptographic_algorithm):
algorithm = \
payload.cryptographic_parameters.cryptographic_algorithm.value
elif (isinstance(managed_object, objects.Key) and
managed_object.cryptographic_algorithm):
algorithm = managed_object.cryptographic_algorithm
else:
raise exceptions.InvalidField(
"The cryptographic algorithm must be specified "
"for the MAC operation"
)
key = None
if managed_object.value:
key = managed_object.value
else:
raise exceptions.InvalidField(
"A secret key value must be specified "
"for the MAC operation"
)
data = None
if payload.data:
data = payload.data.value
else:
raise exceptions.InvalidField(
"No data to be MACed"
)
result = self._cryptography_engine.mac(
algorithm,
key,
data
)
response_payload = mac.MACResponsePayload(
unique_identifier=attributes.UniqueIdentifier(unique_identifier),
mac_data=MACData(result)
)
return response_payload

View File

@ -48,6 +48,7 @@ from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import query from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac
from kmip.pie import objects as pie_objects from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes from kmip.pie import sqltypes
@ -4537,6 +4538,149 @@ class TestKmipEngine(testtools.TestCase):
) )
self.assertEqual([], result.protocol_versions) self.assertEqual([], result.protocol_versions)
def test_mac(self):
"""
Test that a MAC request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00')
data = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A'
b'\x0B\x0C\x0D\x0E\x0F')
algorithm_a = enums.CryptographicAlgorithm.AES
algorithm_b = enums.CryptographicAlgorithm.HMAC_SHA512
obj = pie_objects.SymmetricKey(algorithm_a, 128, key)
e._data_session.add(obj)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
uuid = str(obj.unique_identifier)
cryptographic_parameters = attributes.CryptographicParameters(
cryptographic_algorithm=attributes.
CryptographicAlgorithm(algorithm_b)
)
# Verify when cryptographic_parameters is specified in request
payload = mac.MACRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uuid),
cryptographic_parameters=cryptographic_parameters,
data=objects.Data(data)
)
response_payload = e._process_mac(payload)
e._logger.info.assert_any_call(
"Processing operation: MAC"
)
e._cryptography_engine.logger.info.assert_any_call(
"Generating a hash-based message authentication code using {0}".
format(algorithm_b.name)
)
e._cryptography_engine.logger.reset_mock()
self.assertEqual(str(uuid), response_payload.unique_identifier.value)
self.assertIsInstance(response_payload.mac_data, objects.MACData)
# Verify when cryptographic_parameters is not specified in request
payload = mac.MACRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uuid),
cryptographic_parameters=None,
data=objects.Data(data)
)
response_payload = e._process_mac(payload)
e._cryptography_engine.logger.info.assert_any_call(
"Generating a cipher-based message authentication code using {0}".
format(algorithm_a.name)
)
self.assertEqual(str(uuid), response_payload.unique_identifier.value)
self.assertIsInstance(response_payload.mac_data, objects.MACData)
def test_mac_with_missing_fields(self):
"""
Test that the right errors are generated when required fields
are missing.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00')
data = (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B'
b'\x0C\x0D\x0E\x0F')
algorithm = enums.CryptographicAlgorithm.AES
obj_no_key = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_no_algorithm = pie_objects.OpaqueObject(
key, enums.OpaqueDataType.NONE)
e._data_session.add(obj_no_key)
e._data_session.add(obj_no_algorithm)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
uuid_no_key = str(obj_no_key.unique_identifier)
uuid_no_algorithm = str(obj_no_algorithm.unique_identifier)
cryptographic_parameters = attributes.CryptographicParameters(
cryptographic_algorithm=attributes.
CryptographicAlgorithm(algorithm))
payload_no_key = mac.MACRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uuid_no_key),
cryptographic_parameters=cryptographic_parameters,
data=objects.Data(data)
)
args = (payload_no_key, )
regex = "A secret key value must be specified"
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_mac,
*args
)
payload_no_algorithm = mac.MACRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uuid_no_algorithm),
cryptographic_parameters=None,
data=objects.Data(data)
)
args = (payload_no_algorithm, )
regex = "The cryptographic algorithm must be specified"
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_mac,
*args
)
payload_no_data = mac.MACRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uuid_no_algorithm),
cryptographic_parameters=cryptographic_parameters,
data=None
)
args = (payload_no_data, )
regex = "No data to be MACed"
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_mac,
*args
)
def test_create_get_destroy(self): def test_create_get_destroy(self):
""" """
Test that a managed object can be created, retrieved, and destroyed Test that a managed object can be created, retrieved, and destroyed