Update the server to support group-based operation policies

This change updates the PyKMIP server, allowing it to process and
use group-based operation policies. The server still supports the
original operation policy file format, so no immediate difference
should be apparent to users. Future documentation changes will
explain group-based policy files and how they should be used.
This commit is contained in:
Peter Hamilton 2018-02-19 12:12:40 -05:00
parent 7f8ace909c
commit 61347d80b6
4 changed files with 933 additions and 467 deletions

View File

@ -19,240 +19,277 @@ import six
from kmip.core import enums
def parse_policy(policy):
result = {}
for object_type, operation_policies in six.iteritems(policy):
processed_operation_policies = {}
for operation, permission in six.iteritems(operation_policies):
try:
enum_operation = enums.Operation[operation]
except Exception:
raise ValueError(
"'{0}' is not a valid Operation value.".format(
operation
)
)
try:
enum_policy = enums.Policy[permission]
except Exception:
raise ValueError(
"'{0}' is not a valid Policy value.".format(
permission
)
)
processed_operation_policies[enum_operation] = enum_policy
try:
enum_type = enums.ObjectType[object_type]
except Exception:
raise ValueError(
"'{0}' is not a valid ObjectType value.".format(
object_type
)
)
result[enum_type] = processed_operation_policies
return result
def read_policy_from_file(path):
policy_blob = {}
with open(path, 'r') as f:
try:
policy_blob = json.loads(f.read())
except Exception as e:
raise ValueError(
"An error occurred while attempting to parse the JSON "
"file. {0}".format(e)
"Loading the policy file '{}' generated a JSON error: "
"{}".format(path, e)
)
policies = dict()
policy_sections = {'groups', 'default'}
object_types = set([t.name for t in enums.ObjectType])
result = {}
for name, object_policies in six.iteritems(policy_blob):
processed_object_policies = dict()
for name, object_policy in policy_blob.items():
if len(object_policy.keys()) == 0:
continue
for object_type, operation_policies in six.iteritems(object_policies):
processed_operation_policies = dict()
# Use subset checking to determine what type of policy we have
sections = set([s for s in six.iterkeys(object_policy)])
if sections <= policy_sections:
parsed_policies = dict()
for operation, permission in six.iteritems(operation_policies):
default_policy = object_policy.get('default')
if default_policy:
parsed_policies['default'] = parse_policy(default_policy)
try:
enum_operation = enums.Operation[operation]
except Exception:
raise ValueError(
"'{0}' is not a valid Operation value.".format(
operation
)
)
try:
enum_policy = enums.Policy[permission]
except Exception:
raise ValueError(
"'{0}' is not a valid Policy value.".format(
permission
)
group_policies = object_policy.get('groups')
if group_policies:
parsed_group_policies = dict()
for group_name, group_policy in six.iteritems(group_policies):
parsed_group_policies[group_name] = parse_policy(
group_policy
)
parsed_policies['groups'] = parsed_group_policies
processed_operation_policies.update([
(enum_operation, enum_policy)
])
result[name] = parsed_policies
elif sections <= object_types:
policy = parse_policy(object_policy)
result[name] = {'default': policy}
else:
invalid_sections = sections - policy_sections - object_types
raise ValueError(
"Policy '{}' contains an invalid section named: "
"{}".format(name, invalid_sections.pop())
)
try:
enum_type = enums.ObjectType[object_type]
except Exception:
raise ValueError(
"'{0}' is not a valid ObjectType value.".format(
object_type
)
)
processed_object_policies.update([
(enum_type, processed_operation_policies)
])
policies.update([(name, processed_object_policies)])
return policies
return result
policies = {
'default': {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
enums.Operation.CHECK: enums.Policy.ALLOW_ALL,
enums.Operation.GET: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_ALL,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_ALL,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.PUBLIC_KEY: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
enums.Operation.CHECK: enums.Policy.ALLOW_ALL,
enums.Operation.GET: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_ALL,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_ALL,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.PRIVATE_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.SPLIT_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.TEMPLATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.SECRET_DATA: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.OPAQUE_DATA: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.PGP_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
'default': {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
enums.Operation.CHECK: enums.Policy.ALLOW_ALL,
enums.Operation.GET: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_ALL,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_ALL,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.PUBLIC_KEY: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
enums.Operation.CHECK: enums.Policy.ALLOW_ALL,
enums.Operation.GET: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_ALL,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_ALL,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.PRIVATE_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.SPLIT_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.TEMPLATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.SECRET_DATA: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.OPAQUE_DATA: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
},
enums.ObjectType.PGP_KEY: {
enums.Operation.REKEY: enums.Policy.ALLOW_OWNER,
enums.Operation.REKEY_KEY_PAIR: enums.Policy.ALLOW_OWNER,
enums.Operation.DERIVE_KEY: enums.Policy.ALLOW_OWNER,
enums.Operation.LOCATE: enums.Policy.ALLOW_OWNER,
enums.Operation.CHECK: enums.Policy.ALLOW_OWNER,
enums.Operation.GET: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_OWNER,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.ALLOW_OWNER,
enums.Operation.OBTAIN_LEASE: enums.Policy.ALLOW_OWNER,
enums.Operation.GET_USAGE_ALLOCATION: enums.Policy.ALLOW_OWNER,
enums.Operation.ACTIVATE: enums.Policy.ALLOW_OWNER,
enums.Operation.REVOKE: enums.Policy.ALLOW_OWNER,
enums.Operation.DESTROY: enums.Policy.ALLOW_OWNER,
enums.Operation.ARCHIVE: enums.Policy.ALLOW_OWNER,
enums.Operation.RECOVER: enums.Policy.ALLOW_OWNER
}
}
},
'public': {
enums.ObjectType.TEMPLATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
enums.Operation.GET: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_ALL,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.DISALLOW_ALL,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.DISALLOW_ALL,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.DISALLOW_ALL,
enums.Operation.DESTROY: enums.Policy.DISALLOW_ALL
'default': {
enums.ObjectType.TEMPLATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL,
enums.Operation.GET: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTES: enums.Policy.ALLOW_ALL,
enums.Operation.GET_ATTRIBUTE_LIST: enums.Policy.ALLOW_ALL,
enums.Operation.ADD_ATTRIBUTE: enums.Policy.DISALLOW_ALL,
enums.Operation.MODIFY_ATTRIBUTE: enums.Policy.DISALLOW_ALL,
enums.Operation.DELETE_ATTRIBUTE: enums.Policy.DISALLOW_ALL,
enums.Operation.DESTROY: enums.Policy.DISALLOW_ALL
}
}
}
}

