mirror of
				https://github.com/OpenKMIP/PyKMIP.git
				synced 2025-10-26 01:54:32 +02:00 
			
		
		
		
	This change updates the KMIP object model to support explicitly storing key wrapping data attributes. Key wrapping data is treated externally as a dictionary and is stored as individual fields in the back end. Various unit tests have been updated and added to support these additions.
		
			
				
	
	
		
			722 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			722 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
 | |
| # All Rights Reserved.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| # not use this file except in compliance with the License. You may obtain
 | |
| # a copy of the License at
 | |
| #
 | |
| #    http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| # License for the specific language governing permissions and limitations
 | |
| # under the License.
 | |
| 
 | |
| import binascii
 | |
| import testtools
 | |
| 
 | |
| from kmip.core import enums
 | |
| from kmip.pie import sqltypes
 | |
| from kmip.pie.objects import ManagedObject, PublicKey
 | |
| from sqlalchemy import create_engine
 | |
| from sqlalchemy.orm import sessionmaker
 | |
| 
 | |
| 
 | |
| class TestPublicKey(testtools.TestCase):
 | |
|     """
 | |
|     Test suite for PublicKey.
 | |
|     """
 | |
|     def setUp(self):
 | |
|         super(TestPublicKey, self).setUp()
 | |
| 
 | |
|         # Key values taken from Sections 8.2 and 13.4 of the KMIP 1.1
 | |
|         # testing documentation.
 | |
|         self.bytes_1024 = (
 | |
|             b'\x30\x81\x9F\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01'
 | |
|             b'\x05\x00\x03\x81\x8D\x00\x30\x81\x89\x02\x81\x81\x00\x93\x04\x51'
 | |
|             b'\xC9\xEC\xD9\x4F\x5B\xB9\xDA\x17\xDD\x09\x38\x1B\xD2\x3B\xE4\x3E'
 | |
|             b'\xCA\x8C\x75\x39\xF3\x01\xFC\x8A\x8C\xD5\xD5\x27\x4C\x3E\x76\x99'
 | |
|             b'\xDB\xDC\x71\x1C\x97\xA7\xAA\x91\xE2\xC5\x0A\x82\xBD\x0B\x10\x34'
 | |
|             b'\xF0\xDF\x49\x3D\xEC\x16\x36\x24\x27\xE5\x8A\xCC\xE7\xF6\xCE\x0F'
 | |
|             b'\x9B\xCC\x61\x7B\xBD\x8C\x90\xD0\x09\x4A\x27\x03\xBA\x0D\x09\xEB'
 | |
|             b'\x19\xD1\x00\x5F\x2F\xB2\x65\x52\x6A\xAC\x75\xAF\x32\xF8\xBC\x78'
 | |
|             b'\x2C\xDE\xD2\xA5\x7F\x81\x1E\x03\xEA\xF6\x7A\x94\x4D\xE5\xE7\x84'
 | |
|             b'\x13\xDC\xA8\xF2\x32\xD0\x74\xE6\xDC\xEA\x4C\xEC\x9F\x02\x03\x01'
 | |
|             b'\x00\x01')
 | |
