mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-05-03 14:30:07 +02:00
1196 lines
49 KiB
Python
1196 lines
49 KiB
Python
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
from testtools import TestCase
|
|
|
|
from kmip.core.attributes import CryptographicAlgorithm
|
|
from kmip.core.attributes import CryptographicLength
|
|
from kmip.core.attributes import Name
|
|
|
|
from kmip.core.enums import AttributeType
|
|
from kmip.core.enums import CryptographicAlgorithm as CryptoAlgorithmEnum
|
|
from kmip.core.enums import CryptographicUsageMask
|
|
from kmip.core.enums import KeyFormatType as KeyFormatTypeEnum
|
|
from kmip.core.enums import CertificateTypeEnum
|
|
from kmip.core.enums import NameType
|
|
from kmip.core.enums import ObjectType
|
|
from kmip.core.enums import OpaqueDataType
|
|
from kmip.core.enums import SecretDataType
|
|
from kmip.core.enums import ResultStatus
|
|
from kmip.core.enums import ResultReason
|
|
from kmip.core.enums import QueryFunction as QueryFunctionEnum
|
|
|
|
from kmip.core.factories.attributes import AttributeFactory
|
|
from kmip.core.factories.credentials import CredentialFactory
|
|
from kmip.core.factories.secrets import SecretFactory
|
|
|
|
from kmip.core.misc import KeyFormatType
|
|
|
|
from kmip.core.objects import Attribute
|
|
from kmip.core.objects import KeyBlock
|
|
from kmip.core.objects import KeyMaterial
|
|
from kmip.core.objects import KeyValue
|
|
from kmip.core.objects import TemplateAttribute
|
|
from kmip.core.objects import PrivateKeyTemplateAttribute
|
|
from kmip.core.objects import PublicKeyTemplateAttribute
|
|
from kmip.core.objects import CommonTemplateAttribute
|
|
|
|
from kmip.core.misc import QueryFunction
|
|
|
|
from kmip.core.secrets import SymmetricKey
|
|
from kmip.core.secrets import PrivateKey
|
|
from kmip.core.secrets import PublicKey
|
|
from kmip.core.secrets import Certificate
|
|
from kmip.core.secrets import SecretData
|
|
from kmip.core.secrets import OpaqueObject
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.mark.usefixtures("client")
|
|
class TestIntegration(TestCase):
|
|
|
|
def setUp(self):
    """
    Prepare the per-test fixtures.

    Runs the parent setUp, then attaches a module-named logger and one
    instance each of the attribute, credential, and secret factories.
    """
    super(TestIntegration, self).setUp()

    self.logger = logging.getLogger(__name__)

    # The right-hand tuple is evaluated left to right, so the factories
    # are still constructed in attribute/credential/secret order.
    self.attr_factory, self.cred_factory, self.secret_factory = (
        AttributeFactory(), CredentialFactory(), SecretFactory())
|
|
|
|
def tearDown(self):
    """Hand control to the parent tearDown; no extra cleanup happens here."""
    super(TestIntegration, self).tearDown()
|
|
|
|
def _create_symmetric_key(self, key_name=None):
    """
    Ask the KMIP appliance to create a 128-bit AES symmetric key.

    The request carries four attributes: cryptographic algorithm (AES),
    usage mask (ENCRYPT and DECRYPT), cryptographic length (128), and an
    uninterpreted-text-string Name.

    :param key_name: name of the key to be created; when None the
        default 'Integration Test - Key' is used
    :return: the result of the client's "create" operation
    """
    if key_name is None:
        key_name = 'Integration Test - Key'

    make_attribute = self.attr_factory.create_attribute

    algorithm_attr = make_attribute(
        AttributeType.CRYPTOGRAPHIC_ALGORITHM, CryptoAlgorithmEnum.AES)
    usage_mask_attr = make_attribute(
        AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
        [CryptographicUsageMask.ENCRYPT, CryptographicUsageMask.DECRYPT])
    length_attr = make_attribute(AttributeType.CRYPTOGRAPHIC_LENGTH, 128)

    # The Name attribute is assembled by hand rather than via the factory.
    name_attr = Attribute(
        attribute_name=Attribute.AttributeName('Name'),
        attribute_value=Name(
            name_value=Name.NameValue(key_name),
            name_type=Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)))

    template = TemplateAttribute(
        attributes=[algorithm_attr, usage_mask_attr, length_attr, name_attr])

    return self.client.create(ObjectType.SYMMETRIC_KEY, template,
                              credential=None)
|
|
|
|
def _create_key_pair(self, key_name=None):
    """
    Ask the KMIP appliance to create a 2048-bit RSA key pair.

    Algorithm, usage mask (ENCRYPT and DECRYPT), and length go into the
    common template; each key gets its own Name attribute, formed by
    appending " Private" or " Public" to the base name.

    :param key_name: base name for the pair; when None the default
        'Integration Test - Key' is used
    :return: the result of the client's "create key pair" operation
    """
    if key_name is None:
        key_name = 'Integration Test - Key'

    make_attribute = self.attr_factory.create_attribute
    shared_attributes = [
        make_attribute(AttributeType.CRYPTOGRAPHIC_ALGORITHM,
                       CryptoAlgorithmEnum.RSA),
        make_attribute(AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
                       [CryptographicUsageMask.ENCRYPT,
                        CryptographicUsageMask.DECRYPT]),
        make_attribute(AttributeType.CRYPTOGRAPHIC_LENGTH, 2048),
    ]

    # One attribute-name object and one name-type object are shared by
    # both Name attributes.
    attribute_name = Attribute.AttributeName('Name')
    private_label = Name.NameValue(key_name + " Private")
    public_label = Name.NameValue(key_name + " Public")
    text_name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)

    private_name = Name(name_value=private_label, name_type=text_name_type)
    public_name = Name(name_value=public_label, name_type=text_name_type)
    private_name_attr = Attribute(attribute_name=attribute_name,
                                  attribute_value=private_name)
    public_name_attr = Attribute(attribute_name=attribute_name,
                                 attribute_value=public_name)

    common_template = CommonTemplateAttribute(attributes=shared_attributes)
    private_template = PrivateKeyTemplateAttribute(
        attributes=[private_name_attr])
    public_template = PublicKeyTemplateAttribute(
        attributes=[public_name_attr])

    return self.client.create_key_pair(
        common_template_attribute=common_template,
        private_key_template_attribute=private_template,
        public_key_template_attribute=public_template)
