mirror of
				https://github.com/OpenKMIP/PyKMIP.git
				synced 2025-10-25 01:24:15 +02:00 
			
		
		
		
	Add Sign operation to clients
This commit is contained in:
		
							parent
							
								
									2915bf5f11
								
							
						
					
					
						commit
						22daacf5e9
					
				| @ -1069,6 +1069,64 @@ class ProxyKmipClient(api.KmipClient): | ||||
|                 result.get('result_message') | ||||
|             ) | ||||
| 
 | ||||
|     def sign(self, data, uid=None, cryptographic_parameters=None): | ||||
|         """ | ||||
|         Create a digital signature for data using the specified signing key. | ||||
| 
 | ||||
|         Args: | ||||
|             data (bytes): The bytes of the data to be signed. Required. | ||||
|             uid (string): The unique ID of the signing key to use. | ||||
|                 Optional, defaults to None. | ||||
|             cryptographic_parameters (dict): A dictionary containing various | ||||
|                 cryptographic settings to be used for creating the signature | ||||
|                 (e.g., cryptographic algorithm, hashing algorithm, and/or | ||||
|                 digital signature algorithm). Optional, defaults to None. | ||||
| 
 | ||||
|         Returns: | ||||
|             signature (bytes): Bytes representing the signature of the data | ||||
| 
 | ||||
|         Raises: | ||||
|             ClientConnectionNotOpen: if the client connection is unusable | ||||
|             KmipOperationFailure: if the operation result is a failure | ||||
|             TypeError: if the input arguments are invalid | ||||
|         """ | ||||
|         # Check input | ||||
|         if not isinstance(data, six.binary_type): | ||||
|             raise TypeError("Data to be signed must be bytes.") | ||||
|         if uid is not None: | ||||
|             if not isinstance(uid, six.string_types): | ||||
|                 raise TypeError("Unique identifier must be a string.") | ||||
|         if cryptographic_parameters is not None: | ||||
|             if not isinstance(cryptographic_parameters, dict): | ||||
|                 raise TypeError( | ||||
|                     "Cryptographic parameters must be a dictionary." | ||||
|                 ) | ||||
| 
 | ||||
|         # Verify that operations can be served at this time | ||||
|         if not self._is_open: | ||||
|             raise exceptions.ClientConnectionNotOpen() | ||||
| 
 | ||||
|         cryptographic_parameters = self._build_cryptographic_parameters( | ||||
|             cryptographic_parameters | ||||
|         ) | ||||
| 
 | ||||
|         # Sign the provided data and handle results | ||||
|         result = self.proxy.sign( | ||||
|             data, | ||||
|             uid, | ||||
|             cryptographic_parameters | ||||
|         ) | ||||
| 
 | ||||
|         status = result.get('result_status') | ||||
|         if status == enums.ResultStatus.SUCCESS: | ||||
|             return result.get('signature') | ||||
|         else: | ||||
|             raise exceptions.KmipOperationFailure( | ||||
|                 status, | ||||
|                 result.get('result_reason'), | ||||
|                 result.get('result_message') | ||||
|             ) | ||||
| 
 | ||||
|     def mac(self, data, uid=None, algorithm=None): | ||||
|         """ | ||||
|         Get the message authentication code for data. | ||||
|  | ||||
| @ -64,6 +64,7 @@ from kmip.core.messages.payloads import query | ||||
| from kmip.core.messages.payloads import rekey_key_pair | ||||
| from kmip.core.messages.payloads import register | ||||
| from kmip.core.messages.payloads import revoke | ||||
| from kmip.core.messages.payloads import sign | ||||
| from kmip.core.messages.payloads import signature_verify | ||||
| from kmip.core.messages.payloads import mac | ||||
| 
 | ||||
| @ -747,6 +748,72 @@ class KMIPProxy(KMIP): | ||||
| 
 | ||||
|         return result | ||||
| 
 | ||||
|     def sign(self, data, unique_identifier=None, | ||||
|              cryptographic_parameters=None, credential=None): | ||||
|         """ | ||||
|         Sign specified data using a specified signing key. | ||||
| 
 | ||||
|         Args: | ||||
|             data (bytes): Data to be signed. Required. | ||||
|             unique_identifier (string): The unique ID of the signing | ||||
|                 key to be used. Optional, defaults to None. | ||||
|             cryptographic_parameters (CryptographicParameters): A structure | ||||
|                 containing various cryptographic settings to be used for | ||||
|                 creating the signature. Optional, defaults to None. | ||||
|             credential (Credential): A credential object containing a set of | ||||
|                 authorization parameters for the operation. Optional, defaults | ||||
|                 to None. | ||||
|         Returns: | ||||
|             dict: The results of the sign operation, containing the | ||||
|                 following key/value pairs: | ||||
| 
 | ||||
