Adding operation policy name support to the client

This change adds operation policy name support to the pie client,
allowing you to optionally specify the policy name when creating
new symmetric keys and asymmetric key pairs. The operation policy
name can also be set on any pie object and will be sent with the
corresponding register request for new objects. Tests for these
additions are included.
This commit is contained in:
Peter Hamilton 2016-10-05 15:30:35 -04:00
parent bcb5e7d948
commit 894a7ac97d
3 changed files with 174 additions and 12 deletions

View File

@ -135,7 +135,7 @@ class ProxyKmipClient(api.KmipClient):
self.logger.exception("could not close client connection", e)
raise e
def create(self, algorithm, length):
def create(self, algorithm, length, operation_policy_name=None):
"""
Create a symmetric key on a KMIP appliance.
@ -143,6 +143,8 @@ class ProxyKmipClient(api.KmipClient):
algorithm (CryptographicAlgorithm): An enumeration defining the
algorithm to use to generate the symmetric key.
length (int): The length in bits for the symmetric key.
operation_policy_name (string): The name of the operation policy
to use for the new symmetric key. Optional, defaults to None.
Returns:
string: The uid of the newly created symmetric key.
@ -164,8 +166,12 @@ class ProxyKmipClient(api.KmipClient):
raise exceptions.ClientConnectionNotOpen()
# Create the template containing the attributes
attributes = self._build_key_attributes(algorithm, length)
template = cobjects.TemplateAttribute(attributes=attributes)
common_attributes = self._build_common_attributes(
operation_policy_name
)
key_attributes = self._build_key_attributes(algorithm, length)
key_attributes.extend(common_attributes)
template = cobjects.TemplateAttribute(attributes=key_attributes)
# Create the symmetric key and handle the results
result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template)
@ -179,7 +185,7 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def create_key_pair(self, algorithm, length):
def create_key_pair(self, algorithm, length, operation_policy_name=None):
"""
Create an asymmetric key pair on a KMIP appliance.
@ -187,6 +193,8 @@ class ProxyKmipClient(api.KmipClient):
algorithm (CryptographicAlgorithm): An enumeration defining the
algorithm to use to generate the key pair.
length (int): The length in bits for the key pair.
operation_policy_name (string): The name of the operation policy
to use for the new key pair. Optional, defaults to None.
Returns:
string: The uid of the newly created public key.
@ -209,8 +217,12 @@ class ProxyKmipClient(api.KmipClient):
raise exceptions.ClientConnectionNotOpen()
# Create the template containing the attributes
attributes = self._build_key_attributes(algorithm, length)
template = cobjects.CommonTemplateAttribute(attributes=attributes)
common_attributes = self._build_common_attributes(
operation_policy_name
)
key_attributes = self._build_key_attributes(algorithm, length)
key_attributes.extend(common_attributes)
template = cobjects.CommonTemplateAttribute(attributes=key_attributes)
# Create the asymmetric key pair and handle the results
result = self.proxy.create_key_pair(common_template_attribute=template)
@ -250,15 +262,22 @@ class ProxyKmipClient(api.KmipClient):
raise exceptions.ClientConnectionNotOpen()
# Extract and create attributes
attributes = list()
object_attributes = list()
if hasattr(managed_object, 'cryptographic_usage_masks'):
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
managed_object.cryptographic_usage_masks)
managed_object.cryptographic_usage_masks
)
object_attributes.append(mask_attribute)
if hasattr(managed_object, 'operation_policy_name'):
opn_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
managed_object.operation_policy_name
)
object_attributes.append(opn_attribute)
attributes.append(mask_attribute)
template = cobjects.TemplateAttribute(attributes=attributes)
template = cobjects.TemplateAttribute(attributes=object_attributes)
object_type = managed_object.object_type
# Register the managed object and handle the results
@ -391,6 +410,20 @@ class ProxyKmipClient(api.KmipClient):
return [algorithm_attribute, length_attribute, mask_attribute]
def _build_common_attributes(self, operation_policy_name=None):
# Build a list of common attributes.
common_attributes = []
if operation_policy_name:
common_attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
operation_policy_name
)
)
return common_attributes
def __enter__(self):
self.open()
return self

