Merge pull request #36 from hadesto/dev/hadi/integration-tests/release/symmetric-keys

Added integration tests for testing symmetric keys
This commit is contained in:
Peter Hamilton 2015-06-25 07:55:52 -04:00
commit 0069f0d9fb
1 changed files with 359 additions and 1 deletions

View File

@ -13,14 +13,43 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import pytest import logging
from testtools import TestCase from testtools import TestCase
from kmip.core.attributes import CryptographicAlgorithm
from kmip.core.attributes import CryptographicLength
from kmip.core.attributes import Name
from kmip.core.enums import AttributeType
from kmip.core.enums import CryptographicAlgorithm as CryptoAlgorithmEnum
from kmip.core.enums import CryptographicUsageMask
from kmip.core.enums import KeyFormatType as KeyFormatTypeEnum
from kmip.core.enums import NameType
from kmip.core.enums import ObjectType
from kmip.core.enums import ResultStatus from kmip.core.enums import ResultStatus
from kmip.core.enums import ResultReason
from kmip.core.enums import QueryFunction as QueryFunctionEnum from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.factories.credentials import CredentialFactory
from kmip.core.factories.secrets import SecretFactory
from kmip.core.misc import KeyFormatType
from kmip.core.objects import Attribute
from kmip.core.objects import KeyBlock
from kmip.core.objects import KeyMaterial
from kmip.core.objects import KeyValue
from kmip.core.objects import TemplateAttribute
from kmip.core.misc import QueryFunction from kmip.core.misc import QueryFunction
from kmip.core.secrets import SymmetricKey
import pytest
@pytest.mark.usefixtures("client") @pytest.mark.usefixtures("client")
class TestIntegration(TestCase): class TestIntegration(TestCase):
@ -28,9 +57,191 @@ class TestIntegration(TestCase):
def setUp(self): def setUp(self):
super(TestIntegration, self).setUp() super(TestIntegration, self).setUp()
self.logger = logging.getLogger(__name__)
self.attr_factory = AttributeFactory()
self.cred_factory = CredentialFactory()
self.secret_factory = SecretFactory()
def tearDown(self): def tearDown(self):
super(TestIntegration, self).tearDown() super(TestIntegration, self).tearDown()
def _create_symmetric_key(self, key_name=None):
"""
Helper function for creating symmetric keys. Used any time a key
needs to be created.
:param key_name: name of the key to be created
:return: returns the result of the "create key" operation as
provided by the KMIP appliance
"""
object_type = ObjectType.SYMMETRIC_KEY
attribute_type = AttributeType.CRYPTOGRAPHIC_ALGORITHM
algorithm = self.attr_factory.create_attribute(attribute_type,
CryptoAlgorithmEnum.AES)
mask_flags = [CryptographicUsageMask.ENCRYPT,
CryptographicUsageMask.DECRYPT]
attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
usage_mask = self.attr_factory.create_attribute(attribute_type,
mask_flags)
key_length = 128
attribute_type = AttributeType.CRYPTOGRAPHIC_LENGTH
key_length_obj = self.attr_factory.create_attribute(attribute_type,
key_length)
name = Attribute.AttributeName('Name')
if key_name is None:
key_name = 'Integration Test - Key'
name_value = Name.NameValue(key_name)
name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
value = Name(name_value=name_value, name_type=name_type)
name = Attribute(attribute_name=name, attribute_value=value)
attributes = [algorithm, usage_mask, key_length_obj, name]
template_attribute = TemplateAttribute(attributes=attributes)
return self.client.create(object_type, template_attribute,
credential=None)
def _check_result_status(self, result, result_status_type,
result_status_value):
"""
Helper function for checking the status of KMIP appliance actions.
Verifies the result status type and value.
:param result: result object
:param result_status_type: type of result status received
:param result_status_value: value of the result status
"""
result_status = result.result_status.enum
# Error check the result status type and value
expected = result_status_type
self.assertIsInstance(result_status, expected)
expected = result_status_value
if result_status is ResultStatus.OPERATION_FAILED:
self.logger.debug(result)
self.logger.debug(result.result_reason)
self.logger.debug(result.result_message)
self.assertEqual(expected, result_status)
def _check_uuid(self, uuid, uuid_type):
"""
Helper function for checking UUID type and value for errors
:param uuid: UUID of a created key
:param uuid_type: UUID type
:return:
"""
# Error check the UUID type and value
not_expected = None
self.assertNotEqual(not_expected, uuid)
expected = uuid_type
self.assertEqual(expected, type(uuid))
def _check_object_type(self, object_type, object_type_type,
object_type_value):
"""
Checks the type and value of a given object type.
:param object_type:
:param object_type_type:
:param object_type_value:
"""
# Error check the object type type and value
expected = object_type_type
self.assertIsInstance(object_type, expected)
expected = object_type_value
self.assertEqual(expected, object_type)
def _check_template_attribute(self, template_attribute,
template_attribute_type, num_attributes,
attribute_features):
"""
Checks the value and type of a given template attribute
:param template_attribute:
:param template_attribute_type:
:param num_attributes:
:param attribute_features:
"""
# Error check the template attribute type
expected = template_attribute_type
self.assertIsInstance(template_attribute, expected)
attributes = template_attribute.attributes
for i in range(num_attributes):
features = attribute_features[i]
self._check_attribute(attributes[i], features[0], features[1],
features[2], features[3])
def _check_attribute(self, attribute, attribute_name_type,
attribute_name_value, attribute_value_type,
attribute_value_value):
"""
Checks the value and type of a given attribute
:param attribute:
:param attribute_name_type:
:param attribute_name_value:
:param attribute_value_type:
:param attribute_value_value:
"""
# Error check the attribute name and value type and value
attribute_name = attribute.attribute_name
attribute_value = attribute.attribute_value
self._check_attribute_name(attribute_name, attribute_name_type,
attribute_name_value)
if attribute_name_value == 'Unique Identifier':
self._check_uuid(attribute_value.value, attribute_value_type)
else:
self._check_attribute_value(attribute_value, attribute_value_type,
attribute_value_value)
def _check_attribute_name(self, attribute_name, attribute_name_type,
attribute_name_value):
"""
Checks the attribute name for a given attribute
:param attribute_name:
:param attribute_name_type:
:param attribute_name_value:
"""
# Error check the attribute name type and value
expected = attribute_name_type
observed = type(attribute_name.value)
self.assertEqual(expected, observed)
expected = attribute_name_value
observed = attribute_name.value
self.assertEqual(expected, observed)
def _check_attribute_value(self, attribute_value, attribute_value_type,
attribute_value_value):
"""
Checks the attribute value for a given attribute
:param attribute_value:
:param attribute_value_type:
:param attribute_value_value:
"""
expected = attribute_value_type
observed = type(attribute_value.value)
self.assertEqual(expected, observed)
expected = attribute_value_value
observed = attribute_value.value
self.assertEqual(expected, observed)
def test_discover_versions(self): def test_discover_versions(self):
result = self.client.discover_versions() result = self.client.discover_versions()
@ -61,3 +272,150 @@ class TestIntegration(TestCase):
observed = result.result_status.enum observed = result.result_status.enum
self.assertEqual(expected, observed) self.assertEqual(expected, observed)
def test_symmetric_key_create_get_destroy(self):
"""
Test that symmetric keys are properly created
"""
key_name = 'Integration Test - Create-Get-Destroy Key'
result = self._create_symmetric_key(key_name=key_name)
self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS)
self._check_object_type(result.object_type.enum, ObjectType,
ObjectType.SYMMETRIC_KEY)
self._check_uuid(result.uuid.value, str)
result = self.client.get(uuid=result.uuid.value, credential=None)
self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS)
self._check_object_type(result.object_type.enum, ObjectType,
ObjectType.SYMMETRIC_KEY)
self._check_uuid(result.uuid.value, str)
# Check the secret type
secret = result.secret
expected = SymmetricKey
self.assertIsInstance(secret, expected)
self.logger.debug('Destroying key: ' + key_name + '\n With UUID: ' +
result.uuid.value)
result = self.client.destroy(result.uuid.value)
self._check_result_status(result, ResultStatus,
ResultStatus.SUCCESS)
self._check_uuid(result.uuid.value, str)
# Verify the secret was destroyed
result = self.client.get(uuid=result.uuid.value, credential=None)
self._check_result_status(result, ResultStatus,
ResultStatus.OPERATION_FAILED)
expected = ResultReason
observed = type(result.result_reason.enum)
self.assertEqual(expected, observed)
expected = ResultReason.ITEM_NOT_FOUND
observed = result.result_reason.enum
self.assertEqual(expected, observed)
def test_symmetric_key_register_get_destroy(self):
"""
Tests that symmetric keys are properly registered
"""
object_type = ObjectType.SYMMETRIC_KEY
algorithm_value = CryptoAlgorithmEnum.AES
mask_flags = [CryptographicUsageMask.ENCRYPT,
CryptographicUsageMask.DECRYPT]
attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
usage_mask = self.attr_factory.create_attribute(attribute_type,
mask_flags)
name = Attribute.AttributeName('Name')
key_name = 'Integration Test - Register-Get-Destroy Key'
name_value = Name.NameValue(key_name)
name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
value = Name(name_value=name_value, name_type=name_type)
name = Attribute(attribute_name=name, attribute_value=value)
attributes = [usage_mask, name]
template_attribute = TemplateAttribute(attributes=attributes)
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
key_data = (
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00')
key_material = KeyMaterial(key_data)
key_value = KeyValue(key_material)
cryptographic_algorithm = CryptographicAlgorithm(algorithm_value)
cryptographic_length = CryptographicLength(128)
key_block = KeyBlock(
key_format_type=key_format_type,
key_compression_type=None,
key_value=key_value,
cryptographic_algorithm=cryptographic_algorithm,
cryptographic_length=cryptographic_length,
key_wrapping_data=None)
secret = SymmetricKey(key_block)
result = self.client.register(object_type, template_attribute, secret,
credential=None)
self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS)
self._check_uuid(result.uuid.value, str)
# Check that the returned key bytes match what was provided
uuid = result.uuid.value
result = self.client.get(uuid=uuid, credential=None)
self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS)
self._check_object_type(result.object_type.enum, ObjectType,
ObjectType.SYMMETRIC_KEY)
self._check_uuid(result.uuid.value, str)
# Check the secret type
secret = result.secret
expected = SymmetricKey
self.assertIsInstance(secret, expected)
key_block = result.secret.key_block
key_value = key_block.key_value
key_material = key_value.key_material
expected = key_data
observed = key_material.value
self.assertEqual(expected, observed)
self.logger.debug('Destroying key: ' + key_name + '\nWith UUID: ' +
result.uuid.value)
result = self.client.destroy(result.uuid.value)
self._check_result_status(result, ResultStatus,
ResultStatus.SUCCESS)
self._check_uuid(result.uuid.value, str)
# Verify the secret was destroyed
result = self.client.get(uuid=uuid, credential=None)
self._check_result_status(result, ResultStatus,
ResultStatus.OPERATION_FAILED)
expected = ResultReason
observed = type(result.result_reason.enum)
self.assertEqual(expected, observed)
expected = ResultReason.ITEM_NOT_FOUND
observed = result.result_reason.enum
self.assertEqual(expected, observed)