diff --git a/kmip/pie/client.py b/kmip/pie/client.py index 75d725e..ed841fe 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -138,7 +138,8 @@ class ProxyKmipClient(api.KmipClient): self.logger.exception("could not close client connection", e) raise e - def create(self, algorithm, length, operation_policy_name=None, name=None): + def create(self, algorithm, length, operation_policy_name=None, name=None, + cryptographic_usage_mask=None): """ Create a symmetric key on a KMIP appliance. @@ -149,6 +150,9 @@ class ProxyKmipClient(api.KmipClient): operation_policy_name (string): The name of the operation policy to use for the new symmetric key. Optional, defaults to None name (string): The name to give the key. Optional, defaults to None + cryptographic_usage_mask (list): list of enumerations of crypto + usage mask passing to the symmetric key. Optional, defaults to + None Returns: string: The uid of the newly created symmetric key. @@ -164,6 +168,13 @@ class ProxyKmipClient(api.KmipClient): "algorithm must be a CryptographicAlgorithm enumeration") elif not isinstance(length, six.integer_types) or length <= 0: raise TypeError("length must be a positive integer") + if cryptographic_usage_mask is not None: + if not isinstance(cryptographic_usage_mask, list) or \ + all(isinstance(item, enums.CryptographicUsageMask) + for item in cryptographic_usage_mask) is False: + raise TypeError( + "cryptographic_usage_mask must be a list of " + "CryptographicUsageMask enumerations") # Verify that operations can be given at this time if not self._is_open: @@ -173,7 +184,8 @@ class ProxyKmipClient(api.KmipClient): common_attributes = self._build_common_attributes( operation_policy_name ) - key_attributes = self._build_key_attributes(algorithm, length) + key_attributes = self._build_key_attributes( + algorithm, length, cryptographic_usage_mask) key_attributes.extend(common_attributes) if name: @@ -627,7 +639,7 @@ class ProxyKmipClient(api.KmipClient): message = result.result_message.value raise exceptions.KmipOperationFailure(status, reason, message) - def _build_key_attributes(self, algorithm, length): + def _build_key_attributes(self, algorithm, length, masks=None): # Build a list of core key attributes. algorithm_attribute = self.attribute_factory.create_attribute( enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, @@ -635,10 +647,16 @@ class ProxyKmipClient(api.KmipClient): length_attribute = self.attribute_factory.create_attribute( enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length) + # Default crypto usage mask value + mask_value = [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + if masks: + mask_value.extend(masks) + # remove duplicates + mask_value = list(set(mask_value)) mask_attribute = self.attribute_factory.create_attribute( enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, - [enums.CryptographicUsageMask.ENCRYPT, - enums.CryptographicUsageMask.DECRYPT]) + mask_value) return [algorithm_attribute, length_attribute, mask_attribute] diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index ee56906..5a96f0a 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -310,6 +310,55 @@ class TestProxyKmipClient(testtools.TestCase): client.proxy.create.assert_called_with( enums.ObjectType.SYMMETRIC_KEY, template) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_with_cryptographic_usage_mask(self): + """ + Test that a symmetric key can be created with proper inputs, + specifically testing that the cryptographic usage mask is correctly + sent with the request. + """ + # Create the template to test the create call + algorithm = enums.CryptographicAlgorithm.AES + length = 256 + algorithm_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm) + length_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length) + masks = [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + masks_given = [enums.CryptographicUsageMask.MAC_GENERATE, + enums.CryptographicUsageMask.MAC_VERIFY] + masks.extend(masks_given) + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + masks) + + key_attributes = [ + algorithm_attribute, + length_attribute, + mask_attribute, + ] + + template = obj.TemplateAttribute(attributes=key_attributes) + + key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + status = enums.ResultStatus.SUCCESS + result = results.CreateResult( + contents.ResultStatus(status), + uuid=attr.UniqueIdentifier(key_id)) + + with ProxyKmipClient() as client: + client.proxy.create.return_value = result + + client.create( + algorithm, + length, + cryptographic_usage_mask=masks_given + ) + client.proxy.create.assert_called_with( + enums.ObjectType.SYMMETRIC_KEY, template) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_create_on_invalid_algorithm(self): @@ -332,6 +381,23 @@ class TestProxyKmipClient(testtools.TestCase): with ProxyKmipClient() as client: self.assertRaises(TypeError, client.create, *args) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_on_invalid_cryptographic_usage_mask(self): + """ + Test that a TypeError exception is raised when trying to create a + symmetric key with invalid cryptographic_usage_mask. + """ + args = [enums.CryptographicAlgorithm.AES, 256] + kwargs = {'cryptographic_usage_mask': + enums.CryptographicUsageMask.ENCRYPT} + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.create, *args, **kwargs) + kwargs = {'cryptographic_usage_mask': + [enums.CryptographicUsageMask.ENCRYPT, 1]} + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.create, *args, **kwargs) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_create_on_closed(self):