|         self.bytes_2048 = (
 | |
|             b'\x30\x82\x01\x0A\x02\x82\x01\x01\x00\xAB\x7F\x16\x1C\x00\x42\x49'
 | |
|             b'\x6C\xCD\x6C\x6D\x4D\xAD\xB9\x19\x97\x34\x35\x35\x77\x76\x00\x3A'
 | |
|             b'\xCF\x54\xB7\xAF\x1E\x44\x0A\xFB\x80\xB6\x4A\x87\x55\xF8\x00\x2C'
 | |
|             b'\xFE\xBA\x6B\x18\x45\x40\xA2\xD6\x60\x86\xD7\x46\x48\x34\x6D\x75'
 | |
|             b'\xB8\xD7\x18\x12\xB2\x05\x38\x7C\x0F\x65\x83\xBC\x4D\x7D\xC7\xEC'
 | |
|             b'\x11\x4F\x3B\x17\x6B\x79\x57\xC4\x22\xE7\xD0\x3F\xC6\x26\x7F\xA2'
 | |
|             b'\xA6\xF8\x9B\x9B\xEE\x9E\x60\xA1\xD7\xC2\xD8\x33\xE5\xA5\xF4\xBB'
 | |
|             b'\x0B\x14\x34\xF4\xE7\x95\xA4\x11\x00\xF8\xAA\x21\x49\x00\xDF\x8B'
 | |
|             b'\x65\x08\x9F\x98\x13\x5B\x1C\x67\xB7\x01\x67\x5A\xBD\xBC\x7D\x57'
 | |
|             b'\x21\xAA\xC9\xD1\x4A\x7F\x08\x1F\xCE\xC8\x0B\x64\xE8\xA0\xEC\xC8'
 | |
|             b'\x29\x53\x53\xC7\x95\x32\x8A\xBF\x70\xE1\xB4\x2E\x7B\xB8\xB7\xF4'
 | |
|             b'\xE8\xAC\x8C\x81\x0C\xDB\x66\xE3\xD2\x11\x26\xEB\xA8\xDA\x7D\x0C'
 | |
|             b'\xA3\x41\x42\xCB\x76\xF9\x1F\x01\x3D\xA8\x09\xE9\xC1\xB7\xAE\x64'
 | |
|             b'\xC5\x41\x30\xFB\xC2\x1D\x80\xE9\xC2\xCB\x06\xC5\xC8\xD7\xCC\xE8'
 | |
|             b'\x94\x6A\x9A\xC9\x9B\x1C\x28\x15\xC3\x61\x2A\x29\xA8\x2D\x73\xA1'
 | |
|             b'\xF9\x93\x74\xFE\x30\xE5\x49\x51\x66\x2A\x6E\xDA\x29\xC6\xFC\x41'
 | |
|             b'\x13\x35\xD5\xDC\x74\x26\xB0\xF6\x05\x02\x03\x01\x00\x01')
 | |
|         self.engine = create_engine('sqlite:///:memory:', echo=True)
 | |
|         sqltypes.Base.metadata.create_all(self.engine)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestPublicKey, self).tearDown()
 | |
| 
 | |
|     def test_init(self):
 | |
|         """
 | |
|         Test that a PublicKey object can be instantiated.
 | |
|         """
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024)
 | |
| 
 | |
|         self.assertEqual(
 | |
|             key.cryptographic_algorithm, enums.CryptographicAlgorithm.RSA)
 | |
|         self.assertEqual(key.cryptographic_length, 1024)
 | |
|         self.assertEqual(key.value, self.bytes_1024)
 | |
|         self.assertEqual(key.key_format_type, enums.KeyFormatType.X_509)
 | |
|         self.assertEqual(key.cryptographic_usage_masks, list())
 | |
|         self.assertEqual(key.names, ['Public Key'])
 | |
| 
 | |
|     def test_init_with_args(self):
 | |
|         """
 | |
|         Test that a PublicKey object can be instantiated with all arguments.
 | |
|         """
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA,
 | |
|             1024,
 | |
|             self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509,
 | |
|             masks=[enums.CryptographicUsageMask.ENCRYPT,
 | |
|                    enums.CryptographicUsageMask.DECRYPT],
 | |
|             name='Test Public Key')
 | |
| 
 | |
|         self.assertEqual(key.cryptographic_algorithm,
 | |
|                          enums.CryptographicAlgorithm.RSA)
 | |
|         self.assertEqual(key.cryptographic_length, 1024)
 | |
|         self.assertEqual(key.value, self.bytes_1024)
 | |
|         self.assertEqual(key.key_format_type, enums.KeyFormatType.X_509)
 | |
|         self.assertEqual(key.cryptographic_usage_masks,
 | |
|                          [enums.CryptographicUsageMask.ENCRYPT,
 | |
|                           enums.CryptographicUsageMask.DECRYPT])
 | |
|         self.assertEqual(key.names, ['Test Public Key'])
 | |
| 
 | |
|     def test_get_object_type(self):
 | |
|         """
 | |
|         Test that the object type can be retrieved from the PublicKey.
 | |
|         """
 | |
|         expected = enums.ObjectType.PUBLIC_KEY
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         observed = key.object_type
 | |
|         self.assertEqual(expected, observed)
 | |
| 
 | |
