Add DeleteAttribute support to the client

This change adds DeleteAttribute support to the ProxyKmipClient,
leveraging the new generic request capability in the underlying
KMIPProxy client. Going forward all new attribute support will
leverage the new request capability and older supported operations
will be migrated to use it as well, with the ultimate vision
being a final merger of the two client classes into one easy to
use architecture. New unit tests have been added to cover the new
client additions.

Partially implements #547
This commit is contained in:
Peter Hamilton 2019-11-15 16:17:05 -05:00 committed by Peter Hamilton
parent 77d5b32ea4
commit b045e08ce2
5 changed files with 414 additions and 0 deletions

View File

@ -228,6 +228,23 @@ class KeyFormatTypeNotSupported(KmipError):
)
class OperationFailure(KmipError):
"""
An exception raised upon the failure of a KMIP appliance operation.
"""
def __init__(self, status, reason, message):
"""
Construct the error message and attributes for the KMIP operation
failure.
Args:
status: a ResultStatus enumeration
reason: a ResultReason enumeration
message: a string providing additional error information
"""
super(OperationFailure, self).__init__(status, reason, message)
class OperationNotSupported(KmipError):
"""
An error generated when an unsupported operation is invoked.

View File

@ -25,6 +25,8 @@ from kmip.core.factories import attributes
from kmip.core.attributes import CryptographicParameters
from kmip.core.attributes import DerivationParameters
from kmip.core.messages import payloads
from kmip.pie import exceptions
from kmip.pie import factory
from kmip.pie import objects as pobjects
@ -386,6 +388,49 @@ class ProxyKmipClient(object):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
@is_connected
def delete_attribute(self, unique_identifier=None, **kwargs):
"""
Delete an attribute from a KMIP managed object.
Args:
unique_identifier (string): The ID of the managed object.
**kwargs (various): A placeholder for attribute values used to
identify the attribute to delete. For KMIP 1.0 - 1.4, the
supported parameters are:
attribute_name (string): The name of the attribute to
delete. Required.
attribute_index (int): The index of the attribute to
delete. Defaults to zero.
For KMIP 2.0+, the supported parameters are:
current_attribute (struct): A CurrentAttribute object
containing the attribute to delete. Required if the
attribute reference is not specified.
attribute_reference (struct): An AttributeReference
object containing the name of the attribute to
delete. Required if the current attribute is not
specified.
Returns:
string: The ID of the managed object the attribute was deleted
from.
struct: A Primitive object representing the deleted attribute.
Only returned if used for KMIP 1.0 - 1.4 messages.
"""
request_payload = payloads.DeleteAttributeRequestPayload(
unique_identifier=unique_identifier,
attribute_name=kwargs.get("attribute_name"),
attribute_index=kwargs.get("attribute_index"),
current_attribute=kwargs.get("current_attribute"),
attribute_reference=kwargs.get("attribute_reference")
)
response_payload = self.proxy.send_request_payload(
enums.Operation.DELETE_ATTRIBUTE,
request_payload
)
return response_payload.unique_identifier, response_payload.attribute
@is_connected
def register(self, managed_object):
"""

View File