|             Key                  | Value | ||||
|             ---------------------|----------------------------------------- | ||||
|             'unique_identifier'  | (string) The unique ID of the signing | ||||
|                                  | key used to create the signature | ||||
|             'signature'          | (bytes) The bytes of the signature | ||||
|             'result_status'      | (ResultStatus) An enumeration indicating | ||||
|                                  | the status of the operation result | ||||
|             'result_reason'      | (ResultReason) An enumeration providing | ||||
|                                  | context for the result status. | ||||
|             'result_message'     | (string) A message providing additional | ||||
|                                  | context for the operation result. | ||||
|         """ | ||||
|         operation = Operation(OperationEnum.SIGN) | ||||
| 
 | ||||
|         request_payload = sign.SignRequestPayload( | ||||
|             unique_identifier=unique_identifier, | ||||
|             cryptographic_parameters=cryptographic_parameters, | ||||
|             data=data | ||||
|         ) | ||||
| 
 | ||||
|         batch_item = messages.RequestBatchItem( | ||||
|             operation=operation, | ||||
|             request_payload=request_payload | ||||
|         ) | ||||
| 
 | ||||
|         request = self._build_request_message(credential, [batch_item]) | ||||
|         response = self._send_and_receive_message(request) | ||||
|         batch_item = response.batch_items[0] | ||||
|         payload = batch_item.response_payload | ||||
| 
 | ||||
|         result = {} | ||||
| 
 | ||||
|         if payload: | ||||
|             result['unique_identifier'] = payload.unique_identifier | ||||
|             result['signature'] = payload.signature_data | ||||
|         result['result_status'] = batch_item.result_status.value | ||||
|         try: | ||||
|             result['result_reason'] = batch_item.result_reason.value | ||||
|         except: | ||||
|             result['result_reason'] = batch_item.result_reason | ||||
|         try: | ||||
|             result['result_message'] = batch_item.result_message.value | ||||
|         except: | ||||
|             result['result_message'] = batch_item.result_message | ||||
| 
 | ||||
|         return result | ||||
| 
 | ||||
|     def mac(self, data, unique_identifier=None, | ||||
|             cryptographic_parameters=None, credential=None): | ||||
|         return self._mac( | ||||
|  | ||||
| @ -1982,6 +1982,85 @@ class TestProxyKmipClient(testtools.TestCase): | ||||
| 
 | ||||
|         self.assertEqual(enums.ValidityIndicator.VALID, validity) | ||||
| 
 | ||||
|     @mock.patch('kmip.pie.client.KMIPProxy', | ||||
|                 mock.MagicMock(spec_set=KMIPProxy)) | ||||
|     def test_sign(self): | ||||
|         """ | ||||
|         Test that the client can sign data. | ||||
|         """ | ||||
|         mock_signature = b'aaaaaaaaaaaaaaaaaaaaaaaaaa' | ||||
|         result = { | ||||
|             'result_status': enums.ResultStatus.SUCCESS, | ||||
|             'unique_identifier': '1', | ||||
|             'signature': mock_signature | ||||
|         } | ||||
| 
 | ||||
|         client = ProxyKmipClient() | ||||
|         client.open() | ||||
|         client.proxy.sign.return_value = result | ||||
| 
 | ||||
|         actual_signature = client.sign( | ||||
|             b'\x01\x02\x03\x04\x05\x06\x07\x08', | ||||
|             uid='1', | ||||
|             cryptographic_parameters={ | ||||
|                  'padding_method': enums.PaddingMethod.PSS, | ||||
|                  'cryptographic_algorithm': | ||||
|                  enums.CryptographicAlgorithm.RSA | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|         self.assertEqual(mock_signature, actual_signature) | ||||
| 
 | ||||
|     @mock.patch('kmip.pie.client.KMIPProxy', | ||||
|                 mock.MagicMock(spec_set=KMIPProxy)) | ||||
|     def test_sign_on_invalid_inputs(self): | ||||
|         """ | ||||
|         Test that TypeError exceptions are raised when trying to sign | ||||
|         data with invalid parameters. | ||||
|         """ | ||||
|         client = ProxyKmipClient() | ||||
|         client.open() | ||||
|         client.proxy.sign.return_value = {} | ||||
|         args = [1234] | ||||
|         kwargs = { | ||||
|             'uid': '1', | ||||
|             'cryptographic_parameters': {} | ||||
|         } | ||||
|         self.assertRaisesRegexp( | ||||
|             TypeError, | ||||
|             "Data to be signed must be bytes.", | ||||
|             client.sign, | ||||
|             *args, | ||||
|             **kwargs | ||||
|         ) | ||||
| 
 | ||||
|         args = [ | ||||
|             b'\x01\x02\x03\x04' | ||||
|         ] | ||||
|         kwargs = { | ||||
|             'uid': 0, | ||||
|             'cryptographic_parameters': {} | ||||
|         } | ||||
|         self.assertRaisesRegexp( | ||||
|             TypeError, | ||||
|             "Unique identifier must be a string.", | ||||
|             client.sign, | ||||
|             *args, | ||||
|             **kwargs | ||||
|         ) | ||||
| 
 | ||||
|         kwargs = { | ||||
|             'uid': '1', | ||||
|             'cryptographic_parameters': 'invalid' | ||||
|         } | ||||
|         self.assertRaisesRegexp( | ||||
|             TypeError, | ||||
|             "Cryptographic parameters must be a dictionary.", | ||||
|             client.sign, | ||||
|             *args, | ||||
|             **kwargs | ||||
|         ) | ||||
| 
 | ||||
|     @mock.patch('kmip.pie.client.KMIPProxy', | ||||
|                 mock.MagicMock(spec_set=KMIPProxy)) | ||||
|     def test_signature_verify_on_invalid_inputs(self): | ||||
|  | ||||
| @ -55,6 +55,7 @@ from kmip.core.messages.payloads.query import \ | ||||
|     QueryRequestPayload, QueryResponsePayload | ||||
| from kmip.core.messages.payloads.rekey_key_pair import \ | ||||
|     RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload | ||||
| from kmip.core.messages.payloads import sign | ||||
| from kmip.core.messages.payloads import signature_verify | ||||
| 
 | ||||
| from kmip.core.misc import Offset | ||||
| @ -961,6 +962,52 @@ class TestKMIPClient(TestCase): | ||||
|         self.assertEqual(None, result.get('result_reason')) | ||||
|         self.assertEqual(None, result.get('result_message')) | ||||
| 
 | ||||
|     @mock.patch( | ||||
|         'kmip.services.kmip_client.KMIPProxy._build_request_message' | ||||
|     ) | ||||
|     @mock.patch( | ||||
|         'kmip.services.kmip_client.KMIPProxy._send_and_receive_message' | ||||
|     ) | ||||
|     def test_sign(self, send_mock, build_mock): | ||||
|         """ | ||||
|         Test that the client can sign data | ||||
|         """ | ||||
|         payload = sign.SignResponsePayload( | ||||
|             unique_identifier='1', | ||||
|             signature_data=b'aaaaaaaaaaaaaaaa' | ||||
|         ) | ||||
|         batch_item = ResponseBatchItem( | ||||
|             operation=Operation(OperationEnum.SIGN), | ||||
|             result_status=ResultStatus(ResultStatusEnum.SUCCESS), | ||||
|             response_payload=payload | ||||
|         ) | ||||
|         response = ResponseMessage(batch_items=[batch_item]) | ||||
| 
 | ||||
|         build_mock.return_value = None | ||||
|         send_mock.return_value = response | ||||
| 
 | ||||
|         result = self.client.sign( | ||||
|             b'\x11\x11\x11\x11\x11\x11\x11\x11', | ||||
|             unique_identifier='1', | ||||
|             cryptographic_parameters=CryptographicParameters( | ||||
|                 padding_method=enums.PaddingMethod.PKCS1v15, | ||||
|                 cryptographic_algorithm=enums.CryptographicAlgorithm.RSA, | ||||
|                 hashing_algorithm=enums.HashingAlgorithm.SHA_224 | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|         self.assertEqual('1', result.get('unique_identifier')) | ||||
|         self.assertEqual( | ||||
|             b'aaaaaaaaaaaaaaaa', | ||||
|             result.get('signature') | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             ResultStatusEnum.SUCCESS, | ||||
|             result.get('result_status') | ||||
|         ) | ||||
|         self.assertEqual(None, result.get('result_reason')) | ||||
|         self.assertEqual(None, result.get('result_message')) | ||||
| 
 | ||||
|     @mock.patch('kmip.services.kmip_client.KMIPProxy._send_message', | ||||
|                 mock.MagicMock()) | ||||
|     @mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message', | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user