|     def test_validate_on_invalid_algorithm(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid algorithm value is
 | |
|         used to construct a PublicKey.
 | |
|         """
 | |
|         args = ('invalid', 1024, self.bytes_1024, enums.KeyFormatType.X_509)
 | |
|         self.assertRaises(TypeError, PublicKey, *args)
 | |
| 
 | |
|     def test_validate_on_invalid_length(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid length value is used
 | |
|         to construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 'invalid', self.bytes_1024,
 | |
|                 enums.KeyFormatType.X_509)
 | |
|         self.assertRaises(TypeError, PublicKey, *args)
 | |
| 
 | |
|     def test_validate_on_invalid_value(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid value is used to
 | |
|         construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 1024, 0,
 | |
|                 enums.KeyFormatType.X_509)
 | |
|         self.assertRaises(TypeError, PublicKey, *args)
 | |
| 
 | |
|     def test_validate_on_invalid_format_type(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid format type is used to
 | |
|         construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|                 'invalid')
 | |
|         self.assertRaises(TypeError, PublicKey, *args)
 | |
| 
 | |
|     def test_validate_on_invalid_format_type_value(self):
 | |
|         """
 | |
|         Test that a ValueError is raised when an invalid format type is used to
 | |
|         construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|                 enums.KeyFormatType.OPAQUE)
 | |
|         self.assertRaises(ValueError, PublicKey, *args)
 | |
| 
 | |
|     def test_validate_on_invalid_masks(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid masks value is used to
 | |
|         construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|                 enums.KeyFormatType.X_509)
 | |
|         kwargs = {'masks': 'invalid'}
 | |
|         self.assertRaises(TypeError, PublicKey, *args, **kwargs)
 | |
| 
 | |
|     def test_validate_on_invalid_mask(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid mask value is used to
 | |
|         construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|                 enums.KeyFormatType.X_509)
 | |
|         kwargs = {'masks': ['invalid']}
 | |
|         self.assertRaises(TypeError, PublicKey, *args, **kwargs)
 | |
| 
 | |
|     def test_validate_on_invalid_name(self):
 | |
|         """
 | |
|         Test that a TypeError is raised when an invalid name value is used to
 | |
|         construct a PublicKey.
 | |
|         """
 | |
|         args = (enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|                 enums.KeyFormatType.X_509)
 | |
|         kwargs = {'name': 0}
 | |
|         self.assertRaises(TypeError, PublicKey, *args, **kwargs)
 | |
| 
 | |
|     def test_repr(self):
 | |
|         """
 | |
|         Test that repr can be applied to a PublicKey.
 | |
|         """
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         args = "{0}, {1}, {2}, {3}, {4}".format(
 | |
|             "algorithm={0}".format(enums.CryptographicAlgorithm.RSA),
 | |
|             "length={0}".format(1024),
 | |
|             "value={0}".format(binascii.hexlify(self.bytes_1024)),
 | |
|             "format_type={0}".format(enums.KeyFormatType.X_509),
 | |
|             "key_wrapping_data={0}".format({})
 | |
|         )
 | |
|         expected = "PublicKey({0})".format(args)
 | |
|         observed = repr(key)
 | |
|         self.assertEqual(expected, observed)
 | |
| 
 | |
|     def test_str(self):
 | |
|         """
 | |
|         Test that str can be applied to a PublicKey.
 | |
|         """
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         expected = str(binascii.hexlify(self.bytes_1024))
 | |
|         observed = str(key)
 | |
|         self.assertEqual(expected, observed)
 | |
| 
 | |
|     def test_equal_on_equal(self):
 | |
|         """
 | |
|         Test that the equality operator returns True when comparing two
 | |
|         PublicKey objects with the same data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         self.assertTrue(a == b)
 | |
|         self.assertTrue(b == a)
 | |
| 
 | |
|     def test_equal_on_not_equal_algorithm(self):
 | |
|         """
 | |
|         Test that the equality operator returns False when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.AES, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         self.assertFalse(a == b)
 | |
|         self.assertFalse(b == a)
 | |
| 
 | |
|     def test_equal_on_not_equal_length(self):
 | |
|         """
 | |
|         Test that the equality operator returns False when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         self.assertFalse(a == b)
 | |
|         self.assertFalse(b == a)
 | |
| 
 | |
|     def test_equal_on_not_equal_value(self):
 | |
|         """
 | |
|         Test that the equality operator returns False when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_2048,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         self.assertFalse(a == b)
 | |
