Updated client integration tests, Added test for Application Specific Information

This commit is contained in:
Grace Lombardi 2022-07-12 13:54:51 -04:00 committed by arp102
parent 20f20b0e29
commit 1773fba67c
5 changed files with 72 additions and 26 deletions

View File

@ -12,10 +12,10 @@ if [[ "${RUN_INTEGRATION_TESTS}" == "1" ]]; then
sudo cp ./.travis/pykmip.conf /etc/pykmip/pykmip.conf sudo cp ./.travis/pykmip.conf /etc/pykmip/pykmip.conf
sudo cp ./.travis/server.conf /etc/pykmip/server.conf sudo cp ./.travis/server.conf /etc/pykmip/server.conf
sudo cp ./.travis/policy.json /etc/pykmip/policies/policy.json sudo cp ./.travis/policy.json /etc/pykmip/policies/policy.json
sudo mkdir /var/log/pykmip sudo mkdir -p /var/log/pykmip
sudo chmod 777 /var/log/pykmip sudo chmod 777 /var/log/pykmip
python ./bin/run_server.py & sudo python3 ./bin/run_server.py &
tox -e integration -- --config client sudo tox -e integration -- --config client
elif [[ "${RUN_INTEGRATION_TESTS}" == "2" ]]; then elif [[ "${RUN_INTEGRATION_TESTS}" == "2" ]]; then
# Set up the SLUGS instance # Set up the SLUGS instance
cp -r ./.travis/functional/slugs /tmp/ cp -r ./.travis/functional/slugs /tmp/
@ -23,14 +23,14 @@ elif [[ "${RUN_INTEGRATION_TESTS}" == "2" ]]; then
# Set up the PyKMIP server # Set up the PyKMIP server
cp -r ./.travis/functional/pykmip /tmp/ cp -r ./.travis/functional/pykmip /tmp/
python ./bin/create_certificates.py sudo python3 ./bin/create_certificates.py
mv *.pem /tmp/pykmip/certs/ mv *.pem /tmp/pykmip/certs/
sudo mkdir /var/log/pykmip sudo mkdir -p /var/log/pykmip
sudo chmod 777 /var/log/pykmip sudo chmod 777 /var/log/pykmip
pykmip-server -f /tmp/pykmip/server.conf -l /tmp/pykmip/server.log & sudo pykmip-server -f /tmp/pykmip/server.conf -l /tmp/pykmip/server.log &
# Run the functional tests # Run the functional tests
tox -e functional -- --config-file /tmp/pykmip/client.conf sudo tox -e functional -- --config-file /tmp/pykmip/client.conf
else else
tox sudo tox
fi fi

View File

@ -4,6 +4,7 @@ port=5696
certificate_path=/etc/pykmip/certs/cert.pem certificate_path=/etc/pykmip/certs/cert.pem
key_path=/etc/pykmip/certs/key.pem key_path=/etc/pykmip/certs/key.pem
ca_path=/etc/pykmip/certs/cert.pem ca_path=/etc/pykmip/certs/cert.pem
auth_suite=Basic auth_suite=TLS1.2
enable_tls_client_auth=False enable_tls_client_auth=False
policy_path=/etc/pykmip/policies/ policy_path=/etc/pykmip/policies/
database_path=/tmp/pykmip.db

View File

@ -17,6 +17,7 @@ from kmip.core import attributes
from kmip.core import enums from kmip.core import enums
from kmip.core import primitives from kmip.core import primitives
from kmip.core import utils from kmip.core import utils
import json
class AttributeValueFactory(object): class AttributeValueFactory(object):
@ -274,10 +275,11 @@ class AttributeValueFactory(object):
def _create_application_specific_information(self, info): def _create_application_specific_information(self, info):
if info: if info:
return attributes.ApplicationSpecificInformation( for k,v in info.items():
application_namespace=info.get("application_namespace"), return attributes.ApplicationSpecificInformation(
application_data=info.get("application_data") k,
) v
)
else: else:
return attributes.ApplicationSpecificInformation() return attributes.ApplicationSpecificInformation()

View File

@ -25,6 +25,7 @@ from kmip.core.factories import attributes
from kmip.core.attributes import CryptographicParameters from kmip.core.attributes import CryptographicParameters
from kmip.core.attributes import DerivationParameters from kmip.core.attributes import DerivationParameters
from kmip.core.messages import payloads from kmip.core.messages import payloads
from kmip.pie import exceptions from kmip.pie import exceptions
@ -559,8 +560,15 @@ class ProxyKmipClient(object):
if hasattr(managed_object, '_application_specific_informations'): if hasattr(managed_object, '_application_specific_informations'):
if managed_object._application_specific_informations: if managed_object._application_specific_informations:
for attr in managed_object._application_specific_informations: for attr in managed_object._application_specific_informations:
object_attributes.append(attr) app_dict = {}
app_dict[attr] = managed_object._application_specific_informations[attr]
attribute = self.attribute_factory.create_attribute(
name=enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
value=app_dict,
index=1
)
object_attributes.append(attribute)
template = cobjects.TemplateAttribute(attributes=object_attributes) template = cobjects.TemplateAttribute(attributes=object_attributes)
object_type = managed_object.object_type object_type = managed_object.object_type

View File

