Add optional 'name' argument to create and create_key_pair functions

The ProxyKmipClient now allows you to optionally provide a name
when performing a `Create` or a `Create Key Pair`. If not specified,
the name is excluded from the request.

 * For `create`, users specify `name`
 * For `create_key_pair`, users specify `private_name` and `public_name`

Resolves #208
This commit is contained in:
Jonathan Seawright 2016-12-01 15:45:04 -06:00
parent d1b01b75f9
commit 9ba479e4d3
6 changed files with 230 additions and 20 deletions

View File

@ -154,6 +154,9 @@ class Name(Struct):
@classmethod
def create(cls, name_value, name_type):
'''
Returns a Name object, populated with the given value and type
'''
if isinstance(name_value, Name.NameValue):
value = name_value
elif isinstance(name_value, str):
@ -200,6 +203,9 @@ class Name(Struct):
else:
return NotImplemented
def __ne__(self, other):
return not self.__eq__(other)
# 3.3
class ObjectType(Enumeration):

View File

@ -112,10 +112,17 @@ class AttributeValueFactory(object):
def _create_name(self, name):
if name is not None:
name_value = name.name_value
name_type = name.name_type
if isinstance(name, attributes.Name):
return attributes.Name.create(name.name_value, name.name_type)
return attributes.Name.create(name_value, name_type)
elif isinstance(name, str):
return attributes.Name.create(
name,
enums.NameType.UNINTERPRETED_TEXT_STRING
)
else:
raise ValueError('Unrecognized attribute type: '
'{0}'.format(name))
else:
return attributes.Name()

View File

@ -135,7 +135,7 @@ class ProxyKmipClient(api.KmipClient):
self.logger.exception("could not close client connection", e)
raise e
def create(self, algorithm, length, operation_policy_name=None):
def create(self, algorithm, length, operation_policy_name=None, name=None):
"""
Create a symmetric key on a KMIP appliance.
@ -144,7 +144,8 @@ class ProxyKmipClient(api.KmipClient):
algorithm to use to generate the symmetric key.
length (int): The length in bits for the symmetric key.
operation_policy_name (string): The name of the operation policy
to use for the new symmetric key. Optional, defaults to None.
to use for the new symmetric key. Optional, defaults to None
name (string): The name to give the key. Optional, defaults to None
Returns:
string: The uid of the newly created symmetric key.
@ -171,6 +172,10 @@ class ProxyKmipClient(api.KmipClient):
)
key_attributes = self._build_key_attributes(algorithm, length)
key_attributes.extend(common_attributes)
if name:
key_attributes.extend(self._build_name_attribute(name))
template = cobjects.TemplateAttribute(attributes=key_attributes)
# Create the symmetric key and handle the results
@ -185,7 +190,12 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def create_key_pair(self, algorithm, length, operation_policy_name=None):
def create_key_pair(self,
algorithm,
length,
operation_policy_name=None,
public_name=None,
private_name=None):
"""
Create an asymmetric key pair on a KMIP appliance.
@ -195,6 +205,10 @@ class ProxyKmipClient(api.KmipClient):
length (int): The length in bits for the key pair.
operation_policy_name (string): The name of the operation policy
to use for the new key pair. Optional, defaults to None.
public_name (string): The name to give the public key.
Optional, defaults to None.
private_name (string): The name to give the public key.
Optional, defaults to None.
Returns:
string: The uid of the newly created public key.
@ -216,7 +230,7 @@ class ProxyKmipClient(api.KmipClient):
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
# Create the template containing the attributes
# Create the common attributes that are shared
common_attributes = self._build_common_attributes(
operation_policy_name
)
@ -224,8 +238,26 @@ class ProxyKmipClient(api.KmipClient):
key_attributes.extend(common_attributes)
template = cobjects.CommonTemplateAttribute(attributes=key_attributes)
# Create public / private specific attributes
public_template = None
if public_name:
name_attr = self._build_name_attribute(name=public_name)
public_template = cobjects.PublicKeyTemplateAttribute(
names=name_attr
)
private_template = None
if private_name:
name_attr = self._build_name_attribute(name=private_name)
private_template = cobjects.PrivateKeyTemplateAttribute(
names=name_attr
)
# Create the asymmetric key pair and handle the results
result = self.proxy.create_key_pair(common_template_attribute=template)
result = self.proxy.create_key_pair(
common_template_attribute=template,
private_key_template_attribute=private_template,
public_key_template_attribute=public_template)
status = result.result_status.value
if status == enums.ResultStatus.SUCCESS:
@ -411,7 +443,10 @@ class ProxyKmipClient(api.KmipClient):
return [algorithm_attribute, length_attribute, mask_attribute]
def _build_common_attributes(self, operation_policy_name=None):
# Build a list of common attributes.
'''
Build a list of common attributes that are shared across
symmetric as well as asymmetric objects
'''
common_attributes = []
if operation_policy_name:
@ -424,6 +459,19 @@ class ProxyKmipClient(api.KmipClient):
return common_attributes
def _build_name_attribute(self, name=None):
'''
Build a name attribute, returned in a list for ease
of use in the caller
'''
name_list = []
if name:
name_list.append(self.attribute_factory.create_attribute(
enums.AttributeType.NAME,
name)
)
return name_list
def __enter__(self):
self.open()
return self

