mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-06-27 17:24:24 +02:00
Merge pull request #195 from OpenKMIP/feat/add-opp-policy-name-to-client
Adding operation policy name support to the client
This commit is contained in:
commit
843c75d830
@ -135,7 +135,7 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
self.logger.exception("could not close client connection", e)
|
self.logger.exception("could not close client connection", e)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
def create(self, algorithm, length):
|
def create(self, algorithm, length, operation_policy_name=None):
|
||||||
"""
|
"""
|
||||||
Create a symmetric key on a KMIP appliance.
|
Create a symmetric key on a KMIP appliance.
|
||||||
|
|
||||||
@ -143,6 +143,8 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
algorithm (CryptographicAlgorithm): An enumeration defining the
|
algorithm (CryptographicAlgorithm): An enumeration defining the
|
||||||
algorithm to use to generate the symmetric key.
|
algorithm to use to generate the symmetric key.
|
||||||
length (int): The length in bits for the symmetric key.
|
length (int): The length in bits for the symmetric key.
|
||||||
|
operation_policy_name (string): The name of the operation policy
|
||||||
|
to use for the new symmetric key. Optional, defaults to None.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
string: The uid of the newly created symmetric key.
|
string: The uid of the newly created symmetric key.
|
||||||
@ -164,8 +166,12 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
raise exceptions.ClientConnectionNotOpen()
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
# Create the template containing the attributes
|
# Create the template containing the attributes
|
||||||
attributes = self._build_key_attributes(algorithm, length)
|
common_attributes = self._build_common_attributes(
|
||||||
template = cobjects.TemplateAttribute(attributes=attributes)
|
operation_policy_name
|
||||||
|
)
|
||||||
|
key_attributes = self._build_key_attributes(algorithm, length)
|
||||||
|
key_attributes.extend(common_attributes)
|
||||||
|
template = cobjects.TemplateAttribute(attributes=key_attributes)
|
||||||
|
|
||||||
# Create the symmetric key and handle the results
|
# Create the symmetric key and handle the results
|
||||||
result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template)
|
result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template)
|
||||||
@ -179,7 +185,7 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
message = result.result_message.value
|
message = result.result_message.value
|
||||||
raise exceptions.KmipOperationFailure(status, reason, message)
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
def create_key_pair(self, algorithm, length):
|
def create_key_pair(self, algorithm, length, operation_policy_name=None):
|
||||||
"""
|
"""
|
||||||
Create an asymmetric key pair on a KMIP appliance.
|
Create an asymmetric key pair on a KMIP appliance.
|
||||||
|
|
||||||
@ -187,6 +193,8 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
algorithm (CryptographicAlgorithm): An enumeration defining the
|
algorithm (CryptographicAlgorithm): An enumeration defining the
|
||||||
algorithm to use to generate the key pair.
|
algorithm to use to generate the key pair.
|
||||||
length (int): The length in bits for the key pair.
|
length (int): The length in bits for the key pair.
|
||||||
|
operation_policy_name (string): The name of the operation policy
|
||||||
|
to use for the new key pair. Optional, defaults to None.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
string: The uid of the newly created public key.
|
string: The uid of the newly created public key.
|
||||||
@ -209,8 +217,12 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
raise exceptions.ClientConnectionNotOpen()
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
# Create the template containing the attributes
|
# Create the template containing the attributes
|
||||||
attributes = self._build_key_attributes(algorithm, length)
|
common_attributes = self._build_common_attributes(
|
||||||
template = cobjects.CommonTemplateAttribute(attributes=attributes)
|
operation_policy_name
|
||||||
|
)
|
||||||
|
key_attributes = self._build_key_attributes(algorithm, length)
|
||||||
|
key_attributes.extend(common_attributes)
|
||||||
|
template = cobjects.CommonTemplateAttribute(attributes=key_attributes)
|
||||||
|
|
||||||
# Create the asymmetric key pair and handle the results
|
# Create the asymmetric key pair and handle the results
|
||||||
result = self.proxy.create_key_pair(common_template_attribute=template)
|
result = self.proxy.create_key_pair(common_template_attribute=template)
|
||||||
@ -250,15 +262,22 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
raise exceptions.ClientConnectionNotOpen()
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
# Extract and create attributes
|
# Extract and create attributes
|
||||||
attributes = list()
|
object_attributes = list()
|
||||||
|
|
||||||
if hasattr(managed_object, 'cryptographic_usage_masks'):
|
if hasattr(managed_object, 'cryptographic_usage_masks'):
|
||||||
mask_attribute = self.attribute_factory.create_attribute(
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
managed_object.cryptographic_usage_masks)
|
managed_object.cryptographic_usage_masks
|
||||||
|
)
|
||||||
|
object_attributes.append(mask_attribute)
|
||||||
|
if hasattr(managed_object, 'operation_policy_name'):
|
||||||
|
opn_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.OPERATION_POLICY_NAME,
|
||||||
|
managed_object.operation_policy_name
|
||||||
|
)
|
||||||
|
object_attributes.append(opn_attribute)
|
||||||
|
|
||||||
attributes.append(mask_attribute)
|
template = cobjects.TemplateAttribute(attributes=object_attributes)
|
||||||
|
|
||||||
template = cobjects.TemplateAttribute(attributes=attributes)
|
|
||||||
object_type = managed_object.object_type
|
object_type = managed_object.object_type
|
||||||
|
|
||||||
# Register the managed object and handle the results
|
# Register the managed object and handle the results
|
||||||
@ -391,6 +410,20 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
|
|
||||||
return [algorithm_attribute, length_attribute, mask_attribute]
|
return [algorithm_attribute, length_attribute, mask_attribute]
|
||||||
|
|
||||||
|
def _build_common_attributes(self, operation_policy_name=None):
|
||||||
|
# Build a list of common attributes.
|
||||||
|
common_attributes = []
|
||||||
|
|
||||||
|
if operation_policy_name:
|
||||||
|
common_attributes.append(
|
||||||
|
self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.OPERATION_POLICY_NAME,
|
||||||
|
operation_policy_name
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return common_attributes
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.open()
|
self.open()
|
||||||
return self
|
return self
|
||||||
|
@ -52,6 +52,11 @@ class ManagedObject(sql.Base):
|
|||||||
_names = relationship('ManagedObjectName', back_populates='mo',
|
_names = relationship('ManagedObjectName', back_populates='mo',
|
||||||
cascade='all, delete-orphan')
|
cascade='all, delete-orphan')
|
||||||
names = association_proxy('_names', 'name')
|
names = association_proxy('_names', 'name')
|
||||||
|
operation_policy_name = Column(
|
||||||
|
'operation_policy_name',
|
||||||
|
String(50),
|
||||||
|
default='default'
|
||||||
|
)
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'polymorphic_identity': 'ManagedObject',
|
'polymorphic_identity': 'ManagedObject',
|
||||||
@ -71,6 +76,7 @@ class ManagedObject(sql.Base):
|
|||||||
self.unique_identifier = None
|
self.unique_identifier = None
|
||||||
self.name_index = 0
|
self.name_index = 0
|
||||||
self.names = list()
|
self.names = list()
|
||||||
|
self.operation_policy_name = None
|
||||||
self._object_type = None
|
self._object_type = None
|
||||||
|
|
||||||
# All remaining attributes are not considered part of the public API
|
# All remaining attributes are not considered part of the public API
|
||||||
@ -78,7 +84,6 @@ class ManagedObject(sql.Base):
|
|||||||
self._application_specific_informations = list()
|
self._application_specific_informations = list()
|
||||||
self._contact_information = None
|
self._contact_information = None
|
||||||
self._object_groups = list()
|
self._object_groups = list()
|
||||||
self._operation_policy_name = None
|
|
||||||
|
|
||||||
# The following attributes are placeholders for attributes that are
|
# The following attributes are placeholders for attributes that are
|
||||||
# unsupported by kmip.core
|
# unsupported by kmip.core
|
||||||
|
@ -210,6 +210,55 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||||||
self.assertIsInstance(uid, six.string_types)
|
self.assertIsInstance(uid, six.string_types)
|
||||||
self.assertEqual(uid, key_id)
|
self.assertEqual(uid, key_id)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_with_operation_policy_name(self):
|
||||||
|
"""
|
||||||
|
Test that a symmetric key can be created with proper inputs,
|
||||||
|
specifically testing that the operation policy name is correctly
|
||||||
|
sent with the request.
|
||||||
|
"""
|
||||||
|
# Create the template to test the create call
|
||||||
|
algorithm = enums.CryptographicAlgorithm.AES
|
||||||
|
length = 256
|
||||||
|
algorithm_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
|
||||||
|
length_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
|
||||||
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT])
|
||||||
|
opn_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.OPERATION_POLICY_NAME,
|
||||||
|
'test'
|
||||||
|
)
|
||||||
|
|
||||||
|
key_attributes = [
|
||||||
|
algorithm_attribute,
|
||||||
|
length_attribute,
|
||||||
|
mask_attribute,
|
||||||
|
opn_attribute
|
||||||
|
]
|
||||||
|
template = obj.TemplateAttribute(attributes=key_attributes)
|
||||||
|
|
||||||
|
key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
|
||||||
|
status = enums.ResultStatus.SUCCESS
|
||||||
|
result = results.CreateResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
uuid=attr.UniqueIdentifier(key_id))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.create.return_value = result
|
||||||
|
|
||||||
|
client.create(
|
||||||
|
algorithm,
|
||||||
|
length,
|
||||||
|
operation_policy_name='test'
|
||||||
|
)
|
||||||
|
client.proxy.create.assert_called_with(
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY, template)
|
||||||
|
|
||||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
mock.MagicMock(spec_set=KMIPProxy))
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
def test_create_on_invalid_algorithm(self):
|
def test_create_on_invalid_algorithm(self):
|
||||||
@ -311,6 +360,58 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||||||
self.assertIsInstance(public_uid, six.string_types)
|
self.assertIsInstance(public_uid, six.string_types)
|
||||||
self.assertIsInstance(private_uid, six.string_types)
|
self.assertIsInstance(private_uid, six.string_types)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_key_pair_with_operation_policy_name(self):
|
||||||
|
"""
|
||||||
|
Test that an asymmetric key pair can be created with proper inputs,
|
||||||
|
specifically testing that the operation policy name is correctly
|
||||||
|
sent with the request.
|
||||||
|
"""
|
||||||
|
# Create the template to test the create key pair call
|
||||||
|
algorithm = enums.CryptographicAlgorithm.RSA
|
||||||
|
length = 2048
|
||||||
|
algorithm_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
|
||||||
|
length_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
|
||||||
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT])
|
||||||
|
opn_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.OPERATION_POLICY_NAME,
|
||||||
|
'test'
|
||||||
|
)
|
||||||
|
|
||||||
|
pair_attributes = [
|
||||||
|
algorithm_attribute,
|
||||||
|
length_attribute,
|
||||||
|
mask_attribute,
|
||||||
|
opn_attribute
|
||||||
|
]
|
||||||
|
template = obj.CommonTemplateAttribute(attributes=pair_attributes)
|
||||||
|
|
||||||
|
status = enums.ResultStatus.SUCCESS
|
||||||
|
result = results.CreateKeyPairResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
public_key_uuid=attr.PublicKeyUniqueIdentifier(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff'),
|
||||||
|
private_key_uuid=attr.PrivateKeyUniqueIdentifier(
|
||||||
|
'ffffffff-3333-2222-1111-aaaaaaaaaaaa'))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.create_key_pair.return_value = result
|
||||||
|
|
||||||
|
public_uid, private_uid = client.create_key_pair(
|
||||||
|
enums.CryptographicAlgorithm.RSA,
|
||||||
|
2048,
|
||||||
|
operation_policy_name='test'
|
||||||
|
)
|
||||||
|
|
||||||
|
kwargs = {'common_template_attribute': template}
|
||||||
|
client.proxy.create_key_pair.assert_called_with(**kwargs)
|
||||||
|
|
||||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
mock.MagicMock(spec_set=KMIPProxy))
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
def test_create_key_pair_on_invalid_algorithm(self):
|
def test_create_key_pair_on_invalid_algorithm(self):
|
||||||
@ -680,3 +781,26 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||||||
|
|
||||||
self.assertRaisesRegexp(
|
self.assertRaisesRegexp(
|
||||||
KmipOperationFailure, error_msg, client.register, *args)
|
KmipOperationFailure, error_msg, client.register, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_build_common_attributes(self):
|
||||||
|
"""
|
||||||
|
Test that the right attribute objects are created.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
|
||||||
|
operation_policy_name = 'test'
|
||||||
|
common_attributes = client._build_common_attributes(
|
||||||
|
operation_policy_name
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(1, len(common_attributes))
|
||||||
|
|
||||||
|
opn = common_attributes[0]
|
||||||
|
self.assertIsInstance(opn, obj.Attribute)
|
||||||
|
self.assertIsInstance(opn.attribute_name, obj.Attribute.AttributeName)
|
||||||
|
self.assertIsInstance(opn.attribute_value, attr.OperationPolicyName)
|
||||||
|
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
|
||||||
|
self.assertEqual(opn.attribute_value.value, 'test')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user