View File

@ -127,7 +127,7 @@ class KmipEngine(object):
self._operation_policies = copy.deepcopy(operation_policy.policies)
self._load_operation_policies(policy_path)
self._client_identity = None
self._client_identity = [None, None]
def _load_operation_policies(self, policy_path):
if (policy_path is None) or (not os.path.isdir(policy_path)):
@ -262,7 +262,7 @@ class KmipEngine(object):
ResponseMessage: The response containing all of the results from
the request batch items.
"""
self._client_identity = None
self._client_identity = [None, None]
header = request.request_header
# Process the protocol version
@ -331,6 +331,10 @@ class KmipEngine(object):
auth_credentials = header.authentication.credential
else:
auth_credentials = None
# TODO (peter-hamilton) This is a shim until SLUGS integration is done.
credential = [credential, None]
self._verify_credential(auth_credentials, credential)
# Process the batch error continuation option
@ -850,24 +854,94 @@ class KmipEngine(object):
def _is_allowed_by_operation_policy(
self,
operation_policy,
policy_name,
session_identity,
object_owner,
object_type,
operation
):
policy_set = self._operation_policies.get(operation_policy)
if not policy_set:
self._logger.warning(
"The '{0}' policy does not exist.".format(operation_policy)
session_user = session_identity[0]
session_groups = session_identity[1]
if session_groups is None:
session_groups = [None]
for session_group in session_groups:
allowed = self.is_allowed(
policy_name,
session_user,
session_group,
object_owner,
object_type,
operation
)
if allowed:
return True
return False
def get_relevant_policy_section(self, policy_name, group=None):
"""
Look up the policy corresponding to the provided policy name and
group (optional). Log any issues found during the look up.
"""
policy_bundle = self._operation_policies.get(policy_name)
if not policy_bundle:
self._logger.warning(
"The '{}' policy does not exist.".format(policy_name)
)
return None
if group:
groups_policy_bundle = policy_bundle.get('groups')
if not groups_policy_bundle:
self._logger.debug(
"The '{}' policy does not support groups.".format(
policy_name
)
)
return None
else:
group_policy = groups_policy_bundle.get(group)
if not group_policy:
self._logger.debug(
"The '{}' policy does not support group '{}'.".format(
policy_name,
group
)
)
return None
else:
return group_policy
else:
return policy_bundle.get('default')
def is_allowed(
self,
policy_name,
session_user,
session_group,
object_owner,
object_type,
operation
):
"""
Determine if object access is allowed for the provided policy and
session settings.
"""
policy_section = self.get_relevant_policy_section(
policy_name,
session_group
)
if policy_section is None:
return False
object_policy = policy_set.get(object_type)
object_policy = policy_section.get(object_type)
if not object_policy:
self._logger.warning(
"The '{0}' policy does not apply to {1} objects.".format(
operation_policy,
policy_name,
self._get_enum_string(object_type)
)
)
@ -878,7 +952,7 @@ class KmipEngine(object):
self._logger.warning(
"The '{0}' policy does not apply to {1} operations on {2} "
"objects.".format(
operation_policy,
policy_name,
self._get_enum_string(operation),
self._get_enum_string(object_type)
)
@ -888,7 +962,7 @@ class KmipEngine(object):
if operation_object_policy == enums.Policy.ALLOW_ALL:
return True
elif operation_object_policy == enums.Policy.ALLOW_OWNER:
if session_identity == object_owner:
if session_user == object_owner:
return True
else:
return False
@ -1057,7 +1131,7 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
managed_object._owner = self._client_identity[0]
managed_object.initial_date = int(time.time())
self._data_session.add(managed_object)
@ -1225,9 +1299,9 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
public_key._owner = self._client_identity
public_key._owner = self._client_identity[0]
public_key.initial_date = int(time.time())
private_key._owner = self._client_identity
private_key._owner = self._client_identity[0]
private_key.initial_date = public_key.initial_date
self._data_session.add(public_key)
@ -1302,7 +1376,7 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
managed_object._owner = self._client_identity[0]
managed_object.initial_date = int(time.time())
self._data_session.add(managed_object)
@ -1490,7 +1564,7 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
managed_object._owner = self._client_identity[0]
managed_object.initial_date = int(time.time())
self._data_session.add(managed_object)

View File

@ -32,7 +32,198 @@ class TestPolicy(testtools.TestCase):
def tearDown(self):
super(TestPolicy, self).tearDown()
def test_parse_policy(self):
"""
Test that parsing a text-based policy works correctly.
"""
object_policy = {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}
observed = policy.parse_policy(object_policy)
expected = {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
}
}
self.assertEqual(expected, observed)
def test_parse_policy_with_bad_object_type(self):
"""
Test that policy parsing correctly handles an invalid object type
string.
"""
object_policy = {"INVALID": {"LOCATE": "ALLOW_ALL"}}
args = (object_policy, )
regex = "'INVALID' is not a valid ObjectType value."
self.assertRaisesRegexp(
ValueError,
regex,
policy.parse_policy,
*args
)
def test_parse_policy_with_bad_operation(self):
"""
Test that policy parsing correctly handles an invalid operation string.
"""
object_policy = {"CERTIFICATE": {"INVALID": "ALLOW_ALL"}}
args = (object_policy, )
regex = "'INVALID' is not a valid Operation value."
self.assertRaisesRegexp(
ValueError,
regex,
policy.parse_policy,
*args
)
def test_parse_policy_with_bad_permission(self):
"""
Test that policy parsing correctly handles an invalid permission
string.
"""
object_policy = {"CERTIFICATE": {"LOCATE": "INVALID"}}
args = (object_policy, )
regex = "'INVALID' is not a valid Policy value."
self.assertRaisesRegexp(
ValueError,
regex,
policy.parse_policy,
*args
)
def test_read_policy_from_file(self):
"""
Test that reading a policy file works correctly.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": {'
'"groups": {"group_A": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}, '
'"default": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}'
'}'
)
policies = policy.read_policy_from_file(policy_file.name)
self.assertEqual(1, len(policies))
self.assertIn('test', policies.keys())
expected = {
'groups': {
'group_A': {
enums.ObjectType.SPLIT_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_ALL
}
}
},
'default': {
enums.ObjectType.SPLIT_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_ALL
}
}
}
self.assertEqual(expected, policies.get('test'))
def test_read_policy_from_file_groups_only(self):
"""
Test that reading a policy file with only a groups section works
correctly.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": '
'{"groups": {"group_A": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}}}'
)
policies = policy.read_policy_from_file(policy_file.name)
self.assertEqual(1, len(policies))
self.assertIn('test', policies.keys())
expected = {
'groups': {
'group_A': {
enums.ObjectType.SPLIT_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_ALL
}
}
}
}
self.assertEqual(expected, policies.get('test'))
def test_read_policy_from_file_default_only(self):
"""
Test that reading a policy file with only a default section works
correctly.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": '
'{"default": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}}'
)
policies = policy.read_policy_from_file(policy_file.name)
self.assertEqual(1, len(policies))
self.assertIn('test', policies.keys())
expected = {
'default': {
enums.ObjectType.SPLIT_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_ALL
}
}
}
self.assertEqual(expected, policies.get('test'))
def test_read_policy_from_file_invalid_section(self):
"""
Test that reading a policy file with an invalid section generates
the right error.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": {'
'"invalid": {"group_A": {"SPLIT_KEY": {"GET": "ALLOW_ALL"}}}}}'
)
args = (policy_file.name, )
regex = "Policy 'test' contains an invalid section named: invalid"
self.assertRaisesRegexp(
ValueError,
regex,
policy.read_policy_from_file,
*args
)
def test_read_policy_from_file_legacy(self):
"""
Test that reading a legacy policy file works correctly.
Note: legacy policy file support may be removed in the future.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
@ -47,15 +238,20 @@ class TestPolicy(testtools.TestCase):
self.assertEqual(1, len(policies))
self.assertIn('test', policies.keys())
test_policy = {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
expected = {
'default': {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
}
}
}
self.assertEqual(test_policy, policies.get('test'))
self.assertEqual(expected, policies.get('test'))
def test_read_policy_from_file_empty(self):
"""
Test that reading an empty policy file generates the right error.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
@ -64,7 +260,9 @@ class TestPolicy(testtools.TestCase):
f.write('')
args = (policy_file.name, )
regex = "An error occurred while attempting to parse the JSON file."
regex = "Loading the policy file '{}' generated a JSON error:".format(
policy_file.name
)
self.assertRaisesRegexp(
ValueError,
regex,
@ -72,59 +270,19 @@ class TestPolicy(testtools.TestCase):
*args
)
def test_read_policy_from_file_bad_object_type(self):
def test_read_policy_from_file_empty_policy(self):
"""
Test that reading a file with an empty policy is handled correctly.
"""
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": {"INVALID": {"LOCATE": "ALLOW_ALL"}}}'
'{"test": {}}'
)
args = (policy_file.name, )
regex = "'INVALID' is not a valid ObjectType value."
self.assertRaisesRegexp(
ValueError,
regex,
policy.read_policy_from_file,
*args
)
policies = policy.read_policy_from_file(policy_file.name)
def test_read_policy_from_file_bad_operation(self):
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": {"CERTIFICATE": {"INVALID": "ALLOW_ALL"}}}'
)
args = (policy_file.name, )
regex = "'INVALID' is not a valid Operation value."
self.assertRaisesRegexp(
ValueError,
regex,
policy.read_policy_from_file,
*args
)
def test_read_policy_from_file_bad_permission(self):
policy_file = tempfile.NamedTemporaryFile(
dir=self.temp_dir,
delete=False
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": {"CERTIFICATE": {"LOCATE": "INVALID"}}}'
)
args = (policy_file.name, )
regex = "'INVALID' is not a valid Policy value."
self.assertRaisesRegexp(
ValueError,
regex,
policy.read_policy_from_file,
*args
)
self.assertEqual(0, len(policies))

View File

@ -160,7 +160,8 @@ class TestKmipEngine(testtools.TestCase):
)
with open(policy_file.name, 'w') as f:
f.write(
'{"test": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}'
'{"test": '
'{"default": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}}'
)
self.assertEqual(2, len(e._operation_policies))
@ -181,8 +182,10 @@ class TestKmipEngine(testtools.TestCase):
self.assertIn('test', e._operation_policies.keys())
test_policy = {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
'default': {
enums.ObjectType.CERTIFICATE: {
enums.Operation.LOCATE: enums.Policy.ALLOW_ALL
}
}
}
@ -264,6 +267,7 @@ class TestKmipEngine(testtools.TestCase):
Test that the KmipEngine can correctly load operation policies, even
when a policy is defined multiple times.
"""
self.skip('Refactor')
e = engine.KmipEngine()
e._logger = mock.MagicMock()
@ -2081,243 +2085,436 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_is_allowed_by_operation_policy(self):
def test_is_allowed_by_operation_policy_granted(self):
"""
Test that an allowed operation is correctly allowed by the operation
policy.
Test that access granted by operation policy is processed correctly.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
e.is_allowed = mock.Mock(return_value=True)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
result = e._is_allowed_by_operation_policy(
'test_policy',
['test_user', ['test_group_A', 'test_group_B']],
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(is_allowed)
e.is_allowed.assert_called_once_with(
'test_policy',
'test_user',
'test_group_A',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(result)
def test_is_allowed_by_operation_policy_blocked(self):
def test_is_allowed_by_operation_policy_denied(self):
"""
Test that an unallowed operation is correctly blocked by the operation
policy.
Test that access denied by operation policy is processed correctly.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
e.is_allowed = mock.Mock(return_value=False)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
result = e._is_allowed_by_operation_policy(
'test_policy',
['test_user', ['test_group_A', 'test_group_B']],
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
e.is_allowed.assert_any_call(
'test_policy',
'test_user',
'test_group_A',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
e.is_allowed.assert_any_call(
'test_policy',
'test_user',
'test_group_B',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(result)
def test_is_allowed_by_operation_public(self):
def test_is_allowed_by_operation_policy_no_groups(self):
"""
Test that a public operation is allowed by the operation policy.
Test that access by operation policy is processed correctly when no
user groups are provided.
"""
e = engine.KmipEngine()
e.is_allowed = mock.Mock(return_value=True)
result = e._is_allowed_by_operation_policy(
'test_policy',
['test_user', None],
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
e.is_allowed.assert_called_once_with(
'test_policy',
'test_user',
None,
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(result)
def test_is_allowed_by_operation_policy_groups_empty(self):
"""
Test that access by operation policy is processed correctly when the
provided set of user groups is empty.
Note that _is_allowed will always return True here, but because there
are no groups to check, access is by default denied.
"""
e = engine.KmipEngine()
e.is_allowed = mock.Mock(return_value=True)
result = e._is_allowed_by_operation_policy(
'test_policy',
['test_user', []],
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
e.is_allowed.assert_not_called()
self.assertFalse(result)
def test_get_relevant_policy_section_policy_missing(self):
"""
Test that the lookup for a non-existent policy is handled correctly.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
result = e.get_relevant_policy_section('invalid')
e._logger.warning.assert_called_once_with(
"The 'invalid' policy does not exist."
)
self.assertIsNone(result)
def test_get_relevant_policy_section_no_group(self):
"""
Test that the lookup for a policy with no group specified is handled
correctly.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
'test_policy': {
'default': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
}
expected = {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
result = e.get_relevant_policy_section('test_policy')
self.assertEqual(expected, result)
def test_get_relevant_policy_section_group(self):
"""
Test that the lookup for a policy with a group specified is handled
correctly.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test_policy': {
'default': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
},
'groups': {
'test_group': {
enums.ObjectType.CERTIFICATE: {
enums.Operation.CREATE: enums.Policy.ALLOW_ALL
}
}
}
}
}
expected = {
enums.ObjectType.CERTIFICATE: {
enums.Operation.CREATE: enums.Policy.ALLOW_ALL
}
}
result = e.get_relevant_policy_section('test_policy', 'test_group')
self.assertEqual(expected, result)
def test_get_relevant_policy_section_group_not_supported(self):
"""
Test that the lookup for a policy with a group specified but not
supported is handled correctly.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._operation_policies = {
'test_policy': {
'default': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
},
'groups': {
'test_group_B': {
enums.ObjectType.CERTIFICATE: {
enums.Operation.CREATE: enums.Policy.ALLOW_ALL
}
}
}
}
}
result = e.get_relevant_policy_section('test_policy', 'test_group_A')
e._logger.debug.assert_called_once_with(
"The 'test_policy' policy does not support group 'test_group_A'."
)
self.assertIsNone(result)
def test_get_relevant_policy_section_groups_not_supported(self):
"""
Test that the lookup for a group-less policy with a group specified is
handled correctly.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._operation_policies = {
'test_policy': {
'default': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
}
result = e.get_relevant_policy_section('test_policy', 'test_group_A')
e._logger.debug.assert_called_once_with(
"The 'test_policy' policy does not support groups."
)
self.assertIsNone(result)
def test_is_allowed_policy_not_found(self):
"""
Test that an access check using a non-existent policy is handled
correctly.
"""
e = engine.KmipEngine()
e.get_relevant_policy_section = mock.Mock(return_value=None)
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(result)
def test_is_allowed_policy_object_type_mismatch(self):
"""
Test that an access check using a policy that does not support the
specified object type is handled correctly.
"""
e = engine.KmipEngine()
e._logger = mock.Mock()
e._get_enum_string = mock.Mock(return_value="Certificate")
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
)
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.CERTIFICATE,
enums.Operation.GET
)
e._logger.warning.assert_called_once_with(
"The 'test_policy' policy does not apply to Certificate objects."
)
self.assertFalse(result)
def test_is_allowed_policy_operation_mismatch(self):
"""
Test that an access check using a policy that does not support the
specified operation is handled correctly.
"""
e = engine.KmipEngine()
e._logger = mock.Mock()
e._get_enum_string = mock.Mock(side_effect=["Create", "SymmetricKey"])
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
)
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.CREATE
)
e._logger.warning.assert_called_once_with(
"The 'test_policy' policy does not apply to Create operations on "
"SymmetricKey objects."
)
self.assertFalse(result)
def test_is_allowed_allow_all(self):
"""
Test that an access check resulting in an "Allow All" policy is
processed correctly.
"""
e = engine.KmipEngine()
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_ALL
}
}
}
)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(result)
self.assertTrue(is_allowed)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(is_allowed)
def test_is_allowed_by_operation_block_all(self):
def test_is_allowed_allow_owner(self):
"""
Test that a blocked operation is blocked by the operation policy.
Test that an access check resulting in an "Allow Owner" policy is
processed correctly.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
)
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(result)
def test_is_allowed_allow_owner_not_owner(self):
"""
Test that an access check resulting in an "Allow Owner" policy is
processed correctly when the user requesting access is not the owner.
"""
e = engine.KmipEngine()
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
)
result = e.is_allowed(
'test_policy',
'test_user_A',
'test_group',
'test_user_B',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(result)
def test_is_allowed_disallow_all(self):
"""
Test that an access check resulting in an "Disallow All" policy is
processed correctly.
"""
e = engine.KmipEngine()
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.DISALLOW_ALL
}
}
}
)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(result)
self.assertFalse(is_allowed)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
def test_is_allowed_by_operation_safety_check(self):
def test_is_allowed_invalid_permission(self):
"""
Test that an unknown operation is blocked by the operation policy.
Test that an access check resulting in an invalid policy option is
processed correctly.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
e.get_relevant_policy_section = mock.Mock(
return_value={
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: 'unknown value'
enums.Operation.GET: 'invalid'
}
}
}
)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
result = e.is_allowed(
'test_policy',
'test_user',
'test_group',
'test_user',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
def test_is_allowed_by_operation_policy_nonexistent_policy(self):
"""
Test that a check with a non-existent policy yields a logging warning
and a blocked operation.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
policy = 'nonexistent-policy'
is_allowed = e._is_allowed_by_operation_policy(
policy,
'test',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
e._logger.warning.assert_called_once_with(
"The '{0}' policy does not exist.".format(policy)
)
def test_is_allowed_by_operation_policy_not_object_applicable(self):
"""
Test that a check for an object with a non-applicable policy yields
a logging warning and a blocked operation.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
policy = 'test'
object_type = enums.ObjectType.PRIVATE_KEY
is_allowed = e._is_allowed_by_operation_policy(
policy,
'test',
'test',
object_type,
enums.Operation.GET
)
self.assertFalse(is_allowed)
e._logger.warning.assert_called_once_with(
"The '{0}' policy does not apply to {1} objects.".format(
policy,
e._get_enum_string(object_type)
)
)
def test_is_allowed_by_operation_policy_not_applicable(self):
"""
Test that a check with a non-applicable policy yields a logging
warning and a blocked operation.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
policy = 'test'
object_type = enums.ObjectType.SYMMETRIC_KEY
operation = enums.Operation.CREATE
is_allowed = e._is_allowed_by_operation_policy(
policy,
'test',
'test',
object_type,
operation
)
self.assertFalse(is_allowed)
e._logger.warning.assert_called_once_with(
"The '{0}' policy does not apply to {1} operations on {2} "
"objects.".format(
policy,
e._get_enum_string(operation),
e._get_enum_string(object_type)
)
)
self.assertFalse(result)
def test_get_object_with_access_controls(self):
"""