View File

@ -52,6 +52,11 @@ class ManagedObject(sql.Base):
_names = relationship('ManagedObjectName', back_populates='mo',
cascade='all, delete-orphan')
names = association_proxy('_names', 'name')
operation_policy_name = Column(
'operation_policy_name',
String(50),
default='default'
)
__mapper_args__ = {
'polymorphic_identity': 'ManagedObject',
@ -71,6 +76,7 @@ class ManagedObject(sql.Base):
self.unique_identifier = None
self.name_index = 0
self.names = list()
self.operation_policy_name = None
self._object_type = None
# All remaining attributes are not considered part of the public API
@ -78,7 +84,6 @@ class ManagedObject(sql.Base):
self._application_specific_informations = list()
self._contact_information = None
self._object_groups = list()
self._operation_policy_name = None
# The following attributes are placeholders for attributes that are
# unsupported by kmip.core

View File

@ -210,6 +210,55 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertIsInstance(uid, six.string_types)
self.assertEqual(uid, key_id)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_with_operation_policy_name(self):
"""
Test that a symmetric key can be created with proper inputs,
specifically testing that the operation policy name is correctly
sent with the request.
"""
# Create the template to test the create call
algorithm = enums.CryptographicAlgorithm.AES
length = 256
algorithm_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
length_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT])
opn_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'test'
)
key_attributes = [
algorithm_attribute,
length_attribute,
mask_attribute,
opn_attribute
]
template = obj.TemplateAttribute(attributes=key_attributes)
key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
status = enums.ResultStatus.SUCCESS
result = results.CreateResult(
contents.ResultStatus(status),
uuid=attr.UniqueIdentifier(key_id))
with ProxyKmipClient() as client:
client.proxy.create.return_value = result
client.create(
algorithm,
length,
operation_policy_name='test'
)
client.proxy.create.assert_called_with(
enums.ObjectType.SYMMETRIC_KEY, template)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_on_invalid_algorithm(self):
@ -311,6 +360,58 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertIsInstance(public_uid, six.string_types)
self.assertIsInstance(private_uid, six.string_types)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_key_pair_with_operation_policy_name(self):
"""
Test that an asymmetric key pair can be created with proper inputs,
specifically testing that the operation policy name is correctly
sent with the request.
"""
# Create the template to test the create key pair call
algorithm = enums.CryptographicAlgorithm.RSA
length = 2048
algorithm_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
length_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT])
opn_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'test'
)
pair_attributes = [
algorithm_attribute,
length_attribute,
mask_attribute,
opn_attribute
]
template = obj.CommonTemplateAttribute(attributes=pair_attributes)
status = enums.ResultStatus.SUCCESS
result = results.CreateKeyPairResult(
contents.ResultStatus(status),
public_key_uuid=attr.PublicKeyUniqueIdentifier(
'aaaaaaaa-1111-2222-3333-ffffffffffff'),
private_key_uuid=attr.PrivateKeyUniqueIdentifier(
'ffffffff-3333-2222-1111-aaaaaaaaaaaa'))
with ProxyKmipClient() as client:
client.proxy.create_key_pair.return_value = result
public_uid, private_uid = client.create_key_pair(
enums.CryptographicAlgorithm.RSA,
2048,
operation_policy_name='test'
)
kwargs = {'common_template_attribute': template}
client.proxy.create_key_pair.assert_called_with(**kwargs)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_key_pair_on_invalid_algorithm(self):
@ -680,3 +781,26 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.register, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_build_common_attributes(self):
"""
Test that the right attribute objects are created.
"""
client = ProxyKmipClient()
client.open()
operation_policy_name = 'test'
common_attributes = client._build_common_attributes(
operation_policy_name
)
self.assertEqual(1, len(common_attributes))
opn = common_attributes[0]
self.assertIsInstance(opn, obj.Attribute)
self.assertIsInstance(opn.attribute_name, obj.Attribute.AttributeName)
self.assertIsInstance(opn.attribute_value, attr.OperationPolicyName)
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
self.assertEqual(opn.attribute_value.value, 'test')