|         self.assertFalse(b == a)
 | |
| 
 | |
|     def test_equal_on_not_equal_format_type(self):
 | |
|         """
 | |
|         Test that the equality operator returns False when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         self.assertFalse(a == b)
 | |
|         self.assertFalse(b == a)
 | |
| 
 | |
|     def test_equal_on_not_equal_key_wrapping_data(self):
 | |
|         """
 | |
|         Test that the equality operator returns False when comparing two
 | |
|         PublicKey objects with different key wrapping data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA,
 | |
|             1024,
 | |
|             self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509,
 | |
|             key_wrapping_data={}
 | |
|         )
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA,
 | |
|             1024,
 | |
|             self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509,
 | |
|             key_wrapping_data={
 | |
|                 'wrapping_method': enums.WrappingMethod.ENCRYPT
 | |
|             }
 | |
|         )
 | |
| 
 | |
|         self.assertFalse(a == b)
 | |
|         self.assertFalse(b == a)
 | |
| 
 | |
|     def test_equal_on_type_mismatch(self):
 | |
|         """
 | |
|         Test that the equality operator returns False when comparing a
 | |
|         PublicKey object to a non-PublicKey object.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         b = "invalid"
 | |
|         self.assertFalse(a == b)
 | |
|         self.assertFalse(b == a)
 | |
| 
 | |
|     def test_not_equal_on_equal(self):
 | |
|         """
 | |
|         Test that the inequality operator returns False when comparing
 | |
|         two PublicKey objects with the same internal data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         self.assertFalse(a != b)
 | |
|         self.assertFalse(b != a)
 | |
| 
 | |
|     def test_not_equal_on_not_equal_algorithm(self):
 | |
|         """
 | |
|         Test that the equality operator returns True when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.AES, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         self.assertTrue(a != b)
 | |
|         self.assertTrue(b != a)
 | |
| 
 | |
|     def test_not_equal_on_not_equal_length(self):
 | |
|         """
 | |
|         Test that the equality operator returns True when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 1024, self.bytes_1024,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         self.assertTrue(a != b)
 | |
|         self.assertTrue(b != a)
 | |
| 
 | |
|     def test_not_equal_on_not_equal_value(self):
 | |
|         """
 | |
|         Test that the equality operator returns True when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_1024,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         self.assertTrue(a != b)
 | |
|         self.assertTrue(b != a)
 | |
| 
 | |
|     def test_not_equal_on_not_equal_format_type(self):
 | |
|         """
 | |
|         Test that the equality operator returns True when comparing two
 | |
|         PublicKey objects with different data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.X_509)
 | |
|         self.assertTrue(a != b)
 | |
|         self.assertTrue(b != a)
 | |
| 
 | |
|     def test_not_equal_on_not_equal_key_wrapping_data(self):
 | |
|         """
 | |
|         Test that the inequality operator returns True when comparing two
 | |
|         PublicKey objects with different key wrapping data.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA,
 | |
|             1024,
 | |
|             self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509,
 | |
|             key_wrapping_data={}
 | |
|         )
 | |
|         b = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA,
 | |
|             1024,
 | |
|             self.bytes_1024,
 | |
|             enums.KeyFormatType.X_509,
 | |
|             key_wrapping_data={
 | |
|                 'wrapping_method': enums.WrappingMethod.ENCRYPT
 | |
|             }
 | |
|         )
 | |
| 
 | |
|         self.assertTrue(a != b)
 | |
|         self.assertTrue(b != a)
 | |
| 
 | |
|     def test_not_equal_on_type_mismatch(self):
 | |
|         """
 | |
|         Test that the equality operator returns True when comparing a
 | |
|         PublicKey object to a non-PublicKey object.
 | |
|         """
 | |
|         a = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         b = "invalid"
 | |
|         self.assertTrue(a != b)
 | |
|         self.assertTrue(b != a)
 | |
| 
 | |
|     def test_save(self):
 | |
|         """
 | |
|         Test that the object can be saved using SQLAlchemy. This will add it to
 | |
|         the database, verify that no exceptions are thrown, and check that its
 | |
|         unique identifier was set.
 | |
|         """
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1)
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
|         self.assertIsNotNone(key.unique_identifier)
 | |
| 
 | |
|     def test_get(self):
 | |
|         """
 | |