@ -39,7 +39,10 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
uuids = self.client.locate() uuids = self.client.locate()
for uuid in uuids: for uuid in uuids:
self.client.destroy(uid=uuid) try:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uuid)
finally:
self.client.destroy(uid=uuid)
def test_symmetric_key_create_get_destroy(self): def test_symmetric_key_create_get_destroy(self):
""" """
@ -57,6 +60,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
enums.CryptographicAlgorithm.AES) enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256) self.assertEqual(key.cryptographic_length, 256)
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)
@ -100,10 +104,8 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertNotEqual(unwrapped_key.value, wrapped_key.value) self.assertNotEqual(unwrapped_key.value, wrapped_key.value)
self.client.revoke( self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, key_id)
enums.RevocationReasonCode.CESSATION_OF_OPERATION, self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, wrapping_id)
wrapping_id
)
self.client.destroy(key_id) self.client.destroy(key_id)
self.client.destroy(wrapping_id) self.client.destroy(wrapping_id)
@ -131,6 +133,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual( self.assertEqual(
result, key, "expected {0}\nobserved {1}".format(result, key)) result, key, "expected {0}\nobserved {1}".format(result, key))
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)
@ -181,6 +184,30 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
enums.EncodingOption.NO_ENCODING, enums.EncodingOption.NO_ENCODING,
key_wrapping_data.get('encoding_option') key_wrapping_data.get('encoding_option')
) )
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, key_id)
self.client.destroy(key_id)
def test_register_app_specific_get(self):
"""
Test that a key with app specifc info can be registered with the server and that its
metadata is retrieved with the get operation.
"""
key = objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E'
b'\x0F'),
app_specific_info={
'application_namespace': 'Testing',
'application_data': 'Testing2'
}
)
key_id = self.client.register(key)
result = self.client.get(key_id)
app_specific_info = result.app_specific_info
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, key_id)
self.client.destroy(key_id)
def test_asymmetric_key_pair_create_get_destroy(self): def test_asymmetric_key_pair_create_get_destroy(self):
""" """
@ -211,6 +238,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
enums.CryptographicAlgorithm.RSA) enums.CryptographicAlgorithm.RSA)
self.assertEqual(private_key.cryptographic_length, 2048) self.assertEqual(private_key.cryptographic_length, 2048)
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, public_uid)
self.client.destroy(public_uid) self.client.destroy(public_uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, public_uid) exceptions.KmipOperationFailure, self.client.get, public_uid)
@ -218,6 +246,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
exceptions.KmipOperationFailure, self.client.destroy, exceptions.KmipOperationFailure, self.client.destroy,
public_uid) public_uid)
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, private_uid)
self.client.destroy(private_uid) self.client.destroy(private_uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, private_uid) exceptions.KmipOperationFailure, self.client.get, private_uid)
@ -264,6 +293,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual( self.assertEqual(
result, key, "expected {0}\nobserved {1}".format(result, key)) result, key, "expected {0}\nobserved {1}".format(result, key))
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)
@ -371,6 +401,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual( self.assertEqual(
result, key, "expected {0}\nobserved {1}".format(result, key)) result, key, "expected {0}\nobserved {1}".format(result, key))
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)
@ -449,6 +480,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
result, cert, "expected {0}\nobserved {1}".format( result, cert, "expected {0}\nobserved {1}".format(
result, cert)) result, cert))
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)
@ -476,6 +508,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
result, secret, "expected {0}\nobserved {1}".format( result, secret, "expected {0}\nobserved {1}".format(
result, secret)) result, secret))
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)
@ -570,6 +603,8 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
attribute.attribute_name.value attribute.attribute_name.value
) )
self.assertEqual(160, attribute.attribute_value.value) self.assertEqual(160, attribute.attribute_value.value)
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, key_id)
self.client.destroy(key_id)
def test_derive_key_using_encryption(self): def test_derive_key_using_encryption(self):
""" """
@ -798,10 +833,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual(plain_text, result) self.assertEqual(plain_text, result)
# Clean up. # Clean up.
self.client.revoke( self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, key_id)
enums.RevocationReasonCode.CESSATION_OF_OPERATION,
key_id
)
self.client.destroy(key_id) self.client.destroy(key_id)
def test_create_key_pair_sign_signature_verify(self): def test_create_key_pair_sign_signature_verify(self):
@ -858,11 +890,11 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
# Clean up. # Clean up.
self.client.revoke( self.client.revoke(
enums.RevocationReasonCode.CESSATION_OF_OPERATION, enums.RevocationReasonCode.KEY_COMPROMISE,
public_key_id public_key_id
) )
self.client.revoke( self.client.revoke(
enums.RevocationReasonCode.CESSATION_OF_OPERATION, enums.RevocationReasonCode.KEY_COMPROMISE,
private_key_id private_key_id
) )
self.client.destroy(public_key_id) self.client.destroy(public_key_id)
@ -1293,6 +1325,8 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual(0, len(result)) self.assertEqual(0, len(result))
# Clean up the keys # Clean up the keys
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, a_id)
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, b_id)
self.client.destroy(a_id) self.client.destroy(a_id)
self.client.destroy(b_id) self.client.destroy(b_id)
@ -1344,6 +1378,7 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertEqual(enums.SplitKeyMethod.XOR, result.split_key_method) self.assertEqual(enums.SplitKeyMethod.XOR, result.split_key_method)
self.assertIsNone(result.prime_field_size) self.assertIsNone(result.prime_field_size)
finally: finally:
self.client.revoke(enums.RevocationReasonCode.KEY_COMPROMISE, uid)
self.client.destroy(uid) self.client.destroy(uid)
self.assertRaises( self.assertRaises(
exceptions.KmipOperationFailure, self.client.get, uid) exceptions.KmipOperationFailure, self.client.get, uid)