Refactor access control enforcement in the server

This change restructures how access controls are enforced for
different server operations, pulling common code into a shared
method for reuse. The server unit tests have been updated to
reflect this change.
This commit is contained in:
Peter Hamilton 2017-01-09 10:27:51 -05:00
parent 04e3301694
commit 57c703c52f
2 changed files with 71 additions and 91 deletions

View File

@ -901,6 +901,33 @@ class KmipEngine(object):
else:
return False
def _get_object_with_access_controls(
self,
uid,
operation
):
object_type = self._get_object_type(uid)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == uid
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object.object_type,
operation
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(uid)
)
return managed_object
def _process_operation(self, operation, payload):
if operation == enums.Operation.CREATE:
return self._process_create(payload)
@ -1292,25 +1319,10 @@ class KmipEngine(object):
# TODO (peterhamilton) Process key wrapping information
# 1. Error check wrapping keys for accessibility and usability
object_type = self._get_object_type(unique_identifier)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
if key_format_type:
if not hasattr(managed_object, 'key_format_type'):
@ -1355,26 +1367,10 @@ class KmipEngine(object):
else:
unique_identifier = self._id_placeholder
object_type = self._get_object_type(unique_identifier)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET_ATTRIBUTES
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
attrs = self._get_attributes_from_managed_object(
managed_object,
payload.attribute_names
@ -1396,26 +1392,10 @@ class KmipEngine(object):
else:
unique_identifier = self._id_placeholder
object_type = self._get_object_type(unique_identifier)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
enums.Operation.GET_ATTRIBUTES
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET_ATTRIBUTE_LIST
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
object_attributes = self._get_attributes_from_managed_object(
managed_object,
list()
@ -1441,26 +1421,10 @@ class KmipEngine(object):
else:
unique_identifier = self._id_placeholder
object_type = self._get_object_type(unique_identifier)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.ACTIVATE
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
object_type = managed_object._object_type
if not hasattr(managed_object, 'state'):
raise exceptions.IllegalOperation(
@ -1494,30 +1458,15 @@ class KmipEngine(object):
else:
unique_identifier = self._id_placeholder
object_type = self._get_object_type(unique_identifier)
self._get_object_with_access_controls(
unique_identifier,
enums.Operation.DESTROY
)
# TODO (peterhamilton) Process attributes to see if destroy possible
# 1. Check object state. If invalid, error out.
# 2. Check object deactivation date. If invalid, error out.
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
enums.Operation.DESTROY
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
self._logger.info(
"Destroying an object with ID: {0}".format(unique_identifier)
)

View File

@ -2307,6 +2307,37 @@ class TestKmipEngine(testtools.TestCase):
)
)
def test_get_object_with_access_controls(self):
"""
Test that an unallowed object access request is handled correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
# Test by specifying the ID of the object to retrieve and the
# operation context.
args = [id_a, enums.Operation.GET]
six.assertRaisesRegex(
self,
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._get_object_with_access_controls,
*args
)
def test_create(self):
"""
Test that a Create request can be processed correctly.