|         Test that the object can be saved and then retrieved using SQLAlchemy.
 | |
|         This adds is to the database and then retrieves it by ID and verifies
 | |
|         some of the attributes.
 | |
|         """
 | |
|         test_name = 'bowser'
 | |
|         masks = [enums.CryptographicUsageMask.ENCRYPT,
 | |
|                  enums.CryptographicUsageMask.WRAP_KEY]
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, masks=masks, name=test_name)
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEqual(1, len(get_obj.names))
 | |
|         self.assertEqual([test_name], get_obj.names)
 | |
|         self.assertEqual(enums.ObjectType.PUBLIC_KEY, get_obj.object_type)
 | |
|         self.assertEqual(self.bytes_2048, get_obj.value)
 | |
|         self.assertEqual(enums.CryptographicAlgorithm.RSA,
 | |
|                          get_obj.cryptographic_algorithm)
 | |
|         self.assertEqual(2048, get_obj.cryptographic_length)
 | |
|         self.assertEqual(enums.KeyFormatType.PKCS_1, get_obj.key_format_type)
 | |
|         self.assertEqual(masks, get_obj.cryptographic_usage_masks)
 | |
| 
 | |
|     def test_add_multiple_names(self):
 | |
|         """
 | |
|         Test that multiple names can be added to a managed object. This
 | |
|         verifies a few properties. First this verifies that names can be added
 | |
|         using simple strings. It also verifies that the index for each
 | |
|         subsequent string is set accordingly. Finally this tests that the names
 | |
|         can be saved and retrieved from the database.
 | |
|         """
 | |
|         expected_names = ['bowser', 'frumpy', 'big fat cat']
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, name=expected_names[0])
 | |
|         key.names.append(expected_names[1])
 | |
|         key.names.append(expected_names[2])
 | |
|         self.assertEquals(3, key.name_index)
 | |
|         expected_mo_names = list()
 | |
|         for i, name in enumerate(expected_names):
 | |
|             expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
 | |
|         self.assertEquals(expected_mo_names, key._names)
 | |
| 
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEquals(expected_mo_names, get_obj._names)
 | |
| 
 | |
|     def test_remove_name(self):
 | |
|         """
 | |
|         Tests that a name can be removed from the list of names. This will
 | |
|         verify that the list of names is correct. It will verify that updating
 | |
|         this object removes the name from the database.
 | |
|         """
 | |
|         names = ['bowser', 'frumpy', 'big fat cat']
 | |
|         remove_index = 1
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, name=names[0])
 | |
|         key.names.append(names[1])
 | |
|         key.names.append(names[2])
 | |
|         key.names.pop(remove_index)
 | |
|         self.assertEquals(3, key.name_index)
 | |
| 
 | |
|         expected_names = list()
 | |
|         expected_mo_names = list()
 | |
|         for i, name in enumerate(names):
 | |
|             if i != remove_index:
 | |
|                 expected_names.append(name)
 | |
|                 expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
 | |
|         self.assertEquals(expected_names, key.names)
 | |
|         self.assertEquals(expected_mo_names, key._names)
 | |
| 
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEquals(expected_names, get_obj.names)
 | |
|         self.assertEquals(expected_mo_names, get_obj._names)
 | |
| 
 | |
|     def test_remove_and_add_name(self):
 | |
|         """
 | |
|         Tests that names can be removed from the list of names and more added.
 | |
|         This will verify that the list of names is correct. It will verify that
 | |
|         updating this object removes the name from the database. It will verify
 | |
|         that the indices for the removed names are not reused.
 | |
|         """
 | |
|         names = ['bowser', 'frumpy', 'big fat cat']
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, name=names[0])
 | |
|         key.names.append(names[1])
 | |
|         key.names.append(names[2])
 | |
|         key.names.pop()
 | |
|         key.names.pop()
 | |
|         key.names.append('dog')
 | |
|         self.assertEquals(4, key.name_index)
 | |
| 
 | |
|         expected_names = ['bowser', 'dog']
 | |
|         expected_mo_names = list()
 | |
|         expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
 | |
|                                                             0))
 | |
|         expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
 | |
|                                                             3))
 | |
|         self.assertEquals(expected_names, key.names)
 | |
|         self.assertEquals(expected_mo_names, key._names)
 | |
| 
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEquals(expected_names, get_obj.names)
 | |
|         self.assertEquals(expected_mo_names, get_obj._names)
 | |
| 
 | |
|     def test_update_with_add_name(self):
 | |
|         """
 | |