@ -39,6 +39,8 @@ from kmip.core.enums import ConformanceClause
from kmip.core.enums import CredentialType
from kmip.core.enums import Operation as OperationEnum
from kmip.core import exceptions
from kmip.core.factories.credentials import CredentialFactory
from kmip.core import objects
@ -309,6 +311,86 @@ class KMIPProxy(object):
pass
self.socket = None
def send_request_payload(self, operation, payload, credential=None):
"""
Send a KMIP request.
Args:
operation (enum): An Operation enumeration specifying the type
of operation to be requested. Required.
payload (struct): A RequestPayload structure containing the
parameters for a specific KMIP operation. Required.
credential (struct): A Credential structure containing
authentication information for the server. Optional, defaults
to None.
Returns:
response (struct): A ResponsePayload structure containing the
results of the KMIP operation specified in the request.
Raises:
TypeError: if the payload is not a RequestPayload instance or if
the operation and payload type do not match
InvalidMessage: if the response message does not have the right
number of response payloads, or does not match the request
operation
"""
if not isinstance(payload, payloads.RequestPayload):
raise TypeError(
"The request payload must be a RequestPayload object."
)
# TODO (peterhamilton) For now limit this to the new DeleteAttribute
# operation. Migrate over existing operations to use this method
# instead.
if operation == enums.Operation.DELETE_ATTRIBUTE:
if not isinstance(payload, payloads.DeleteAttributeRequestPayload):
raise TypeError(
"The request payload for the DeleteAttribute operation "
"must be a DeleteAttributeRequestPayload object."
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=payload
)
request_message = self._build_request_message(credential, [batch_item])
response_message = self._send_and_receive_message(request_message)
if len(response_message.batch_items) != 1:
raise exceptions.InvalidMessage(
"The response message does not have the right number of "
"requested operation results."
)
batch_item = response_message.batch_items[0]
if batch_item.result_status.value != enums.ResultStatus.SUCCESS:
raise exceptions.OperationFailure(
batch_item.result_status.value,
batch_item.result_reason.value,
batch_item.result_message.value
)
if batch_item.operation.value != operation:
raise exceptions.InvalidMessage(
"The response message does not match the request operation."
)
# TODO (peterhamilton) Same as above for now.
if batch_item.operation.value == enums.Operation.DELETE_ATTRIBUTE:
if not isinstance(
batch_item.response_payload,
payloads.DeleteAttributeResponsePayload
):
raise exceptions.InvalidMessage(
"Invalid response payload received for the "
"DeleteAttribute operation."
)
return batch_item.response_payload
def create(self, object_type, template_attribute, credential=None):
return self._create(object_type=object_type,
template_attribute=template_attribute,

View File

@ -24,6 +24,7 @@ from kmip.core import objects as obj
from kmip.core.factories import attributes
from kmip.core.messages import contents
from kmip.core.messages import payloads
from kmip.core.primitives import DateTime
from kmip.services.kmip_client import KMIPProxy
@ -758,6 +759,41 @@ class TestProxyKmipClient(testtools.TestCase):
KmipOperationFailure, error_msg,
client.create_key_pair, *args)
@mock.patch(
"kmip.pie.client.KMIPProxy",
mock.MagicMock(spec_set=KMIPProxy)
)
def test_delete_attribute(self):
"""
Test that the client can delete an attribute.
"""
request_payload = payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=2
)
response_payload = payloads.DeleteAttributeResponsePayload(
unique_identifier="1",
attribute=None
)
with ProxyKmipClient() as client:
client.proxy.send_request_payload.return_value = response_payload
unique_identifier, attribute = client.delete_attribute(
"1",
attribute_name="Object Group",
attribute_index=2
)
args = (
enums.Operation.DELETE_ATTRIBUTE,
request_payload
)
client.proxy.send_request_payload.assert_called_with(*args)
self.assertEqual("1", unique_identifier)
self.assertIsNone(attribute)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)

View File