View File

@ -70,6 +70,13 @@ class TestNameValue(TestCase):
self.assertFalse(name_val == other_name_val)
self.assertFalse(name_val == 'invalid')
def test__ne(self):
name_val = Name.NameValue(self.stringName1)
other_name_val = Name.NameValue(self.stringName2)
self.assertTrue(name_val != other_name_val)
self.assertTrue(name_val != 'invalid')
def test__str(self):
name_val = Name.NameValue(self.stringName1)
repr_name = "NameValue(value='{0}')".format(self.stringName1)
@ -110,6 +117,15 @@ class TestNameType(TestCase):
self.assertFalse(type_uri == type_txt)
self.assertFalse(type_uri == 'invalid')
def test__ne(self):
type_uri = Name.NameType(self.enum_uri)
same_type = Name.NameType(self.enum_uri)
type_txt = Name.NameType(self.enum_txt)
self.assertFalse(type_uri != same_type)
self.assertTrue(type_uri != type_txt)
self.assertTrue(type_uri != 'invalid')
def test__str(self):
type_uri = Name.NameType(self.enum_uri)
str_uri = "{0}".format(self.enum_uri)
@ -191,6 +207,16 @@ class TestName(TestCase):
self.assertFalse(name_obj == other_type)
self.assertFalse(name_obj == 'invalid')
def test__ne(self):
name_obj = Name.create(self.stringName1, self.enumNameType)
same_name = Name.create(self.stringName1, self.enumNameType)
other_name = Name.create(self.stringName2, self.enumNameType)
other_type = Name.create(self.stringName1, self.enumNameTypeUri)
self.assertFalse(name_obj != same_name)
self.assertNotEqual(name_obj, other_name)
self.assertNotEqual(name_obj, other_type)
def test__str(self):
name_obj = Name.create(self.stringName1, self.enumNameType)
repr_name = (
@ -563,10 +589,3 @@ class TestCryptographicParameters(TestCase):
'key_role_type': KeyRoleType.BDK})
self.assertFalse(self.cp == cp_valid)
self.assertRaises(TypeError, self.cp.validate)
def test_bad_object(self):
name_value = 'puppies'
name_type = NameType.UNINTERPRETED_TEXT_STRING
bad_obj = Name.create(name_value, name_type)
self.assertNotEqual(NotImplemented, bad_obj)

View File