|
|
|
|
def _check_result_status(self, result, result_status_type,
                         result_status_value):
    """
    Verify the status carried by a KMIP operation result.

    Asserts that the status is an instance of the expected type and that
    it equals the expected value. Whenever the status is
    OPERATION_FAILED (expected or not), the result, its reason, and its
    message are written to the error log before the value comparison.

    :param result: result object returned by a client operation
    :param result_status_type: type the status value must be an
        instance of
    :param result_status_value: status value the result must carry
    """
    observed_status = result.result_status.value

    self.assertIsInstance(observed_status, result_status_type)

    if observed_status is ResultStatus.OPERATION_FAILED:
        log_error = self.logger.error
        log_error(result)
        log_error(result.result_reason)
        log_error(result.result_message)

    self.assertEqual(result_status_value, observed_status)
|
|
|
|
def _check_uuid(self, uuid, uuid_type):
|
|
"""
|
|
Helper function for checking UUID type and value for errors
|
|
:param uuid: UUID of a created key
|
|
:param uuid_type: UUID type
|
|
:return:
|
|
"""
|
|
# Error check the UUID type and value
|
|
not_expected = None
|
|
|
|
self.assertNotEqual(not_expected, uuid)
|
|
|
|
expected = uuid_type
|
|
self.assertEqual(expected, type(uuid))
|
|
|
|
def _check_object_type(self, object_type, object_type_type,
|
|
object_type_value):
|
|
"""
|
|
Checks the type and value of a given object type.
|
|
:param object_type:
|
|
:param object_type_type:
|
|
:param object_type_value:
|
|
"""
|
|
# Error check the object type type and value
|
|
expected = object_type_type
|
|
|
|
self.assertIsInstance(object_type, expected)
|
|
|
|
expected = object_type_value
|
|
|
|
self.assertEqual(expected, object_type)
|
|
|
|
def _check_template_attribute(self, template_attribute,
|
|
template_attribute_type, num_attributes,
|
|
attribute_features):
|
|
"""
|
|
Checks the value and type of a given template attribute
|
|
:param template_attribute:
|
|
:param template_attribute_type:
|
|
:param num_attributes:
|
|
:param attribute_features:
|
|
"""
|
|
# Error check the template attribute type
|
|
expected = template_attribute_type
|
|
|
|
self.assertIsInstance(template_attribute, expected)
|
|
|
|
attributes = template_attribute.attributes
|
|
|
|
for i in range(num_attributes):
|
|
features = attribute_features[i]
|
|
self._check_attribute(attributes[i], features[0], features[1],
|
|
features[2], features[3])
|
|
|
|
def _check_attribute(self, attribute, attribute_name_type,
|
|
attribute_name_value, attribute_value_type,
|
|
attribute_value_value):
|
|
"""
|
|
Checks the value and type of a given attribute
|
|
:param attribute:
|
|
:param attribute_name_type:
|
|
:param attribute_name_value:
|
|
:param attribute_value_type:
|
|
:param attribute_value_value:
|
|
"""
|
|
# Error check the attribute name and value type and value
|
|
attribute_name = attribute.attribute_name
|
|
attribute_value = attribute.attribute_value
|
|
|
|
self._check_attribute_name(attribute_name, attribute_name_type,
|
|
attribute_name_value)
|
|
|
|
if attribute_name_value == 'Unique Identifier':
|
|
self._check_uuid(attribute_value.value, attribute_value_type)
|
|
else:
|
|
self._check_attribute_value(attribute_value, attribute_value_type,
|
|
attribute_value_value)
|
|
|
|
def _check_attribute_name(self, attribute_name, attribute_name_type,
|
|
attribute_name_value):
|
|
"""
|
|
Checks the attribute name for a given attribute
|
|
:param attribute_name:
|
|
:param attribute_name_type:
|
|
:param attribute_name_value:
|
|
"""
|
|
# Error check the attribute name type and value
|
|
expected = attribute_name_type
|
|
observed = type(attribute_name.value)
|
|
|
|
self.assertEqual(expected, observed)
|
|
|
|
expected = attribute_name_value
|
|
observed = attribute_name.value
|
|
|
|
self.assertEqual(expected, observed)
|
|
|
|
def _check_attribute_value(self, attribute_value, attribute_value_type,
|
|
attribute_value_value):
|
|
"""
|
|
Checks the attribute value for a given attribute
|
|
:param attribute_value:
|
|
:param attribute_value_type:
|
|
:param attribute_value_value:
|
|
"""
|
|
expected = attribute_value_type
|
|
observed = type(attribute_value.value)
|
|
|
|
self.assertEqual(expected, observed)
|
|
|
|
expected = attribute_value_value
|
|
observed = attribute_value.value
|
|
|
|
self.assertEqual(expected, observed)
|
|
|
|
def test_discover_versions(self):
    """Check that the client's discover_versions call reports SUCCESS."""
    response = self.client.discover_versions()

    self.assertEqual(ResultStatus.SUCCESS, response.result_status.value)
|
|
|
|
def test_query(self):
    """
    Send one query covering six query functions and check that the
    server answers with SUCCESS.
    """
    # Query functions to request, in the order they are sent.
    requested = (
        QueryFunctionEnum.QUERY_OPERATIONS,
        QueryFunctionEnum.QUERY_OBJECTS,
        QueryFunctionEnum.QUERY_SERVER_INFORMATION,
        QueryFunctionEnum.QUERY_APPLICATION_NAMESPACES,
        QueryFunctionEnum.QUERY_EXTENSION_LIST,
        QueryFunctionEnum.QUERY_EXTENSION_MAP,
    )
    query_functions = [QueryFunction(item) for item in requested]

    response = self.client.query(query_functions=query_functions)

    self.assertEqual(ResultStatus.SUCCESS, response.result_status.value)