@ -30,6 +30,8 @@ from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.enums import CryptographicAlgorithm as \
CryptographicAlgorithmEnum
from kmip.core import exceptions
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.factories.credentials import CredentialFactory
from kmip.core.factories.secrets import SecretFactory
@ -772,6 +774,238 @@ class TestKMIPClient(TestCase):
self.client._create_socket(sock)
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._build_request_message"
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._send_and_receive_message"
)
def test_send_request_payload(self, send_mock, build_mock):
"""
Test that the client can send a request payload and correctly handle
the resulting response messsage.
"""
request_payload = payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=2
)
response_payload = payloads.DeleteAttributeResponsePayload(
unique_identifier="1",
attribute=None
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.DELETE_ATTRIBUTE),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=response_payload
)
response_message = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response_message
result = self.client.send_request_payload(
OperationEnum.DELETE_ATTRIBUTE,
request_payload
)
self.assertIsInstance(result, payloads.DeleteAttributeResponsePayload)
self.assertEqual(result, response_payload)
def test_send_request_payload_invalid_payload(self):
"""
Test that a TypeError is raised when an invalid payload is used to
send a request.
"""
args = (OperationEnum.DELETE_ATTRIBUTE, "invalid")
self.assertRaisesRegex(
TypeError,
"The request payload must be a RequestPayload object.",
self.client.send_request_payload,
*args
)
def test_send_request_payload_mismatch_operation_payload(self):
"""
Test that a TypeError is raised when the operation and request payload
do not match up when used to send a request.
"""
args = (
OperationEnum.DELETE_ATTRIBUTE,
payloads.CreateRequestPayload()
)
self.assertRaisesRegex(
TypeError,
"The request payload for the DeleteAttribute operation must be a "
"DeleteAttributeRequestPayload object.",
self.client.send_request_payload,
*args
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._build_request_message"
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._send_and_receive_message"
)
def test_send_request_payload_incorrect_number_of_batch_items(
self,
send_mock,
build_mock
):
"""
Test that an InvalidMessage error is raised when the wrong number of
response payloads are returned from the server.
"""
build_mock.return_value = None
send_mock.return_value = ResponseMessage(batch_items=[])
args = (
OperationEnum.DELETE_ATTRIBUTE,
payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=2
)
)
self.assertRaisesRegex(
exceptions.InvalidMessage,
"The response message does not have the right number of requested "
"operation results.",
self.client.send_request_payload,
*args
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._build_request_message"
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._send_and_receive_message"
)
def test_send_request_payload_mismatch_response_operation(
self,
send_mock,
build_mock
):
"""
Test that an InvalidMessage error is raised when the wrong operation
is returned from the server.
"""
response_payload = payloads.DeleteAttributeResponsePayload(
unique_identifier="1",
attribute=None
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.CREATE),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=response_payload
)
build_mock.return_value = None
send_mock.return_value = ResponseMessage(batch_items=[batch_item])
args = (
OperationEnum.DELETE_ATTRIBUTE,
payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=2
)
)
self.assertRaisesRegex(
exceptions.InvalidMessage,
"The response message does not match the request operation.",
self.client.send_request_payload,
*args
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._build_request_message"
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._send_and_receive_message"
)
def test_send_request_payload_mismatch_response_payload(
self,
send_mock,
build_mock
):
"""
Test that an InvalidMessage error is raised when the wrong payload
is returned from the server.
"""
response_payload = payloads.DestroyResponsePayload(
unique_identifier="1"
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.DELETE_ATTRIBUTE),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=response_payload
)
build_mock.return_value = None
send_mock.return_value = ResponseMessage(batch_items=[batch_item])
args = (
OperationEnum.DELETE_ATTRIBUTE,
payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=2
)
)
self.assertRaisesRegex(
exceptions.InvalidMessage,
"Invalid response payload received for the DeleteAttribute "
"operation.",
self.client.send_request_payload,
*args
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._build_request_message"
)
@mock.patch(
"kmip.services.kmip_client.KMIPProxy._send_and_receive_message"
)
def test_send_request_payload_operation_failure(
self,
send_mock,
build_mock
):
"""
Test that a KmipOperationFailure error is raised when a payload
with a failure status is returned.
"""
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.DELETE_ATTRIBUTE),
result_status=ResultStatus(ResultStatusEnum.OPERATION_FAILED),
result_reason=ResultReason(ResultReasonEnum.GENERAL_FAILURE),
result_message=ResultMessage("Test failed!")
)
build_mock.return_value = None
send_mock.return_value = ResponseMessage(batch_items=[batch_item])
args = (
OperationEnum.DELETE_ATTRIBUTE,
payloads.DeleteAttributeRequestPayload(
unique_identifier="1",
attribute_name="Object Group",
attribute_index=2
)
)
self.assertRaisesRegex(
exceptions.OperationFailure,
"Test failed!",
self.client.send_request_payload,
*args
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)