@ -41,7 +41,11 @@ class TestAttributeValueFactory(testtools.TestCase):
"""
Test that a Name attribute can be created.
"""
self.skip('')
attr_type = enums.AttributeType.NAME
name = self.factory.create_attribute_value(attr_type, "foo")
self.assertIsInstance(name, attributes.Name)
self.assertEqual("foo", name.name_value.value)
self.assertEqual(enums.Tags.NAME, name.tag)
def test_create_object_type(self):
"""

View File

@ -259,6 +259,69 @@ class TestProxyKmipClient(testtools.TestCase):
client.proxy.create.assert_called_with(
enums.ObjectType.SYMMETRIC_KEY, template)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_with_name(self):
"""
Test that a symmetric key can be created with proper inputs,
specifically testing that the name is correctly
sent with the request.
"""
# Create the template to test the create call
algorithm = enums.CryptographicAlgorithm.AES
length = 256
algorithm_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
length_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT])
key_name = "symmetrickey"
name_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.NAME,
key_name)
key_attributes = [
algorithm_attribute,
length_attribute,
mask_attribute,
name_attribute
]
template = obj.TemplateAttribute(attributes=key_attributes)
key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
status = enums.ResultStatus.SUCCESS
result = results.CreateResult(
contents.ResultStatus(status),
uuid=attr.UniqueIdentifier(key_id))
with ProxyKmipClient() as client:
client.proxy.create.return_value = result
client.create(
algorithm,
length,
name=key_name
)
client.proxy.create.assert_called_with(
enums.ObjectType.SYMMETRIC_KEY, template)
def test_name_eq(self):
"""
Test that two identical name attributes match
"""
attr_type = enums.AttributeType.NAME
attr_name = "foo"
attr_a = self.attribute_factory.create_attribute(attr_type, attr_name)
attr_b = self.attribute_factory.create_attribute(attr_type, attr_name)
self.assertTrue(attr_a == attr_b)
self.assertFalse(attr_a != attr_b)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_on_invalid_algorithm(self):
@ -355,7 +418,9 @@ class TestProxyKmipClient(testtools.TestCase):
public_uid, private_uid = client.create_key_pair(
enums.CryptographicAlgorithm.RSA, 2048)
kwargs = {'common_template_attribute': template}
kwargs = {'common_template_attribute': template,
'private_key_template_attribute': None,
'public_key_template_attribute': None}
client.proxy.create_key_pair.assert_called_with(**kwargs)
self.assertIsInstance(public_uid, six.string_types)
self.assertIsInstance(private_uid, six.string_types)
@ -409,7 +474,68 @@ class TestProxyKmipClient(testtools.TestCase):
operation_policy_name='test'
)
kwargs = {'common_template_attribute': template}
kwargs = {'common_template_attribute': template,
'private_key_template_attribute': None,
'public_key_template_attribute': None}
client.proxy.create_key_pair.assert_called_with(**kwargs)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_create_key_pair_with_key_names(self):
"""
Test that an asymmetric key pair can be created with proper inputs,
specifically testing that the private / public names are correctly
sent with the request
"""
# Create the template to test the create key pair call
algorithm = enums.CryptographicAlgorithm.RSA
length = 2048
algorithm_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
length_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
mask_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT])
private_name_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.NAME, "private")
public_name_attribute = self.attribute_factory.create_attribute(
enums.AttributeType.NAME, "public")
pair_attributes = [
algorithm_attribute,
length_attribute,
mask_attribute]
template = obj.CommonTemplateAttribute(attributes=pair_attributes)
private_template = obj.PrivateKeyTemplateAttribute(
names=[private_name_attribute])
public_template = obj.PublicKeyTemplateAttribute(
names=[public_name_attribute])
status = enums.ResultStatus.SUCCESS
result = results.CreateKeyPairResult(
contents.ResultStatus(status),
public_key_uuid=attr.PublicKeyUniqueIdentifier(
'aaaaaaaa-1111-2222-3333-ffffffffffff'),
private_key_uuid=attr.PrivateKeyUniqueIdentifier(
'ffffffff-3333-2222-1111-aaaaaaaaaaaa'))
with ProxyKmipClient() as client:
client.proxy.create_key_pair.return_value = result
public_uid, private_uid = client.create_key_pair(
enums.CryptographicAlgorithm.RSA,
2048,
public_name="public",
private_name="private"
)
kwargs = {'common_template_attribute': template,
'private_key_template_attribute': private_template,
'public_key_template_attribute': public_template}
client.proxy.create_key_pair.assert_called_with(**kwargs)
@mock.patch('kmip.pie.client.KMIPProxy',
@ -793,7 +919,7 @@ class TestProxyKmipClient(testtools.TestCase):
operation_policy_name = 'test'
common_attributes = client._build_common_attributes(
operation_policy_name
operation_policy_name=operation_policy_name
)
self.assertEqual(1, len(common_attributes))