mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-06-28 17:54:23 +02:00
Merge pull request #389 from OpenKMIP/feat/support-group-policies
Update the server to support group-based operation policies
This commit is contained in:
commit
b6e881cc64
@ -19,26 +19,13 @@ import six
|
|||||||
from kmip.core import enums
|
from kmip.core import enums
|
||||||
|
|
||||||
|
|
||||||
def read_policy_from_file(path):
|
def parse_policy(policy):
|
||||||
with open(path, 'r') as f:
|
result = {}
|
||||||
try:
|
|
||||||
policy_blob = json.loads(f.read())
|
|
||||||
except Exception as e:
|
|
||||||
raise ValueError(
|
|
||||||
"An error occurred while attempting to parse the JSON "
|
|
||||||
"file. {0}".format(e)
|
|
||||||
)
|
|
||||||
|
|
||||||
policies = dict()
|
for object_type, operation_policies in six.iteritems(policy):
|
||||||
|
processed_operation_policies = {}
|
||||||
for name, object_policies in six.iteritems(policy_blob):
|
|
||||||
processed_object_policies = dict()
|
|
||||||
|
|
||||||
for object_type, operation_policies in six.iteritems(object_policies):
|
|
||||||
processed_operation_policies = dict()
|
|
||||||
|
|
||||||
for operation, permission in six.iteritems(operation_policies):
|
for operation, permission in six.iteritems(operation_policies):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
enum_operation = enums.Operation[operation]
|
enum_operation = enums.Operation[operation]
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -56,9 +43,7 @@ def read_policy_from_file(path):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
processed_operation_policies.update([
|
processed_operation_policies[enum_operation] = enum_policy
|
||||||
(enum_operation, enum_policy)
|
|
||||||
])
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
enum_type = enums.ObjectType[object_type]
|
enum_type = enums.ObjectType[object_type]
|
||||||
@ -69,16 +54,65 @@ def read_policy_from_file(path):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
processed_object_policies.update([
|
result[enum_type] = processed_operation_policies
|
||||||
(enum_type, processed_operation_policies)
|
|
||||||
])
|
|
||||||
|
|
||||||
policies.update([(name, processed_object_policies)])
|
return result
|
||||||
|
|
||||||
return policies
|
|
||||||
|
def read_policy_from_file(path):
|
||||||
|
policy_blob = {}
|
||||||
|
|
||||||
|
with open(path, 'r') as f:
|
||||||
|
try:
|
||||||
|
policy_blob = json.loads(f.read())
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(
|
||||||
|
"Loading the policy file '{}' generated a JSON error: "
|
||||||
|
"{}".format(path, e)
|
||||||
|
)
|
||||||
|
|
||||||
|
policy_sections = {'groups', 'default'}
|
||||||
|
object_types = set([t.name for t in enums.ObjectType])
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
for name, object_policy in policy_blob.items():
|
||||||
|
if len(object_policy.keys()) == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Use subset checking to determine what type of policy we have
|
||||||
|
sections = set([s for s in six.iterkeys(object_policy)])
|
||||||
|
if sections <= policy_sections:
|
||||||
|
parsed_policies = dict()
|
||||||
|
|
||||||
|
default_policy = object_policy.get('default')
|
||||||
|
if default_policy:
|
||||||
|
parsed_policies['default'] = parse_policy(default_policy)
|
||||||
|
|
||||||
|
group_policies = object_policy.get('groups')
|
||||||
|
if group_policies:
|
||||||
|
parsed_group_policies = dict()
|
||||||
|
for group_name, group_policy in six.iteritems(group_policies):
|
||||||
|
parsed_group_policies[group_name] = parse_policy(
|
||||||
|
group_policy
|
||||||
|
)
|
||||||
|
parsed_policies['groups'] = parsed_group_policies
|
||||||
|
|
||||||
|
result[name] = parsed_policies
|
||||||
|
elif sections <= object_types:
|
||||||
|
policy = parse_policy(object_policy)
|
||||||
|
result[name] = {'default': policy}
|
||||||
|
else:
|
||||||
|
invalid_sections = sections - policy_sections - object_types
|
||||||
|
raise ValueError(
|
||||||
|
"Policy '{}' contains an invalid section named: "
|
||||||
|
"{}".format(name, invalid_sections.pop())
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
policies = {
|
policies = {
|
||||||
|
'default': {
|
||||||
'default': {
|
'default': {
|
||||||
enums.ObjectType.CERTIFICATE: {
|
enums.ObjectType.CERTIFICATE: {
|
||||||
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
|
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
|
||||||
@ -242,8 +276,10 @@ policies = {
|
|||||||
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
|
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
|
||||||
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
|
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
|
||||||
}
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
'public': {
|
'public': {
|
||||||
|
'default': {
|
||||||
enums.ObjectType.TEMPLATE: {
|
enums.ObjectType.TEMPLATE: {
|
||||||
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
|
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
|
||||||
enums.Operation.GET: enums.Policy.ALLOW_ALL,
|
enums.Operation.GET: enums.Policy.ALLOW_ALL,
|
||||||
@ -255,4 +291,5 @@ policies = {
|
|||||||
enums.Operation.DESTROY: enums.Policy.DISALLOW_ALL
|
enums.Operation.DESTROY: enums.Policy.DISALLOW_ALL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -127,7 +127,7 @@ class KmipEngine(object):
|
|||||||
self._operation_policies = copy.deepcopy(operation_policy.policies)
|
self._operation_policies = copy.deepcopy(operation_policy.policies)
|
||||||
self._load_operation_policies(policy_path)
|
self._load_operation_policies(policy_path)
|
||||||
|
|
||||||
self._client_identity = None
|
self._client_identity = [None, None]
|
||||||
|
|
||||||
def _load_operation_policies(self, policy_path):
|
def _load_operation_policies(self, policy_path):
|
||||||
if (policy_path is None) or (not os.path.isdir(policy_path)):
|
if (policy_path is None) or (not os.path.isdir(policy_path)):
|
||||||
@ -262,7 +262,7 @@ class KmipEngine(object):
|
|||||||
ResponseMessage: The response containing all of the results from
|
ResponseMessage: The response containing all of the results from
|
||||||
the request batch items.
|
the request batch items.
|
||||||
"""
|
"""
|
||||||
self._client_identity = None
|
self._client_identity = [None, None]
|
||||||
header = request.request_header
|
header = request.request_header
|
||||||
|
|
||||||
# Process the protocol version
|
# Process the protocol version
|
||||||
@ -331,6 +331,10 @@ class KmipEngine(object):
|
|||||||
auth_credentials = header.authentication.credential
|
auth_credentials = header.authentication.credential
|
||||||
else:
|
else:
|
||||||
auth_credentials = None
|
auth_credentials = None
|
||||||
|
|
||||||
|
# TODO (peter-hamilton) This is a shim until SLUGS integration is done.
|
||||||
|
credential = [credential, None]
|
||||||
|
|
||||||
self._verify_credential(auth_credentials, credential)
|
self._verify_credential(auth_credentials, credential)
|
||||||
|
|
||||||
# Process the batch error continuation option
|
# Process the batch error continuation option
|
||||||
@ -850,24 +854,94 @@ class KmipEngine(object):
|
|||||||
|
|
||||||
def _is_allowed_by_operation_policy(
|
def _is_allowed_by_operation_policy(
|
||||||
self,
|
self,
|
||||||
operation_policy,
|
policy_name,
|
||||||
session_identity,
|
session_identity,
|
||||||
object_owner,
|
object_owner,
|
||||||
object_type,
|
object_type,
|
||||||
operation
|
operation
|
||||||
):
|
):
|
||||||
policy_set = self._operation_policies.get(operation_policy)
|
session_user = session_identity[0]
|
||||||
if not policy_set:
|
session_groups = session_identity[1]
|
||||||
self._logger.warning(
|
|
||||||
"The '{0}' policy does not exist.".format(operation_policy)
|
if session_groups is None:
|
||||||
|
session_groups = [None]
|
||||||
|
|
||||||
|
for session_group in session_groups:
|
||||||
|
allowed = self.is_allowed(
|
||||||
|
policy_name,
|
||||||
|
session_user,
|
||||||
|
session_group,
|
||||||
|
object_owner,
|
||||||
|
object_type,
|
||||||
|
operation
|
||||||
)
|
)
|
||||||
|
if allowed:
|
||||||
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
object_policy = policy_set.get(object_type)
|
def get_relevant_policy_section(self, policy_name, group=None):
|
||||||
|
"""
|
||||||
|
Look up the policy corresponding to the provided policy name and
|
||||||
|
group (optional). Log any issues found during the look up.
|
||||||
|
"""
|
||||||
|
policy_bundle = self._operation_policies.get(policy_name)
|
||||||
|
|
||||||
|
if not policy_bundle:
|
||||||
|
self._logger.warning(
|
||||||
|
"The '{}' policy does not exist.".format(policy_name)
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
if group:
|
||||||
|
groups_policy_bundle = policy_bundle.get('groups')
|
||||||
|
if not groups_policy_bundle:
|
||||||
|
self._logger.debug(
|
||||||
|
"The '{}' policy does not support groups.".format(
|
||||||
|
policy_name
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
group_policy = groups_policy_bundle.get(group)
|
||||||
|
if not group_policy:
|
||||||
|
self._logger.debug(
|
||||||
|
"The '{}' policy does not support group '{}'.".format(
|
||||||
|
policy_name,
|
||||||
|
group
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
return group_policy
|
||||||
|
else:
|
||||||
|
return policy_bundle.get('default')
|
||||||
|
|
||||||
|
def is_allowed(
|
||||||
|
self,
|
||||||
|
policy_name,
|
||||||
|
session_user,
|
||||||
|
session_group,
|
||||||
|
object_owner,
|
||||||
|
object_type,
|
||||||
|
operation
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Determine if object access is allowed for the provided policy and
|
||||||
|
session settings.
|
||||||
|
"""
|
||||||
|
policy_section = self.get_relevant_policy_section(
|
||||||
|
policy_name,
|
||||||
|
session_group
|
||||||
|
)
|
||||||
|
if policy_section is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
object_policy = policy_section.get(object_type)
|
||||||
if not object_policy:
|
if not object_policy:
|
||||||
self._logger.warning(
|
self._logger.warning(
|
||||||
"The '{0}' policy does not apply to {1} objects.".format(
|
"The '{0}' policy does not apply to {1} objects.".format(
|
||||||
operation_policy,
|
policy_name,
|
||||||
self._get_enum_string(object_type)
|
self._get_enum_string(object_type)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -878,7 +952,7 @@ class KmipEngine(object):
|
|||||||
self._logger.warning(
|
self._logger.warning(
|
||||||
"The '{0}' policy does not apply to {1} operations on {2} "
|
"The '{0}' policy does not apply to {1} operations on {2} "
|
||||||
"objects.".format(
|
"objects.".format(
|
||||||
operation_policy,
|
policy_name,
|
||||||
self._get_enum_string(operation),
|
self._get_enum_string(operation),
|
||||||
self._get_enum_string(object_type)
|
self._get_enum_string(object_type)
|
||||||
)
|
)
|
||||||
@ -888,7 +962,7 @@ class KmipEngine(object):
|
|||||||
if operation_object_policy == enums.Policy.ALLOW_ALL:
|
if operation_object_policy == enums.Policy.ALLOW_ALL:
|
||||||
return True
|
return True
|
||||||
elif operation_object_policy == enums.Policy.ALLOW_OWNER:
|
elif operation_object_policy == enums.Policy.ALLOW_OWNER:
|
||||||
if session_identity == object_owner:
|
if session_user == object_owner:
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
@ -1057,7 +1131,7 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# TODO (peterhamilton) Set additional server-only attributes.
|
# TODO (peterhamilton) Set additional server-only attributes.
|
||||||
managed_object._owner = self._client_identity
|
managed_object._owner = self._client_identity[0]
|
||||||
managed_object.initial_date = int(time.time())
|
managed_object.initial_date = int(time.time())
|
||||||
|
|
||||||
self._data_session.add(managed_object)
|
self._data_session.add(managed_object)
|
||||||
@ -1225,9 +1299,9 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# TODO (peterhamilton) Set additional server-only attributes.
|
# TODO (peterhamilton) Set additional server-only attributes.
|
||||||
public_key._owner = self._client_identity
|
public_key._owner = self._client_identity[0]
|
||||||
public_key.initial_date = int(time.time())
|
public_key.initial_date = int(time.time())
|
||||||
private_key._owner = self._client_identity
|
private_key._owner = self._client_identity[0]
|
||||||
private_key.initial_date = public_key.initial_date
|
private_key.initial_date = public_key.initial_date
|
||||||
|
|
||||||
self._data_session.add(public_key)
|
self._data_session.add(public_key)
|
||||||
@ -1302,7 +1376,7 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# TODO (peterhamilton) Set additional server-only attributes.
|
# TODO (peterhamilton) Set additional server-only attributes.
|
||||||
managed_object._owner = self._client_identity
|
managed_object._owner = self._client_identity[0]
|
||||||
managed_object.initial_date = int(time.time())
|
managed_object.initial_date = int(time.time())
|
||||||
|
|
||||||
self._data_session.add(managed_object)
|
self._data_session.add(managed_object)
|
||||||
@ -1490,7 +1564,7 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# TODO (peterhamilton) Set additional server-only attributes.
|
# TODO (peterhamilton) Set additional server-only attributes.
|
||||||
managed_object._owner = self._client_identity
|
managed_object._owner = self._client_identity[0]
|
||||||
managed_object.initial_date = int(time.time())
|
managed_object.initial_date = int(time.time())
|
||||||
|
|
||||||
self._data_session.add(managed_object)
|
self._data_session.add(managed_object)
|
||||||
|
@ -32,7 +32,198 @@ class TestPolicy(testtools.TestCase):
|
|||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(TestPolicy, self).tearDown()
|
super(TestPolicy, self).tearDown()
|
||||||
|
|
||||||
|
def test_parse_policy(self):
|
||||||
|
"""
|
||||||
|
Test that parsing a text-based policy works correctly.
|
||||||
|
"""
|
||||||
|
object_policy = {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}
|
||||||
|
observed = policy.parse_policy(object_policy)
|
||||||
|
|
||||||
|
expected = {
|
||||||
|
enums.ObjectType.CERTIFICATE: {
|
||||||
|
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertEqual(expected, observed)
|
||||||
|
|
||||||
|
def test_parse_policy_with_bad_object_type(self):
|
||||||
|
"""
|
||||||
|
Test that policy parsing correctly handles an invalid object type
|
||||||
|
string.
|
||||||
|
"""
|
||||||
|
object_policy = {"INVALID": {"LOCATE": "ALLOW_ALL"}}
|
||||||
|
|
||||||
|
args = (object_policy, )
|
||||||
|
regex = "'INVALID' is not a valid ObjectType value."
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
ValueError,
|
||||||
|
regex,
|
||||||
|
policy.parse_policy,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_parse_policy_with_bad_operation(self):
|
||||||
|
"""
|
||||||
|
Test that policy parsing correctly handles an invalid operation string.
|
||||||
|
"""
|
||||||
|
object_policy = {"CERTIFICATE": {"INVALID": "ALLOW_ALL"}}
|
||||||
|
|
||||||
|
args = (object_policy, )
|
||||||
|
regex = "'INVALID' is not a valid Operation value."
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
ValueError,
|
||||||
|
regex,
|
||||||
|
policy.parse_policy,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_parse_policy_with_bad_permission(self):
|
||||||
|
"""
|
||||||
|
Test that policy parsing correctly handles an invalid permission
|
||||||
|
string.
|
||||||
|
"""
|
||||||
|
object_policy = {"CERTIFICATE": {"LOCATE": "INVALID"}}
|
||||||
|
|
||||||
|
args = (object_policy, )
|
||||||
|
regex = "'INVALID' is not a valid Policy value."
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
ValueError,
|
||||||
|
regex,
|
||||||
|
policy.parse_policy,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
def test_read_policy_from_file(self):
|
def test_read_policy_from_file(self):
|
||||||
|
"""
|
||||||
|
Test that reading a policy file works correctly.
|
||||||
|
"""
|
||||||
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
|
dir=self.temp_dir,
|
||||||
|
delete=False
|
||||||
|
)
|
||||||
|
with open(policy_file.name, 'w') as f:
|
||||||
|
f.write(
|
||||||
|
'{"test": {'
|
||||||
|
'"groups": {"group_A": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}, '
|
||||||
|
'"default": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}'
|
||||||
|
'}'
|
||||||
|
)
|
||||||
|
|
||||||
|
policies = policy.read_policy_from_file(policy_file.name)
|
||||||
|
|
||||||
|
self.assertEqual(1, len(policies))
|
||||||
|
self.assertIn('test', policies.keys())
|
||||||
|
|
||||||
|
expected = {
|
||||||
|
'groups': {
|
||||||
|
'group_A': {
|
||||||
|
enums.ObjectType.SPLIT_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'default': {
|
||||||
|
enums.ObjectType.SPLIT_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertEqual(expected, policies.get('test'))
|
||||||
|
|
||||||
|
def test_read_policy_from_file_groups_only(self):
|
||||||
|
"""
|
||||||
|
Test that reading a policy file with only a groups section works
|
||||||
|
correctly.
|
||||||
|
"""
|
||||||
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
|
dir=self.temp_dir,
|
||||||
|
delete=False
|
||||||
|
)
|
||||||
|
with open(policy_file.name, 'w') as f:
|
||||||
|
f.write(
|
||||||
|
'{"test": '
|
||||||
|
'{"groups": {"group_A": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}}}'
|
||||||
|
)
|
||||||
|
|
||||||
|
policies = policy.read_policy_from_file(policy_file.name)
|
||||||
|
|
||||||
|
self.assertEqual(1, len(policies))
|
||||||
|
self.assertIn('test', policies.keys())
|
||||||
|
|
||||||
|
expected = {
|
||||||
|
'groups': {
|
||||||
|
'group_A': {
|
||||||
|
enums.ObjectType.SPLIT_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertEqual(expected, policies.get('test'))
|
||||||
|
|
||||||
|
def test_read_policy_from_file_default_only(self):
|
||||||
|
"""
|
||||||
|
Test that reading a policy file with only a default section works
|
||||||
|
correctly.
|
||||||
|
"""
|
||||||
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
|
dir=self.temp_dir,
|
||||||
|
delete=False
|
||||||
|
)
|
||||||
|
with open(policy_file.name, 'w') as f:
|
||||||
|
f.write(
|
||||||
|
'{"test": '
|
||||||
|
'{"default": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}}'
|
||||||
|
)
|
||||||
|
|
||||||
|
policies = policy.read_policy_from_file(policy_file.name)
|
||||||
|
|
||||||
|
self.assertEqual(1, len(policies))
|
||||||
|
self.assertIn('test', policies.keys())
|
||||||
|
|
||||||
|
expected = {
|
||||||
|
'default': {
|
||||||
|
enums.ObjectType.SPLIT_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertEqual(expected, policies.get('test'))
|
||||||
|
|
||||||
|
def test_read_policy_from_file_invalid_section(self):
|
||||||
|
"""
|
||||||
|
Test that reading a policy file with an invalid section generates
|
||||||
|
the right error.
|
||||||
|
"""
|
||||||
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
|
dir=self.temp_dir,
|
||||||
|
delete=False
|
||||||
|
)
|
||||||
|
with open(policy_file.name, 'w') as f:
|
||||||
|
f.write(
|
||||||
|
'{"test": {'
|
||||||
|
'"invalid": {"group_A": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}}}'
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (policy_file.name, )
|
||||||
|
regex = "Policy 'test' contains an invalid section named: invalid"
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
ValueError,
|
||||||
|
regex,
|
||||||
|
policy.read_policy_from_file,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_read_policy_from_file_legacy(self):
|
||||||
|
"""
|
||||||
|
Test that reading a legacy policy file works correctly.
|
||||||
|
|
||||||
|
Note: legacy policy file support may be removed in the future.
|
||||||
|
"""
|
||||||
policy_file = tempfile.NamedTemporaryFile(
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
dir=self.temp_dir,
|
dir=self.temp_dir,
|
||||||
delete=False
|
delete=False
|
||||||
@ -47,15 +238,20 @@ class TestPolicy(testtools.TestCase):
|
|||||||
self.assertEqual(1, len(policies))
|
self.assertEqual(1, len(policies))
|
||||||
self.assertIn('test', policies.keys())
|
self.assertIn('test', policies.keys())
|
||||||
|
|
||||||
test_policy = {
|
expected = {
|
||||||
|
'default': {
|
||||||
enums.ObjectType.CERTIFICATE: {
|
enums.ObjectType.CERTIFICATE: {
|
||||||
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
|
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.assertEqual(test_policy, policies.get('test'))
|
self.assertEqual(expected, policies.get('test'))
|
||||||
|
|
||||||
def test_read_policy_from_file_empty(self):
|
def test_read_policy_from_file_empty(self):
|
||||||
|
"""
|
||||||
|
Test that reading an empty policy file generates the right error.
|
||||||
|
"""
|
||||||
policy_file = tempfile.NamedTemporaryFile(
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
dir=self.temp_dir,
|
dir=self.temp_dir,
|
||||||
delete=False
|
delete=False
|
||||||
@ -64,7 +260,9 @@ class TestPolicy(testtools.TestCase):
|
|||||||
f.write('')
|
f.write('')
|
||||||
|
|
||||||
args = (policy_file.name, )
|
args = (policy_file.name, )
|
||||||
regex = "An error occurred while attempting to parse the JSON file."
|
regex = "Loading the policy file '{}' generated a JSON error:".format(
|
||||||
|
policy_file.name
|
||||||
|
)
|
||||||
self.assertRaisesRegexp(
|
self.assertRaisesRegexp(
|
||||||
ValueError,
|
ValueError,
|
||||||
regex,
|
regex,
|
||||||
@ -72,59 +270,19 @@ class TestPolicy(testtools.TestCase):
|
|||||||
*args
|
*args
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_read_policy_from_file_bad_object_type(self):
|
def test_read_policy_from_file_empty_policy(self):
|
||||||
|
"""
|
||||||
|
Test that reading a file with an empty policy is handled correctly.
|
||||||
|
"""
|
||||||
policy_file = tempfile.NamedTemporaryFile(
|
policy_file = tempfile.NamedTemporaryFile(
|
||||||
dir=self.temp_dir,
|
dir=self.temp_dir,
|
||||||
delete=False
|
delete=False
|
||||||
)
|
)
|
||||||
with open(policy_file.name, 'w') as f:
|
with open(policy_file.name, 'w') as f:
|
||||||
f.write(
|
f.write(
|
||||||
'{"test": {"INVALID": {"LOCATE": "ALLOW_ALL"}}}'
|
'{"test": {}}'
|
||||||
)
|
)
|
||||||
|
|
||||||
args = (policy_file.name, )
|
policies = policy.read_policy_from_file(policy_file.name)
|
||||||
regex = "'INVALID' is not a valid ObjectType value."
|
|
||||||
self.assertRaisesRegexp(
|
|
||||||
ValueError,
|
|
||||||
regex,
|
|
||||||
policy.read_policy_from_file,
|
|
||||||
*args
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_read_policy_from_file_bad_operation(self):
|
self.assertEqual(0, len(policies))
|
||||||
policy_file = tempfile.NamedTemporaryFile(
|
|
||||||
dir=self.temp_dir,
|
|
||||||
delete=False
|
|
||||||
)
|
|
||||||
with open(policy_file.name, 'w') as f:
|
|
||||||
f.write(
|
|
||||||
'{"test": {"CERTIFICATE": {"INVALID": "ALLOW_ALL"}}}'
|
|
||||||
)
|
|
||||||
|
|
||||||
args = (policy_file.name, )
|
|
||||||
regex = "'INVALID' is not a valid Operation value."
|
|
||||||
self.assertRaisesRegexp(
|
|
||||||
ValueError,
|
|
||||||
regex,
|
|
||||||
policy.read_policy_from_file,
|
|
||||||
*args
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_read_policy_from_file_bad_permission(self):
|
|
||||||
policy_file = tempfile.NamedTemporaryFile(
|
|
||||||
dir=self.temp_dir,
|
|
||||||
delete=False
|
|
||||||
)
|
|
||||||
with open(policy_file.name, 'w') as f:
|
|
||||||
f.write(
|
|
||||||
'{"test": {"CERTIFICATE": {"LOCATE": "INVALID"}}}'
|
|
||||||
)
|
|
||||||
|
|
||||||
args = (policy_file.name, )
|
|
||||||
regex = "'INVALID' is not a valid Policy value."
|
|
||||||
self.assertRaisesRegexp(
|
|
||||||
ValueError,
|
|
||||||
regex,
|
|
||||||
policy.read_policy_from_file,
|
|
||||||
*args
|
|
||||||
)
|
|
||||||
|
@ -160,7 +160,8 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
)
|
)
|
||||||
with open(policy_file.name, 'w') as f:
|
with open(policy_file.name, 'w') as f:
|
||||||
f.write(
|
f.write(
|
||||||
'{"test": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}'
|
'{"test": '
|
||||||
|
'{"default": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}}'
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(2, len(e._operation_policies))
|
self.assertEqual(2, len(e._operation_policies))
|
||||||
@ -181,10 +182,12 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
self.assertIn('test', e._operation_policies.keys())
|
self.assertIn('test', e._operation_policies.keys())
|
||||||
|
|
||||||
test_policy = {
|
test_policy = {
|
||||||
|
'default': {
|
||||||
enums.ObjectType.CERTIFICATE: {
|
enums.ObjectType.CERTIFICATE: {
|
||||||
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
|
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.assertEqual(test_policy, e._operation_policies.get('test'))
|
self.assertEqual(test_policy, e._operation_policies.get('test'))
|
||||||
|
|
||||||
@ -264,6 +267,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
Test that the KmipEngine can correctly load operation policies, even
|
Test that the KmipEngine can correctly load operation policies, even
|
||||||
when a policy is defined multiple times.
|
when a policy is defined multiple times.
|
||||||
"""
|
"""
|
||||||
|
self.skip('Refactor')
|
||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._logger = mock.MagicMock()
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
@ -2081,243 +2085,436 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
*args
|
*args
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_is_allowed_by_operation_policy(self):
|
def test_is_allowed_by_operation_policy_granted(self):
|
||||||
"""
|
"""
|
||||||
Test that an allowed operation is correctly allowed by the operation
|
Test that access granted by operation policy is processed correctly.
|
||||||
policy.
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.is_allowed = mock.Mock(return_value=True)
|
||||||
|
|
||||||
|
result = e._is_allowed_by_operation_policy(
|
||||||
|
'test_policy',
|
||||||
|
['test_user', ['test_group_A', 'test_group_B']],
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
|
||||||
|
e.is_allowed.assert_called_once_with(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group_A',
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_is_allowed_by_operation_policy_denied(self):
|
||||||
|
"""
|
||||||
|
Test that access denied by operation policy is processed correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.is_allowed = mock.Mock(return_value=False)
|
||||||
|
|
||||||
|
result = e._is_allowed_by_operation_policy(
|
||||||
|
'test_policy',
|
||||||
|
['test_user', ['test_group_A', 'test_group_B']],
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
|
||||||
|
e.is_allowed.assert_any_call(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group_A',
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
e.is_allowed.assert_any_call(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group_B',
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_is_allowed_by_operation_policy_no_groups(self):
|
||||||
|
"""
|
||||||
|
Test that access by operation policy is processed correctly when no
|
||||||
|
user groups are provided.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.is_allowed = mock.Mock(return_value=True)
|
||||||
|
|
||||||
|
result = e._is_allowed_by_operation_policy(
|
||||||
|
'test_policy',
|
||||||
|
['test_user', None],
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
|
||||||
|
e.is_allowed.assert_called_once_with(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
None,
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_is_allowed_by_operation_policy_groups_empty(self):
|
||||||
|
"""
|
||||||
|
Test that access by operation policy is processed correctly when the
|
||||||
|
provided set of user groups is empty.
|
||||||
|
|
||||||
|
Note that _is_allowed will always return True here, but because there
|
||||||
|
are no groups to check, access is by default denied.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.is_allowed = mock.Mock(return_value=True)
|
||||||
|
|
||||||
|
result = e._is_allowed_by_operation_policy(
|
||||||
|
'test_policy',
|
||||||
|
['test_user', []],
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
|
||||||
|
e.is_allowed.assert_not_called()
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_get_relevant_policy_section_policy_missing(self):
|
||||||
|
"""
|
||||||
|
Test that the lookup for a non-existent policy is handled correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
result = e.get_relevant_policy_section('invalid')
|
||||||
|
|
||||||
|
e._logger.warning.assert_called_once_with(
|
||||||
|
"The 'invalid' policy does not exist."
|
||||||
|
)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_get_relevant_policy_section_no_group(self):
|
||||||
|
"""
|
||||||
|
Test that the lookup for a policy with no group specified is handled
|
||||||
|
correctly.
|
||||||
"""
|
"""
|
||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._operation_policies = {
|
e._operation_policies = {
|
||||||
'test': {
|
'test_policy': {
|
||||||
|
'default': {
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
expected = {
|
||||||
'test',
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
'test',
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
'test',
|
}
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
}
|
||||||
enums.Operation.GET
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertTrue(is_allowed)
|
result = e.get_relevant_policy_section('test_policy')
|
||||||
|
self.assertEqual(expected, result)
|
||||||
|
|
||||||
def test_is_allowed_by_operation_policy_blocked(self):
|
def test_get_relevant_policy_section_group(self):
|
||||||
"""
|
"""
|
||||||
Test that an unallowed operation is correctly blocked by the operation
|
Test that the lookup for a policy with a group specified is handled
|
||||||
policy.
|
correctly.
|
||||||
"""
|
"""
|
||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._operation_policies = {
|
e._operation_policies = {
|
||||||
'test': {
|
'test_policy': {
|
||||||
|
'default': {
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'groups': {
|
||||||
|
'test_group': {
|
||||||
|
enums.ObjectType.CERTIFICATE: {
|
||||||
|
enums.Operation.CREATE: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected = {
|
||||||
|
enums.ObjectType.CERTIFICATE: {
|
||||||
|
enums.Operation.CREATE: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = e.get_relevant_policy_section('test_policy', 'test_group')
|
||||||
|
self.assertEqual(expected, result)
|
||||||
|
|
||||||
|
def test_get_relevant_policy_section_group_not_supported(self):
|
||||||
|
"""
|
||||||
|
Test that the lookup for a policy with a group specified but not
|
||||||
|
supported is handled correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._operation_policies = {
|
||||||
|
'test_policy': {
|
||||||
|
'default': {
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'groups': {
|
||||||
|
'test_group_B': {
|
||||||
|
enums.ObjectType.CERTIFICATE: {
|
||||||
|
enums.Operation.CREATE: enums.Policy.ALLOW_ALL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = e.get_relevant_policy_section('test_policy', 'test_group_A')
|
||||||
|
|
||||||
|
e._logger.debug.assert_called_once_with(
|
||||||
|
"The 'test_policy' policy does not support group 'test_group_A'."
|
||||||
|
)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_get_relevant_policy_section_groups_not_supported(self):
|
||||||
|
"""
|
||||||
|
Test that the lookup for a group-less policy with a group specified is
|
||||||
|
handled correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
e._operation_policies = {
|
||||||
|
'test_policy': {
|
||||||
|
'default': {
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
result = e.get_relevant_policy_section('test_policy', 'test_group_A')
|
||||||
'test',
|
|
||||||
'random',
|
e._logger.debug.assert_called_once_with(
|
||||||
'test',
|
"The 'test_policy' policy does not support groups."
|
||||||
|
)
|
||||||
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
def test_is_allowed_policy_not_found(self):
|
||||||
|
"""
|
||||||
|
Test that an access check using a non-existent policy is handled
|
||||||
|
correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.get_relevant_policy_section = mock.Mock(return_value=None)
|
||||||
|
|
||||||
|
result = e.is_allowed(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group',
|
||||||
|
'test_user',
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
enums.Operation.GET
|
enums.Operation.GET
|
||||||
)
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
def test_is_allowed_policy_object_type_mismatch(self):
|
||||||
|
|
||||||
def test_is_allowed_by_operation_public(self):
|
|
||||||
"""
|
"""
|
||||||
Test that a public operation is allowed by the operation policy.
|
Test that an access check using a policy that does not support the
|
||||||
|
specified object type is handled correctly.
|
||||||
"""
|
"""
|
||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._operation_policies = {
|
e._logger = mock.Mock()
|
||||||
'test': {
|
e._get_enum_string = mock.Mock(return_value="Certificate")
|
||||||
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
|
return_value={
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
result = e.is_allowed(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group',
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.CERTIFICATE,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.warning.assert_called_once_with(
|
||||||
|
"The 'test_policy' policy does not apply to Certificate objects."
|
||||||
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_is_allowed_policy_operation_mismatch(self):
|
||||||
|
"""
|
||||||
|
Test that an access check using a policy that does not support the
|
||||||
|
specified operation is handled correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._logger = mock.Mock()
|
||||||
|
e._get_enum_string = mock.Mock(side_effect=["Create", "SymmetricKey"])
|
||||||
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
|
return_value={
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
result = e.is_allowed(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group',
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.CREATE
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.warning.assert_called_once_with(
|
||||||
|
"The 'test_policy' policy does not apply to Create operations on "
|
||||||
|
"SymmetricKey objects."
|
||||||
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_is_allowed_allow_all(self):
|
||||||
|
"""
|
||||||
|
Test that an access check resulting in an "Allow All" policy is
|
||||||
|
processed correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
|
return_value={
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
enums.Operation.GET: enums.Policy.ALLOW_ALL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
)
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
result = e.is_allowed(
|
||||||
'test',
|
'test_policy',
|
||||||
'test',
|
'test_user',
|
||||||
'test',
|
'test_group',
|
||||||
|
'test_user',
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
enums.Operation.GET
|
enums.Operation.GET
|
||||||
)
|
)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
self.assertTrue(is_allowed)
|
def test_is_allowed_allow_owner(self):
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
|
||||||
'test',
|
|
||||||
'random',
|
|
||||||
'test',
|
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
|
||||||
enums.Operation.GET
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertTrue(is_allowed)
|
|
||||||
|
|
||||||
def test_is_allowed_by_operation_block_all(self):
|
|
||||||
"""
|
"""
|
||||||
Test that a blocked operation is blocked by the operation policy.
|
Test that an access check resulting in an "Allow Owner" policy is
|
||||||
|
processed correctly.
|
||||||
"""
|
"""
|
||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._operation_policies = {
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
'test': {
|
return_value={
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
result = e.is_allowed(
|
||||||
|
'test_policy',
|
||||||
|
'test_user',
|
||||||
|
'test_group',
|
||||||
|
'test_user',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_is_allowed_allow_owner_not_owner(self):
|
||||||
|
"""
|
||||||
|
Test that an access check resulting in an "Allow Owner" policy is
|
||||||
|
processed correctly when the user requesting access is not the owner.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
|
return_value={
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
|
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
result = e.is_allowed(
|
||||||
|
'test_policy',
|
||||||
|
'test_user_A',
|
||||||
|
'test_group',
|
||||||
|
'test_user_B',
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
enums.Operation.GET
|
||||||
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_is_allowed_disallow_all(self):
|
||||||
|
"""
|
||||||
|
Test that an access check resulting in an "Disallow All" policy is
|
||||||
|
processed correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
|
return_value={
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
enums.Operation.GET: enums.Policy.DISALLOW_ALL
|
enums.Operation.GET: enums.Policy.DISALLOW_ALL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
)
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
result = e.is_allowed(
|
||||||
'test',
|
'test_policy',
|
||||||
'test',
|
'test_user',
|
||||||
'test',
|
'test_group',
|
||||||
|
'test_user',
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
enums.Operation.GET
|
enums.Operation.GET
|
||||||
)
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
def test_is_allowed_invalid_permission(self):
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
|
||||||
'test',
|
|
||||||
'random',
|
|
||||||
'test',
|
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
|
||||||
enums.Operation.GET
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
|
||||||
|
|
||||||
def test_is_allowed_by_operation_safety_check(self):
|
|
||||||
"""
|
"""
|
||||||
Test that an unknown operation is blocked by the operation policy.
|
Test that an access check resulting in an invalid policy option is
|
||||||
|
processed correctly.
|
||||||
"""
|
"""
|
||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._operation_policies = {
|
e.get_relevant_policy_section = mock.Mock(
|
||||||
'test': {
|
return_value={
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
enums.ObjectType.SYMMETRIC_KEY: {
|
||||||
enums.Operation.GET: 'unknown value'
|
enums.Operation.GET: 'invalid'
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
)
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
result = e.is_allowed(
|
||||||
'test',
|
'test_policy',
|
||||||
'test',
|
'test_user',
|
||||||
'test',
|
'test_group',
|
||||||
|
'test_user',
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
enums.Operation.GET
|
enums.Operation.GET
|
||||||
)
|
)
|
||||||
|
self.assertFalse(result)
|
||||||
self.assertFalse(is_allowed)
|
|
||||||
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
|
||||||
'test',
|
|
||||||
'random',
|
|
||||||
'test',
|
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
|
||||||
enums.Operation.GET
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
|
||||||
|
|
||||||
def test_is_allowed_by_operation_policy_nonexistent_policy(self):
|
|
||||||
"""
|
|
||||||
Test that a check with a non-existent policy yields a logging warning
|
|
||||||
and a blocked operation.
|
|
||||||
"""
|
|
||||||
e = engine.KmipEngine()
|
|
||||||
e._logger = mock.MagicMock()
|
|
||||||
|
|
||||||
policy = 'nonexistent-policy'
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
|
||||||
policy,
|
|
||||||
'test',
|
|
||||||
'test',
|
|
||||||
enums.ObjectType.SYMMETRIC_KEY,
|
|
||||||
enums.Operation.GET
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
|
||||||
e._logger.warning.assert_called_once_with(
|
|
||||||
"The '{0}' policy does not exist.".format(policy)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_is_allowed_by_operation_policy_not_object_applicable(self):
|
|
||||||
"""
|
|
||||||
Test that a check for an object with a non-applicable policy yields
|
|
||||||
a logging warning and a blocked operation.
|
|
||||||
"""
|
|
||||||
e = engine.KmipEngine()
|
|
||||||
e._logger = mock.MagicMock()
|
|
||||||
e._operation_policies = {
|
|
||||||
'test': {
|
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
|
||||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
policy = 'test'
|
|
||||||
object_type = enums.ObjectType.PRIVATE_KEY
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
|
||||||
policy,
|
|
||||||
'test',
|
|
||||||
'test',
|
|
||||||
object_type,
|
|
||||||
enums.Operation.GET
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
|
||||||
e._logger.warning.assert_called_once_with(
|
|
||||||
"The '{0}' policy does not apply to {1} objects.".format(
|
|
||||||
policy,
|
|
||||||
e._get_enum_string(object_type)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_is_allowed_by_operation_policy_not_applicable(self):
|
|
||||||
"""
|
|
||||||
Test that a check with a non-applicable policy yields a logging
|
|
||||||
warning and a blocked operation.
|
|
||||||
"""
|
|
||||||
e = engine.KmipEngine()
|
|
||||||
e._logger = mock.MagicMock()
|
|
||||||
e._operation_policies = {
|
|
||||||
'test': {
|
|
||||||
enums.ObjectType.SYMMETRIC_KEY: {
|
|
||||||
enums.Operation.GET: enums.Policy.ALLOW_OWNER
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
policy = 'test'
|
|
||||||
object_type = enums.ObjectType.SYMMETRIC_KEY
|
|
||||||
operation = enums.Operation.CREATE
|
|
||||||
is_allowed = e._is_allowed_by_operation_policy(
|
|
||||||
policy,
|
|
||||||
'test',
|
|
||||||
'test',
|
|
||||||
object_type,
|
|
||||||
operation
|
|
||||||
)
|
|
||||||
|
|
||||||
self.assertFalse(is_allowed)
|
|
||||||
e._logger.warning.assert_called_once_with(
|
|
||||||
"The '{0}' policy does not apply to {1} operations on {2} "
|
|
||||||
"objects.".format(
|
|
||||||
policy,
|
|
||||||
e._get_enum_string(operation),
|
|
||||||
e._get_enum_string(object_type)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_get_object_with_access_controls(self):
|
def test_get_object_with_access_controls(self):
|
||||||
"""
|
"""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user