|
|
|
|
def test_symmetric_key_create_get_destroy(self):
    """
    Create a symmetric key, fetch it, destroy it, and confirm that a
    later Get fails with ITEM_NOT_FOUND.
    """
    key_name = 'Integration Test - Create-Get-Destroy Key'

    # Create
    created = self._create_symmetric_key(key_name=key_name)
    self._check_result_status(created, ResultStatus, ResultStatus.SUCCESS)
    self._check_object_type(created.object_type.value, ObjectType,
                            ObjectType.SYMMETRIC_KEY)
    self._check_uuid(created.uuid.value, str)

    # Get
    fetched = self.client.get(uuid=created.uuid.value, credential=None)
    self._check_result_status(fetched, ResultStatus, ResultStatus.SUCCESS)
    self._check_object_type(fetched.object_type.value, ObjectType,
                            ObjectType.SYMMETRIC_KEY)
    self._check_uuid(fetched.uuid.value, str)

    # The returned secret must be a SymmetricKey.
    self.assertIsInstance(fetched.secret, SymmetricKey)

    # Destroy
    self.logger.debug('Destroying key: ' + key_name + '\n With UUID: ' +
                      fetched.uuid.value)
    destroyed = self.client.destroy(fetched.uuid.value)
    self._check_result_status(destroyed, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(destroyed.uuid.value, str)

    # A second Get on the destroyed UUID must fail: item not found.
    missing = self.client.get(uuid=destroyed.uuid.value, credential=None)
    self._check_result_status(missing, ResultStatus,
                              ResultStatus.OPERATION_FAILED)

    self.assertEqual(ResultReason, type(missing.result_reason.value))
    self.assertEqual(ResultReason.ITEM_NOT_FOUND,
                     missing.result_reason.value)
|
|
|
|
def test_symmetric_key_register_get_destroy(self):
    """
    Register a raw AES key, read it back and compare the key bytes,
    destroy it, and confirm that a later Get fails with ITEM_NOT_FOUND.
    """
    key_name = 'Integration Test - Register-Get-Destroy Key'
    # Sixteen zero bytes, registered as a 128-bit key.
    key_data = (
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00')

    # Template attributes: usage mask plus the Name attribute.
    usage_mask = self.attr_factory.create_attribute(
        AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
        [CryptographicUsageMask.ENCRYPT, CryptographicUsageMask.DECRYPT])
    name_attribute = Attribute(
        attribute_name=Attribute.AttributeName('Name'),
        attribute_value=Name(
            name_value=Name.NameValue(key_name),
            name_type=Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)))
    template_attribute = TemplateAttribute(
        attributes=[usage_mask, name_attribute])

    # Key block wrapping the raw bytes.
    key_block = KeyBlock(
        key_format_type=KeyFormatType(KeyFormatTypeEnum.RAW),
        key_compression_type=None,
        key_value=KeyValue(KeyMaterial(key_data)),
        cryptographic_algorithm=CryptographicAlgorithm(
            CryptoAlgorithmEnum.AES),
        cryptographic_length=CryptographicLength(128),
        key_wrapping_data=None)

    # Register
    registered = self.client.register(
        ObjectType.SYMMETRIC_KEY, template_attribute,
        SymmetricKey(key_block), credential=None)
    self._check_result_status(registered, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(registered.uuid.value, str)

    # Get
    uuid = registered.uuid.value
    fetched = self.client.get(uuid=uuid, credential=None)
    self._check_result_status(fetched, ResultStatus, ResultStatus.SUCCESS)
    self._check_object_type(fetched.object_type.value, ObjectType,
                            ObjectType.SYMMETRIC_KEY)
    self._check_uuid(fetched.uuid.value, str)

    # The returned secret must be a SymmetricKey carrying the same bytes.
    self.assertIsInstance(fetched.secret, SymmetricKey)
    returned_material = fetched.secret.key_block.key_value.key_material
    self.assertEqual(key_data, returned_material.value)

    # Destroy
    self.logger.debug('Destroying key: ' + key_name + '\nWith UUID: ' +
                      fetched.uuid.value)
    destroyed = self.client.destroy(fetched.uuid.value)
    self._check_result_status(destroyed, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(destroyed.uuid.value, str)

    # A second Get on the destroyed UUID must fail: item not found.
    missing = self.client.get(uuid=uuid, credential=None)
    self._check_result_status(missing, ResultStatus,
                              ResultStatus.OPERATION_FAILED)

    self.assertEqual(ResultReason, type(missing.result_reason.value))
    self.assertEqual(ResultReason.ITEM_NOT_FOUND,
                     missing.result_reason.value)
|
|
|
|
def test_key_pair_create_get_destroy(self):
    """
    Create a key pair, fetch both keys, destroy both, and confirm that
    later Gets fail with a ResultReason-typed reason.
    """
    key_name = 'Integration Test - Create-Get-Destroy Key Pair -'
    pair = self._create_key_pair(key_name=key_name)

    self._check_result_status(pair, ResultStatus, ResultStatus.SUCCESS)

    # Both halves of the pair must come back with a string UUID.
    self._check_uuid(pair.private_key_uuid.value, str)
    self._check_uuid(pair.public_key_uuid.value, str)

    private_uuid = pair.private_key_uuid.value
    public_uuid = pair.public_key_uuid.value

    # Get both keys, then validate the private result before the public.
    private_get = self.client.get(uuid=private_uuid, credential=None)
    public_get = self.client.get(uuid=public_uuid, credential=None)

    for fetched, expected_kind in (
            (private_get, ObjectType.PRIVATE_KEY),
            (public_get, ObjectType.PUBLIC_KEY)):
        self._check_result_status(fetched, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_object_type(fetched.object_type.value, ObjectType,
                                expected_kind)
        self._check_uuid(fetched.uuid.value, str)

    # Each returned secret must have the matching key class.
    private_secret = private_get.secret
    public_secret = public_get.secret
    self.assertIsInstance(private_secret, PrivateKey)
    self.assertIsInstance(public_secret, PublicKey)

    # Destroy the private key, then the public key.
    self.logger.debug('Destroying key: ' + key_name + ' Private' +
                      '\n With UUID: ' + pair.private_key_uuid.value)
    private_destroy = self.client.destroy(pair.private_key_uuid.value)
    self._check_result_status(private_destroy, ResultStatus,
                              ResultStatus.SUCCESS)

    self.logger.debug('Destroying key: ' + key_name + ' Public' +
                      '\n With UUID: ' + pair.public_key_uuid.value)
    public_destroy = self.client.destroy(pair.public_key_uuid.value)
    self._check_result_status(public_destroy, ResultStatus,
                              ResultStatus.SUCCESS)

    # From here on, use the UUIDs echoed back by the destroy results.
    private_uuid = private_destroy.uuid.value
    public_uuid = public_destroy.uuid.value
    self._check_uuid(private_uuid, str)
    self._check_uuid(public_uuid, str)

    # Gets on the destroyed UUIDs must fail.
    private_missing = self.client.get(uuid=private_uuid)
    public_missing = self.client.get(uuid=public_uuid)

    self._check_result_status(private_missing, ResultStatus,
                              ResultStatus.OPERATION_FAILED)
    self._check_result_status(public_missing, ResultStatus,
                              ResultStatus.OPERATION_FAILED)

    private_reason_type = type(private_missing.result_reason.value)
    public_reason_type = type(public_missing.result_reason.value)
    self.assertEqual(ResultReason, private_reason_type)
    self.assertEqual(ResultReason, public_reason_type)
|
|
|
|
def test_private_key_register_get_destroy(self):
    """
    Tests that private keys are properly registered, retrieved,
    and destroyed.

    Registers the key bytes below as a private key, fetches the key and
    compares the returned bytes, destroys it, and checks that a later
    Get fails with a ResultReason-typed reason.
    """
    priv_key_object_type = ObjectType.PRIVATE_KEY

    mask_flags = [CryptographicUsageMask.ENCRYPT,
                  CryptographicUsageMask.DECRYPT]
    attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
    usage_mask = self.attr_factory.create_attribute(attribute_type,
                                                    mask_flags)

    name = Attribute.AttributeName('Name')
    key_name = 'Integration Test - Register-Get-Destroy Key -'

    priv_name_value = Name.NameValue(key_name + " Private")

    name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
    priv_value = Name(name_value=priv_name_value, name_type=name_type)

    priv_name = Attribute(attribute_name=name, attribute_value=priv_value)

    priv_key_attributes = [usage_mask, priv_name]

    private_template_attribute = TemplateAttribute(
        attributes=priv_key_attributes)

    key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)

    # NOTE(review): the modulus INTEGER in these bytes (02 81 81 00 93 ...)
    # looks like 1024 bits, while the cryptographic length declared below
    # is 2048 - confirm which value is intended.
    key_data = (
        b'\x30\x82\x02\x76\x02\x01\x00\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7'
        b'\x0D\x01\x01\x01\x05\x00\x04\x82\x02\x60\x30\x82\x02\x5C\x02\x01'
        b'\x00\x02\x81\x81\x00\x93\x04\x51\xC9\xEC\xD9\x4F\x5B\xB9\xDA\x17'
        b'\xDD\x09\x38\x1B\xD2\x3B\xE4\x3E\xCA\x8C\x75\x39\xF3\x01\xFC\x8A'
        b'\x8C\xD5\xD5\x27\x4C\x3E\x76\x99\xDB\xDC\x71\x1C\x97\xA7\xAA\x91'
        b'\xE2\xC5\x0A\x82\xBD\x0B\x10\x34\xF0\xDF\x49\x3D\xEC\x16\x36\x24'
        b'\x27\xE5\x8A\xCC\xE7\xF6\xCE\x0F\x9B\xCC\x61\x7B\xBD\x8C\x90\xD0'
        b'\x09\x4A\x27\x03\xBA\x0D\x09\xEB\x19\xD1\x00\x5F\x2F\xB2\x65\x52'
        b'\x6A\xAC\x75\xAF\x32\xF8\xBC\x78\x2C\xDE\xD2\xA5\x7F\x81\x1E\x03'
        b'\xEA\xF6\x7A\x94\x4D\xE5\xE7\x84\x13\xDC\xA8\xF2\x32\xD0\x74\xE6'
        b'\xDC\xEA\x4C\xEC\x9F\x02\x03\x01\x00\x01\x02\x81\x80\x0B\x6A\x7D'
        b'\x73\x61\x99\xEA\x48\xA4\x20\xE4\x53\x7C\xA0\xC7\xC0\x46\x78\x4D'
        b'\xCB\xEA\xA6\x3B\xAE\xBC\x0B\xC1\x32\x78\x74\x49\xCD\xE8\xD7\xCA'
        b'\xD0\xC0\xC8\x63\xC0\xFE\xFB\x06\xC3\x06\x2B\xEF\xC5\x00\x33\xEC'
        b'\xF8\x7B\x4E\x33\xA9\xBE\x7B\xCB\xC8\xF1\x51\x1A\xE2\x15\xE8\x0D'
        b'\xEB\x5D\x8A\xF2\xBD\x31\x31\x9D\x78\x21\x19\x66\x40\x93\x5A\x0C'
        b'\xD6\x7C\x94\x59\x95\x79\xF2\x10\x0D\x65\xE0\x38\x83\x1F\xDA\xFB'
        b'\x0D\xBE\x2B\xBD\xAC\x00\xA6\x96\xE6\x7E\x75\x63\x50\xE1\xC9\x9A'
        b'\xCE\x11\xA3\x6D\xAB\xAC\x3E\xD3\xE7\x30\x96\x00\x59\x02\x41\x00'
        b'\xDD\xF6\x72\xFB\xCC\x5B\xDA\x3D\x73\xAF\xFC\x4E\x79\x1E\x0C\x03'
        b'\x39\x02\x24\x40\x5D\x69\xCC\xAA\xBC\x74\x9F\xAA\x0D\xCD\x4C\x25'
        b'\x83\xC7\x1D\xDE\x89\x41\xA7\xB9\xAA\x03\x0F\x52\xEF\x14\x51\x46'
        b'\x6C\x07\x4D\x4D\x33\x8F\xE6\x77\x89\x2A\xCD\x9E\x10\xFD\x35\xBD'
        b'\x02\x41\x00\xA9\x8F\xBC\x3E\xD6\xB4\xC6\xF8\x60\xF9\x71\x65\xAC'
        b'\x2F\x7B\xB6\xF2\xE2\xCB\x19\x2A\x9A\xBD\x49\x79\x5B\xE5\xBC\xF3'
        b'\x7D\x8E\xE6\x9A\x6E\x16\x9C\x24\xE5\xC3\x2E\x4E\x7F\xA3\x32\x65'
        b'\x46\x14\x07\xF9\x52\xBA\x49\xE2\x04\x81\x8A\x2F\x78\x5F\x11\x3F'
        b'\x92\x2B\x8B\x02\x40\x25\x3F\x94\x70\x39\x0D\x39\x04\x93\x03\x77'
        b'\x7D\xDB\xC9\x75\x0E\x9D\x64\x84\x9C\xE0\x90\x3E\xAE\x70\x4D\xC9'
        b'\xF5\x89\xB7\x68\x0D\xEB\x9D\x60\x9F\xD5\xBC\xD4\xDE\xCD\x6F\x12'
        b'\x05\x42\xE5\xCF\xF5\xD7\x6F\x2A\x43\xC8\x61\x5F\xB5\xB3\xA9\x21'
        b'\x34\x63\x79\x7A\xA9\x02\x41\x00\xA1\xDD\xF0\x23\xC0\xCD\x94\xC0'
        b'\x19\xBB\x26\xD0\x9B\x9E\x3C\xA8\xFA\x97\x1C\xB1\x6A\xA5\x8B\x9B'
        b'\xAF\x79\xD6\x08\x1A\x1D\xBB\xA4\x52\xBA\x53\x65\x3E\x28\x04\xBA'
        b'\x98\xFF\x69\xE8\xBB\x1B\x3A\x16\x1E\xA2\x25\xEA\x50\x14\x63\x21'
        b'\x6A\x8D\xAB\x9B\x88\xA7\x5E\x5F\x02\x40\x61\x78\x64\x6E\x11\x2C'
        b'\xF7\x9D\x92\x1A\x8A\x84\x3F\x17\xF6\xE7\xFF\x97\x4F\x68\x81\x22'
        b'\x36\x5B\xF6\x69\x0C\xDF\xC9\x96\xE1\x89\x09\x52\xEB\x38\x20\xDD'
        b'\x18\x90\xEC\x1C\x86\x19\xE8\x7A\x2B\xD3\x8F\x9D\x03\xB3\x7F\xAC'
        b'\x74\x2E\xFB\x74\x8C\x78\x85\x94\x2C\x39')

    key_material = KeyMaterial(key_data)
    key_value = KeyValue(key_material)

    algorithm_value = CryptoAlgorithmEnum.RSA
    cryptographic_algorithm = CryptographicAlgorithm(algorithm_value)
    cryptographic_length = CryptographicLength(2048)

    key_block = KeyBlock(
        key_format_type=key_format_type,
        key_compression_type=None,
        key_value=key_value,
        cryptographic_algorithm=cryptographic_algorithm,
        cryptographic_length=cryptographic_length,
        key_wrapping_data=None)

    priv_secret = PrivateKey(key_block)

    priv_key_result = self.client.register(priv_key_object_type,
                                           private_template_attribute,
                                           priv_secret, credential=None)

    self._check_result_status(priv_key_result, ResultStatus,
                              ResultStatus.SUCCESS)

    self._check_uuid(priv_key_result.uuid.value, str)

    # Check that the returned key bytes match what was provided
    priv_uuid = priv_key_result.uuid.value

    priv_key_result = self.client.get(uuid=priv_uuid, credential=None)

    self._check_result_status(priv_key_result, ResultStatus,
                              ResultStatus.SUCCESS)

    self._check_object_type(priv_key_result.object_type.value, ObjectType,
                            ObjectType.PRIVATE_KEY)

    self._check_uuid(priv_key_result.uuid.value, str)

    # Check the secret type
    priv_secret = priv_key_result.secret

    priv_expected = PrivateKey

    self.assertIsInstance(priv_secret, priv_expected)

    priv_key_block = priv_key_result.secret.key_block
    priv_key_value = priv_key_block.key_value
    priv_key_material = priv_key_value.key_material

    expected = key_data

    priv_observed = priv_key_material.value

    self.assertEqual(expected, priv_observed)

    # The literal used to read '\nWith " "UUID: ', which put stray quote
    # characters into the log line.
    self.logger.debug('Destroying key: ' + key_name + " Private" +
                      '\nWith UUID: ' + priv_key_result.uuid.value)

    priv_result = self.client.destroy(priv_key_result.uuid.value)

    self._check_result_status(priv_result, ResultStatus,
                              ResultStatus.SUCCESS)

    self._check_uuid(priv_result.uuid.value, str)

    # Verify the secret was destroyed
    priv_key_destroyed_result = self.client.get(uuid=priv_uuid,
                                                credential=None)

    self._check_result_status(priv_key_destroyed_result, ResultStatus,
                              ResultStatus.OPERATION_FAILED)

    expected = ResultReason
    priv_observed = type(priv_key_destroyed_result.result_reason.value)

    self.assertEqual(expected, priv_observed)
|
|
|
|
def test_public_key_register_get_destroy(self):
    """
    Tests that public keys are properly registered, retrieved,
    and destroyed.

    Registers the key bytes below as a public key, fetches the key and
    compares the returned bytes, destroys it, and checks that a later
    Get fails with a ResultReason-typed reason.
    """
    pub_key_object_type = ObjectType.PUBLIC_KEY
    mask_flags = [CryptographicUsageMask.ENCRYPT,
                  CryptographicUsageMask.DECRYPT]
    attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
    usage_mask = self.attr_factory.create_attribute(attribute_type,
                                                    mask_flags)

    name = Attribute.AttributeName('Name')
    key_name = 'Integration Test - Register-Get-Destroy Key -'

    pub_name_value = Name.NameValue(key_name + " Public")
    name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
    pub_value = Name(name_value=pub_name_value, name_type=name_type)
    pub_name = Attribute(attribute_name=name, attribute_value=pub_value)
    pub_key_attributes = [usage_mask, pub_name]
    public_template_attribute = TemplateAttribute(
        attributes=pub_key_attributes)
    key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)

    # NOTE(review): the modulus INTEGER in these bytes (02 81 81 00 93 ...)
    # looks like 1024 bits, while the cryptographic length declared below
    # is 2048 - confirm which value is intended.
    key_data = (
        b'\x30\x81\x9F\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01'
        b'\x05\x00\x03\x81\x8D\x00\x30\x81\x89\x02\x81\x81\x00\x93\x04\x51'
        b'\xC9\xEC\xD9\x4F\x5B\xB9\xDA\x17\xDD\x09\x38\x1B\xD2\x3B\xE4\x3E'
        b'\xCA\x8C\x75\x39\xF3\x01\xFC\x8A\x8C\xD5\xD5\x27\x4C\x3E\x76\x99'
        b'\xDB\xDC\x71\x1C\x97\xA7\xAA\x91\xE2\xC5\x0A\x82\xBD\x0B\x10\x34'
        b'\xF0\xDF\x49\x3D\xEC\x16\x36\x24\x27\xE5\x8A\xCC\xE7\xF6\xCE\x0F'
        b'\x9B\xCC\x61\x7B\xBD\x8C\x90\xD0\x09\x4A\x27\x03\xBA\x0D\x09\xEB'
        b'\x19\xD1\x00\x5F\x2F\xB2\x65\x52\x6A\xAC\x75\xAF\x32\xF8\xBC\x78'
        b'\x2C\xDE\xD2\xA5\x7F\x81\x1E\x03\xEA\xF6\x7A\x94\x4D\xE5\xE7\x84'
        b'\x13\xDC\xA8\xF2\x32\xD0\x74\xE6\xDC\xEA\x4C\xEC\x9F\x02\x03\x01'
        b'\x00\x01')

    key_material = KeyMaterial(key_data)
    key_value = KeyValue(key_material)

    algorithm_value = CryptoAlgorithmEnum.RSA
    cryptographic_algorithm = CryptographicAlgorithm(algorithm_value)
    cryptographic_length = CryptographicLength(2048)

    key_block = KeyBlock(
        key_format_type=key_format_type,
        key_compression_type=None,
        key_value=key_value,
        cryptographic_algorithm=cryptographic_algorithm,
        cryptographic_length=cryptographic_length,
        key_wrapping_data=None)
    pub_secret = PublicKey(key_block)

    pub_key_result = self.client.register(pub_key_object_type,
                                          public_template_attribute,
                                          pub_secret, credential=None)
    self._check_result_status(pub_key_result, ResultStatus,
                              ResultStatus.SUCCESS)
    # Check that the returned key bytes match what was provided
    pub_uuid = pub_key_result.uuid.value
    pub_key_result = self.client.get(uuid=pub_uuid, credential=None)
    self._check_result_status(pub_key_result, ResultStatus,
                              ResultStatus.SUCCESS)

    self._check_object_type(pub_key_result.object_type.value, ObjectType,
                            ObjectType.PUBLIC_KEY)
    self._check_uuid(pub_key_result.uuid.value, str)

    # Check the secret type
    pub_secret = pub_key_result.secret
    pub_expected = PublicKey
    self.assertIsInstance(pub_secret, pub_expected)

    pub_key_block = pub_key_result.secret.key_block
    pub_key_value = pub_key_block.key_value
    pub_key_material = pub_key_value.key_material

    expected = key_data
    pub_observed = pub_key_material.value
    self.assertEqual(expected, pub_observed)

    # The literal used to read '\nWith " "UUID: ', which put stray quote
    # characters into the log line.
    self.logger.debug('Destroying key: ' + key_name + " Public" +
                      '\nWith UUID: ' + pub_key_result.uuid.value)
    pub_result = self.client.destroy(pub_key_result.uuid.value)

    self._check_result_status(pub_result, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(pub_result.uuid.value, str)

    # Verify the secret was destroyed
    pub_key_destroyed_result = self.client.get(uuid=pub_uuid,
                                               credential=None)
    self._check_result_status(pub_key_destroyed_result, ResultStatus,
                              ResultStatus.OPERATION_FAILED)
    expected = ResultReason
    pub_observed = type(pub_key_destroyed_result.result_reason.value)

    self.assertEqual(expected, pub_observed)
|
|
|
|
def test_cert_register_get_destroy(self):
    """
    Test that a certificate can be registered, retrieved, and destroyed.

    An X.509 certificate is registered with the server, fetched back by
    its UUID and compared against the registered bytes, and then
    destroyed. A final Get on the destroyed UUID is expected to fail.
    The destroy step runs in a ``finally`` clause so the registered
    object is removed from the server even if a retrieval check fails.
    """
    # Properties copied from the test case example:
    # http://docs.oasis-open.org/kmip/testcases/v1.1/cn01/kmip-testcases-v1.1-cn01.html#_Toc333488807
    cert_obj_type = ObjectType.CERTIFICATE

    mask_flags = [CryptographicUsageMask.SIGN,
                  CryptographicUsageMask.VERIFY]
    attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
    usage_mask = self.attr_factory.create_attribute(attribute_type,
                                                    mask_flags)

    name = Attribute.AttributeName('Name')
    cert_name = 'Integration Test - Register-Get-Destroy Certificate'
    cert_name_value = Name.NameValue(cert_name)
    name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
    cert_value = Name(name_value=cert_name_value, name_type=name_type)
    cert_name_attr = Attribute(attribute_name=name,
                               attribute_value=cert_value)

    cert_attributes = [usage_mask, cert_name_attr]
    cert_template_attribute = TemplateAttribute(
        attributes=cert_attributes)

    cert_format_type = CertificateTypeEnum.X_509

    cert_data = (
        b'\x30\x82\x03\x12\x30\x82\x01\xFA\xA0\x03\x02\x01\x02\x02\x01\x01'
        b'\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05\x00\x30'
        b'\x3B\x31\x0B\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D'
        b'\x30\x0B\x06\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30'
        b'\x0C\x06\x03\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30'
        b'\x0B\x06\x03\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x1E\x17\x0D'
        b'\x31\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x17\x0D\x32'
        b'\x30\x31\x31\x30\x31\x32\x33\x35\x39\x35\x39\x5A\x30\x3B\x31\x0B'
        b'\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D\x30\x0B\x06'
        b'\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30\x0C\x06\x03'
        b'\x55\x04\x0B\x13\x05\x4F\x41\x53\x49\x53\x31\x0D\x30\x0B\x06\x03'
        b'\x55\x04\x03\x13\x04\x4B\x4D\x49\x50\x30\x82\x01\x22\x30\x0D\x06'
        b'\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00\x03\x82\x01\x0F'
        b'\x00\x30\x82\x01\x0A\x02\x82\x01\x01\x00\xAB\x7F\x16\x1C\x00\x42'
        b'\x49\x6C\xCD\x6C\x6D\x4D\xAD\xB9\x19\x97\x34\x35\x35\x77\x76\x00'
        b'\x3A\xCF\x54\xB7\xAF\x1E\x44\x0A\xFB\x80\xB6\x4A\x87\x55\xF8\x00'
        b'\x2C\xFE\xBA\x6B\x18\x45\x40\xA2\xD6\x60\x86\xD7\x46\x48\x34\x6D'
        b'\x75\xB8\xD7\x18\x12\xB2\x05\x38\x7C\x0F\x65\x83\xBC\x4D\x7D\xC7'
        b'\xEC\x11\x4F\x3B\x17\x6B\x79\x57\xC4\x22\xE7\xD0\x3F\xC6\x26\x7F'
        b'\xA2\xA6\xF8\x9B\x9B\xEE\x9E\x60\xA1\xD7\xC2\xD8\x33\xE5\xA5\xF4'
        b'\xBB\x0B\x14\x34\xF4\xE7\x95\xA4\x11\x00\xF8\xAA\x21\x49\x00\xDF'
        b'\x8B\x65\x08\x9F\x98\x13\x5B\x1C\x67\xB7\x01\x67\x5A\xBD\xBC\x7D'
        b'\x57\x21\xAA\xC9\xD1\x4A\x7F\x08\x1F\xCE\xC8\x0B\x64\xE8\xA0\xEC'
        b'\xC8\x29\x53\x53\xC7\x95\x32\x8A\xBF\x70\xE1\xB4\x2E\x7B\xB8\xB7'
        b'\xF4\xE8\xAC\x8C\x81\x0C\xDB\x66\xE3\xD2\x11\x26\xEB\xA8\xDA\x7D'
        b'\x0C\xA3\x41\x42\xCB\x76\xF9\x1F\x01\x3D\xA8\x09\xE9\xC1\xB7\xAE'
        b'\x64\xC5\x41\x30\xFB\xC2\x1D\x80\xE9\xC2\xCB\x06\xC5\xC8\xD7\xCC'
        b'\xE8\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73'
        b'\xA1\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC'
        b'\x41\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01\xA3'
        b'\x21\x30\x1F\x30\x1D\x06\x03\x55\x1D\x0E\x04\x16\x04\x14\x04\xE5'
        b'\x7B\xD2\xC4\x31\xB2\xE8\x16\xE1\x80\xA1\x98\x23\xFA\xC8\x58\x27'
        b'\x3F\x6B\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05'
        b'\x00\x03\x82\x01\x01\x00\xA8\x76\xAD\xBC\x6C\x8E\x0F\xF0\x17\x21'
        b'\x6E\x19\x5F\xEA\x76\xBF\xF6\x1A\x56\x7C\x9A\x13\xDC\x50\xD1\x3F'
        b'\xEC\x12\xA4\x27\x3C\x44\x15\x47\xCF\xAB\xCB\x5D\x61\xD9\x91\xE9'
        b'\x66\x31\x9D\xF7\x2C\x0D\x41\xBA\x82\x6A\x45\x11\x2F\xF2\x60\x89'
        b'\xA2\x34\x4F\x4D\x71\xCF\x7C\x92\x1B\x4B\xDF\xAE\xF1\x60\x0D\x1B'
        b'\xAA\xA1\x53\x36\x05\x7E\x01\x4B\x8B\x49\x6D\x4F\xAE\x9E\x8A\x6C'
        b'\x1D\xA9\xAE\xB6\xCB\xC9\x60\xCB\xF2\xFA\xE7\x7F\x58\x7E\xC4\xBB'
        b'\x28\x20\x45\x33\x88\x45\xB8\x8D\xD9\xAE\xEA\x53\xE4\x82\xA3\x6E'
        b'\x73\x4E\x4F\x5F\x03\xB9\xD0\xDF\xC4\xCA\xFC\x6B\xB3\x4E\xA9\x05'
        b'\x3E\x52\xBD\x60\x9E\xE0\x1E\x86\xD9\xB0\x9F\xB5\x11\x20\xC1\x98'
        b'\x34\xA9\x97\xB0\x9C\xE0\x8D\x79\xE8\x13\x11\x76\x2F\x97\x4B\xB1'
        b'\xC8\xC0\x91\x86\xC4\xD7\x89\x33\xE0\xDB\x38\xE9\x05\x08\x48\x77'
        b'\xE1\x47\xC7\x8A\xF5\x2F\xAE\x07\x19\x2F\xF1\x66\xD1\x9F\xA9\x4A'
        b'\x11\xCC\x11\xB2\x7E\xD0\x50\xF7\xA2\x7F\xAE\x13\xB2\x05\xA5\x74'
        b'\xC4\xEE\x00\xAA\x8B\xD6\x5D\x0D\x70\x57\xC9\x85\xC8\x39\xEF\x33'
        b'\x6A\x44\x1E\xD5\x3A\x53\xC6\xB6\xB6\x96\xF1\xBD\xEB\x5F\x7E\xA8'
        b'\x11\xEB\xB2\x5A\x7F\x86')

    cert_obj = Certificate(cert_format_type, cert_data)

    register_result = self.client.register(cert_obj_type,
                                           cert_template_attribute,
                                           cert_obj, credential=None)
    self._check_result_status(register_result, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(register_result.uuid.value, str)

    # Remember the UUID from Register; it drives every later request,
    # including the cleanup in the finally clause.
    cert_uuid = register_result.uuid.value

    try:
        get_result = self.client.get(uuid=cert_uuid, credential=None)
        self._check_result_status(get_result, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_object_type(get_result.object_type.value, ObjectType,
                                ObjectType.CERTIFICATE)
        self._check_uuid(get_result.uuid.value, str)

        # Check that the returned object is a Certificate
        self.assertIsInstance(get_result.secret, Certificate)

        # Check that the returned certificate bytes match what was
        # provided
        cert_material = get_result.secret.certificate_value.value
        self.assertEqual(cert_data, cert_material)
    finally:
        # Always destroy the registered object so a failed check above
        # does not leave it behind on the server.
        self.logger.debug('Destroying cert: %s\nWith UUID: %s',
                          cert_name, cert_uuid)
        destroy_result = self.client.destroy(cert_uuid)
        self._check_result_status(destroy_result, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_uuid(destroy_result.uuid.value, str)

        # Verify the certificate was destroyed
        destroyed_get_result = self.client.get(uuid=cert_uuid,
                                               credential=None)
        self._check_result_status(destroyed_get_result, ResultStatus,
                                  ResultStatus.OPERATION_FAILED)
        self.assertIsInstance(destroyed_get_result.result_reason.value,
                              ResultReason)
|
|
def test_passphrase_register_get_destroy(self):
    """
    Test that a passphrase can be registered, retrieved, and destroyed.

    A SecretData object of type PASSWORD wrapping an opaque key block is
    registered with the server, fetched back by its UUID and compared
    against the registered bytes, and then destroyed. A final Get on the
    destroyed UUID is expected to fail. The destroy step runs in a
    ``finally`` clause so the registered object is removed from the
    server even if a retrieval check fails.
    """
    # Properties copied from the test case example:
    # http://docs.oasis-open.org/kmip/testcases/v1.1/cn01/kmip-testcases-v1.1-cn01.html#_Toc333488777
    pass_obj_type = ObjectType.SECRET_DATA

    mask_flags = [CryptographicUsageMask.VERIFY]
    attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
    usage_mask = self.attr_factory.create_attribute(attribute_type,
                                                    mask_flags)

    name = Attribute.AttributeName('Name')
    pass_name = 'Integration Test - Register-Get-Destroy Passphrase'
    pass_name_value = Name.NameValue(pass_name)
    name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
    pass_value = Name(name_value=pass_name_value, name_type=name_type)
    pass_name_attr = Attribute(attribute_name=name,
                               attribute_value=pass_value)

    pass_attributes = [usage_mask, pass_name_attr]
    pass_template_attribute = TemplateAttribute(
        attributes=pass_attributes)

    pass_format_type = SecretData.SecretDataType(
        SecretDataType.PASSWORD)

    key_format_type = KeyFormatType(KeyFormatTypeEnum.OPAQUE)
    key_data = b'\x70\x65\x65\x6b\x2d\x61\x2d\x62\x6f\x6f\x21\x21'

    key_material = KeyMaterial(key_data)
    key_value = KeyValue(key_material)

    key_block = KeyBlock(
        key_format_type=key_format_type,
        key_compression_type=None,
        key_value=key_value,
        key_wrapping_data=None)

    pass_obj = SecretData(secret_data_type=pass_format_type,
                          key_block=key_block)

    register_result = self.client.register(pass_obj_type,
                                           pass_template_attribute,
                                           pass_obj, credential=None)
    self._check_result_status(register_result, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(register_result.uuid.value, str)

    # Remember the UUID from Register; it drives every later request,
    # including the cleanup in the finally clause.
    pass_uuid = register_result.uuid.value

    try:
        get_result = self.client.get(uuid=pass_uuid, credential=None)
        self._check_result_status(get_result, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_object_type(get_result.object_type.value, ObjectType,
                                ObjectType.SECRET_DATA)
        self._check_uuid(get_result.uuid.value, str)

        # Check that the returned object is a SecretData
        self.assertIsInstance(get_result.secret, SecretData)

        # Check that the returned key bytes match what was provided
        returned_key_value = get_result.secret.key_block.key_value
        pass_material = returned_key_value.key_material.value
        self.assertEqual(key_data, pass_material)
    finally:
        # Always destroy the registered object so a failed check above
        # does not leave it behind on the server.
        self.logger.debug('Destroying passphrase: %s\nWith UUID: %s',
                          pass_name, pass_uuid)
        destroy_result = self.client.destroy(pass_uuid)
        self._check_result_status(destroy_result, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_uuid(destroy_result.uuid.value, str)

        # Verify the secret was destroyed
        destroyed_get_result = self.client.get(uuid=pass_uuid,
                                               credential=None)
        self._check_result_status(destroyed_get_result, ResultStatus,
                                  ResultStatus.OPERATION_FAILED)
        self.assertIsInstance(destroyed_get_result.result_reason.value,
                              ResultReason)
|
|
def test_opaque_data_register_get_destroy(self):
    """
    Test that an opaque object can be registered, retrieved, and destroyed.

    An OpaqueObject is registered with the server, fetched back by its
    UUID and compared against the registered bytes, and then destroyed.
    A final Get on the destroyed UUID is expected to fail. The destroy
    step runs in a ``finally`` clause so the registered object is removed
    from the server even if a retrieval check fails.
    """
    opaque_obj_type = ObjectType.OPAQUE_DATA
    opaque_obj_data_type = OpaqueObject.OpaqueDataType(OpaqueDataType.NONE)

    name = Attribute.AttributeName('Name')
    opaque_obj_name = 'Integration Test - Register-Get-Destroy Opaque Data'
    opaque_obj_name_value = Name.NameValue(opaque_obj_name)
    name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
    opaque_obj_value = Name(name_value=opaque_obj_name_value,
                            name_type=name_type)
    opaque_obj_name_attr = Attribute(attribute_name=name,
                                     attribute_value=opaque_obj_value)

    opaque_obj_attributes = [opaque_obj_name_attr]
    opaque_obj_template_attribute = TemplateAttribute(
        attributes=opaque_obj_attributes)

    opaque_obj_data = OpaqueObject.OpaqueDataValue((
        b'\x30\x82\x03\x12\x30\x82\x01\xFA\xA0\x03\x02\x01\x02\x02\x01\x01'
        b'\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x05\x05\x00\x30'
        b'\x3B\x31\x0B\x30\x09\x06\x03\x55\x04\x06\x13\x02\x55\x53\x31\x0D'
        b'\x30\x0B\x06\x03\x55\x04\x0A\x13\x04\x54\x45\x53\x54\x31\x0E\x30'
    ))

    opaque_obj = OpaqueObject(opaque_data_type=opaque_obj_data_type,
                              opaque_data_value=opaque_obj_data)

    register_result = self.client.register(opaque_obj_type,
                                           opaque_obj_template_attribute,
                                           opaque_obj, credential=None)
    self._check_result_status(register_result, ResultStatus,
                              ResultStatus.SUCCESS)
    self._check_uuid(register_result.uuid.value, str)

    # Remember the UUID from Register; it drives every later request,
    # including the cleanup in the finally clause.
    opaque_obj_uuid = register_result.uuid.value

    try:
        get_result = self.client.get(uuid=opaque_obj_uuid,
                                     credential=None)
        self._check_result_status(get_result, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_object_type(
            get_result.object_type.value, ObjectType,
            ObjectType.OPAQUE_DATA)
        self._check_uuid(get_result.uuid.value, str)

        # Check that the returned object is an OpaqueObject
        self.assertIsInstance(get_result.secret, OpaqueObject)

        # Check that the returned opaque bytes match what was provided
        opaque_obj_material = get_result.secret.opaque_data_value.value
        self.assertEqual(opaque_obj_data.value, opaque_obj_material)
    finally:
        # Always destroy the registered object so a failed check above
        # does not leave it behind on the server.
        self.logger.debug('Destroying opaque object: %s\nWith UUID: %s',
                          opaque_obj_name, opaque_obj_uuid)
        destroy_result = self.client.destroy(opaque_obj_uuid)
        self._check_result_status(destroy_result, ResultStatus,
                                  ResultStatus.SUCCESS)
        self._check_uuid(destroy_result.uuid.value, str)

        # Verify the opaque object was destroyed
        destroyed_get_result = self.client.get(
            uuid=opaque_obj_uuid, credential=None)
        self._check_result_status(destroyed_get_result,
                                  ResultStatus, ResultStatus.OPERATION_FAILED)
        self.assertIsInstance(destroyed_get_result.result_reason.value,
                              ResultReason)
|
|
def test_symmetric_key_create_getattributelist_destroy(self):
    """
    Verify GetAttributeList against a freshly created symmetric key.

    The key is created through the ``_create_symmetric_key`` helper, its
    attribute names are listed and checked for a set of required
    entries, and the key is destroyed in a ``finally`` clause. A Get on
    the destroyed UUID is then expected to fail.
    """
    create_result = self._create_symmetric_key(
        key_name='Integration Test - Create-GetAttributeList-Destroy Key')
    uid = create_result.uuid.value

    self.assertEqual(
        ResultStatus.SUCCESS, create_result.result_status.value)
    self.assertEqual(
        ObjectType.SYMMETRIC_KEY, create_result.object_type.value)
    self.assertIsInstance(uid, str)

    try:
        listing = self.client.get_attribute_list(uid)

        self.assertEqual(ResultStatus.SUCCESS, listing.result_status.value)
        self.assertIsInstance(listing.uid, str)
        self.assertIsInstance(listing.names, list)

        # Every reported attribute name must be a string
        for attribute_name in listing.names:
            self.assertIsInstance(attribute_name, str)

        # Attribute names that must be present in the listing
        required_names = (
            'Cryptographic Algorithm',
            'Cryptographic Length',
            'Cryptographic Usage Mask',
            'Unique Identifier',
            'Object Type',
        )
        for required_name in required_names:
            self.assertIn(required_name, listing.names)

    finally:
        # Clean up the key regardless of how the checks above went
        destroy_result = self.client.destroy(uid)
        self.assertEqual(
            ResultStatus.SUCCESS, destroy_result.result_status.value)

        # A Get on the destroyed key must be rejected
        lookup_result = self.client.get(
            uuid=destroy_result.uuid.value, credential=None)
        self.assertEqual(
            ResultStatus.OPERATION_FAILED,
            lookup_result.result_status.value)