diff --git a/kmip/pie/client.py b/kmip/pie/client.py index 9c4a144..691e148 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -135,7 +135,7 @@ class ProxyKmipClient(api.KmipClient): self.logger.exception("could not close client connection", e) raise e - def create(self, algorithm, length): + def create(self, algorithm, length, operation_policy_name=None): """ Create a symmetric key on a KMIP appliance. @@ -143,6 +143,8 @@ class ProxyKmipClient(api.KmipClient): algorithm (CryptographicAlgorithm): An enumeration defining the algorithm to use to generate the symmetric key. length (int): The length in bits for the symmetric key. + operation_policy_name (string): The name of the operation policy + to use for the new symmetric key. Optional, defaults to None. Returns: string: The uid of the newly created symmetric key. @@ -164,8 +166,12 @@ class ProxyKmipClient(api.KmipClient): raise exceptions.ClientConnectionNotOpen() # Create the template containing the attributes - attributes = self._build_key_attributes(algorithm, length) - template = cobjects.TemplateAttribute(attributes=attributes) + common_attributes = self._build_common_attributes( + operation_policy_name + ) + key_attributes = self._build_key_attributes(algorithm, length) + key_attributes.extend(common_attributes) + template = cobjects.TemplateAttribute(attributes=key_attributes) # Create the symmetric key and handle the results result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template) @@ -179,7 +185,7 @@ class ProxyKmipClient(api.KmipClient): message = result.result_message.value raise exceptions.KmipOperationFailure(status, reason, message) - def create_key_pair(self, algorithm, length): + def create_key_pair(self, algorithm, length, operation_policy_name=None): """ Create an asymmetric key pair on a KMIP appliance. @@ -187,6 +193,8 @@ class ProxyKmipClient(api.KmipClient): algorithm (CryptographicAlgorithm): An enumeration defining the algorithm to use to generate the key pair. length (int): The length in bits for the key pair. + operation_policy_name (string): The name of the operation policy + to use for the new key pair. Optional, defaults to None. Returns: string: The uid of the newly created public key. @@ -209,8 +217,12 @@ class ProxyKmipClient(api.KmipClient): raise exceptions.ClientConnectionNotOpen() # Create the template containing the attributes - attributes = self._build_key_attributes(algorithm, length) - template = cobjects.CommonTemplateAttribute(attributes=attributes) + common_attributes = self._build_common_attributes( + operation_policy_name + ) + key_attributes = self._build_key_attributes(algorithm, length) + key_attributes.extend(common_attributes) + template = cobjects.CommonTemplateAttribute(attributes=key_attributes) # Create the asymmetric key pair and handle the results result = self.proxy.create_key_pair(common_template_attribute=template) @@ -250,15 +262,22 @@ class ProxyKmipClient(api.KmipClient): raise exceptions.ClientConnectionNotOpen() # Extract and create attributes - attributes = list() + object_attributes = list() + if hasattr(managed_object, 'cryptographic_usage_masks'): mask_attribute = self.attribute_factory.create_attribute( enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, - managed_object.cryptographic_usage_masks) + managed_object.cryptographic_usage_masks + ) + object_attributes.append(mask_attribute) + if hasattr(managed_object, 'operation_policy_name'): + opn_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.OPERATION_POLICY_NAME, + managed_object.operation_policy_name + ) + object_attributes.append(opn_attribute) - attributes.append(mask_attribute) - - template = cobjects.TemplateAttribute(attributes=attributes) + template = cobjects.TemplateAttribute(attributes=object_attributes) object_type = managed_object.object_type # Register the managed object and handle the results @@ -391,6 +410,20 @@ class ProxyKmipClient(api.KmipClient): return [algorithm_attribute, length_attribute, mask_attribute] + def _build_common_attributes(self, operation_policy_name=None): + # Build a list of common attributes. + common_attributes = [] + + if operation_policy_name: + common_attributes.append( + self.attribute_factory.create_attribute( + enums.AttributeType.OPERATION_POLICY_NAME, + operation_policy_name + ) + ) + + return common_attributes + def __enter__(self): self.open() return self diff --git a/kmip/pie/objects.py b/kmip/pie/objects.py index 82c59b0..5fca262 100644 --- a/kmip/pie/objects.py +++ b/kmip/pie/objects.py @@ -52,6 +52,11 @@ class ManagedObject(sql.Base): _names = relationship('ManagedObjectName', back_populates='mo', cascade='all, delete-orphan') names = association_proxy('_names', 'name') + operation_policy_name = Column( + 'operation_policy_name', + String(50), + default='default' + ) __mapper_args__ = { 'polymorphic_identity': 'ManagedObject', @@ -71,6 +76,7 @@ class ManagedObject(sql.Base): self.unique_identifier = None self.name_index = 0 self.names = list() + self.operation_policy_name = None self._object_type = None # All remaining attributes are not considered part of the public API @@ -78,7 +84,6 @@ class ManagedObject(sql.Base): self._application_specific_informations = list() self._contact_information = None self._object_groups = list() - self._operation_policy_name = None # The following attributes are placeholders for attributes that are # unsupported by kmip.core diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index 169cb9e..a802820 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -210,6 +210,55 @@ class TestProxyKmipClient(testtools.TestCase): self.assertIsInstance(uid, six.string_types) self.assertEqual(uid, key_id) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_with_operation_policy_name(self): + """ + Test that a symmetric key can be created with proper inputs, + specifically testing that the operation policy name is correctly + sent with the request. + """ + # Create the template to test the create call + algorithm = enums.CryptographicAlgorithm.AES + length = 256 + algorithm_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm) + length_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length) + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT]) + opn_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.OPERATION_POLICY_NAME, + 'test' + ) + + key_attributes = [ + algorithm_attribute, + length_attribute, + mask_attribute, + opn_attribute + ] + template = obj.TemplateAttribute(attributes=key_attributes) + + key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + status = enums.ResultStatus.SUCCESS + result = results.CreateResult( + contents.ResultStatus(status), + uuid=attr.UniqueIdentifier(key_id)) + + with ProxyKmipClient() as client: + client.proxy.create.return_value = result + + client.create( + algorithm, + length, + operation_policy_name='test' + ) + client.proxy.create.assert_called_with( + enums.ObjectType.SYMMETRIC_KEY, template) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_create_on_invalid_algorithm(self): @@ -311,6 +360,58 @@ class TestProxyKmipClient(testtools.TestCase): self.assertIsInstance(public_uid, six.string_types) self.assertIsInstance(private_uid, six.string_types) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_key_pair_with_operation_policy_name(self): + """ + Test that an asymmetric key pair can be created with proper inputs, + specifically testing that the operation policy name is correctly + sent with the request. + """ + # Create the template to test the create key pair call + algorithm = enums.CryptographicAlgorithm.RSA + length = 2048 + algorithm_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm) + length_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length) + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT]) + opn_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.OPERATION_POLICY_NAME, + 'test' + ) + + pair_attributes = [ + algorithm_attribute, + length_attribute, + mask_attribute, + opn_attribute + ] + template = obj.CommonTemplateAttribute(attributes=pair_attributes) + + status = enums.ResultStatus.SUCCESS + result = results.CreateKeyPairResult( + contents.ResultStatus(status), + public_key_uuid=attr.PublicKeyUniqueIdentifier( + 'aaaaaaaa-1111-2222-3333-ffffffffffff'), + private_key_uuid=attr.PrivateKeyUniqueIdentifier( + 'ffffffff-3333-2222-1111-aaaaaaaaaaaa')) + + with ProxyKmipClient() as client: + client.proxy.create_key_pair.return_value = result + + public_uid, private_uid = client.create_key_pair( + enums.CryptographicAlgorithm.RSA, + 2048, + operation_policy_name='test' + ) + + kwargs = {'common_template_attribute': template} + client.proxy.create_key_pair.assert_called_with(**kwargs) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_create_key_pair_on_invalid_algorithm(self): @@ -680,3 +781,26 @@ class TestProxyKmipClient(testtools.TestCase): self.assertRaisesRegexp( KmipOperationFailure, error_msg, client.register, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_build_common_attributes(self): + """ + Test that the right attribute objects are created. + """ + client = ProxyKmipClient() + client.open() + + operation_policy_name = 'test' + common_attributes = client._build_common_attributes( + operation_policy_name + ) + + self.assertEqual(1, len(common_attributes)) + + opn = common_attributes[0] + self.assertIsInstance(opn, obj.Attribute) + self.assertIsInstance(opn.attribute_name, obj.Attribute.AttributeName) + self.assertIsInstance(opn.attribute_value, attr.OperationPolicyName) + self.assertEqual(opn.attribute_name.value, 'Operation Policy Name') + self.assertEqual(opn.attribute_value.value, 'test')