|         Tests that an OpaqueObject already stored in the database can be
 | |
|         updated. This will store an OpaqueObject in the database. It will add a
 | |
|         name to it in one session, and then retrieve it in another session to
 | |
|         verify that it has all of the correct names.
 | |
| 
 | |
|         This test and the subsequent test_udpate_* methods are different than
 | |
|         the name tests above because these are updating objects already stored
 | |
|         in the database. This tests will simulate what happens when the KMIP
 | |
|         client calls an add attribute method.
 | |
|         """
 | |
|         first_name = 'bowser'
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, name=first_name)
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         added_name = 'frumpy'
 | |
|         expected_names = [first_name, added_name]
 | |
|         expected_mo_names = list()
 | |
|         for i, name in enumerate(expected_names):
 | |
|             expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
 | |
| 
 | |
|         session = Session()
 | |
|         update_key = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         update_key.names.append(added_name)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEquals(expected_names, get_obj.names)
 | |
|         self.assertEquals(expected_mo_names, get_obj._names)
 | |
| 
 | |
|     def test_update_with_remove_name(self):
 | |
|         """
 | |
|         Tests that an OpaqueObject already stored in the database can be
 | |
|         updated. This will store an OpaqueObject in the database. It will
 | |
|         remove a name from it in one session, and then retrieve it in another
 | |
|         session to verify that it has all of the correct names.
 | |
|         """
 | |
|         names = ['bowser', 'frumpy', 'big fat cat']
 | |
|         remove_index = 1
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, name=names[0])
 | |
|         key.names.append(names[1])
 | |
|         key.names.append(names[2])
 | |
| 
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         expected_names = list()
 | |
|         expected_mo_names = list()
 | |
|         for i, name in enumerate(names):
 | |
|             if i != remove_index:
 | |
|                 expected_names.append(name)
 | |
|                 expected_mo_names.append(sqltypes.ManagedObjectName(name, i))
 | |
| 
 | |
|         session = Session()
 | |
|         update_key = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         update_key.names.pop(remove_index)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEquals(expected_names, get_obj.names)
 | |
|         self.assertEquals(expected_mo_names, get_obj._names)
 | |
| 
 | |
|     def test_update_with_remove_and_add_name(self):
 | |
|         """
 | |
|         Tests that an OpaqueObject already stored in the database can be
 | |
|         updated. This will store an OpaqueObject in the database. It will
 | |
|         remove a name and add another one to it in one session, and then
 | |
|         retrieve it in another session to verify that it has all of the correct
 | |
|         names. This simulates multiple operation being sent for the same
 | |
|         object.
 | |
|         """
 | |
|         names = ['bowser', 'frumpy', 'big fat cat']
 | |
|         key = PublicKey(
 | |
|             enums.CryptographicAlgorithm.RSA, 2048, self.bytes_2048,
 | |
|             enums.KeyFormatType.PKCS_1, name=names[0])
 | |
|         key.names.append(names[1])
 | |
|         key.names.append(names[2])
 | |
| 
 | |
|         Session = sessionmaker(bind=self.engine)
 | |
|         session = Session()
 | |
|         session.add(key)
 | |
|         session.commit()
 | |
| 
 | |
|         session = Session()
 | |
|         update_key = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         update_key.names.pop()
 | |
|         update_key.names.pop()
 | |
|         update_key.names.append('dog')
 | |
|         session.commit()
 | |
| 
 | |
|         expected_names = ['bowser', 'dog']
 | |
|         expected_mo_names = list()
 | |
|         expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[0],
 | |
|                                                             0))
 | |
|         expected_mo_names.append(sqltypes.ManagedObjectName(expected_names[1],
 | |
|                                                             3))
 | |
| 
 | |
|         session = Session()
 | |
|         get_obj = session.query(PublicKey).filter(
 | |
|             ManagedObject.unique_identifier == key.unique_identifier
 | |
|             ).one()
 | |
|         session.commit()
 | |
|         self.assertEquals(expected_names, get_obj.names)
 | |
|         self.assertEquals(expected_mo_names, get_obj._names)
 |