Update the RequestBatchItem to support the ephemeral field

This change updates the RequestBatchItem to support the new
ephemeral field added in KMIP 2.0. Unit tests have been added to
cover the change.
This commit is contained in:
Peter Hamilton 2019-05-09 10:28:36 -04:00 committed by Peter Hamilton
parent a7f05ab7be
commit 4a9690165a
2 changed files with 136 additions and 1 deletions

View File

@ -23,6 +23,7 @@ from kmip.core.messages.contents import BatchErrorContinuationOption
from kmip.core.factories.payloads.request import RequestPayloadFactory
from kmip.core.factories.payloads.response import ResponsePayloadFactory
from kmip.core import primitives
from kmip.core.primitives import Struct
from kmip.core.utils import BytearrayStream
@ -199,7 +200,8 @@ class RequestBatchItem(Struct):
operation=None,
unique_batch_item_id=None,
request_payload=None,
message_extension=None):
message_extension=None,
ephemeral=None):
super(RequestBatchItem, self).__init__(tag=Tags.REQUEST_BATCH_ITEM)
self.payload_factory = RequestPayloadFactory()
@ -208,6 +210,26 @@ class RequestBatchItem(Struct):
self.unique_batch_item_id = unique_batch_item_id
self.request_payload = request_payload
self.message_extension = message_extension
self.ephemeral = ephemeral
@property
def ephemeral(self):
if self._ephemeral:
return self._ephemeral.value
return None
@ephemeral.setter
def ephemeral(self, value):
if value is None:
self._ephemeral = None
elif isinstance(value, bool):
ephemeral = primitives.Boolean(
value=value,
tag=enums.Tags.EPHEMERAL
)
self._ephemeral = ephemeral
else:
raise TypeError("The ephemeral value must be a boolean.")
def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0):
super(RequestBatchItem, self).read(
@ -220,6 +242,12 @@ class RequestBatchItem(Struct):
self.operation = contents.Operation()
self.operation.read(tstream, kmip_version=kmip_version)
if kmip_version >= enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(enums.Tags.EPHEMERAL, tstream):
ephemeral = primitives.Boolean(tag=enums.Tags.EPHEMERAL)
ephemeral.read(tstream, kmip_version=kmip_version)
self._ephemeral = ephemeral
# Read the unique batch item ID if it is present
if self.is_tag_next(Tags.UNIQUE_BATCH_ITEM_ID, tstream):
self.unique_batch_item_id = contents.UniqueBatchItemID()
@ -244,6 +272,10 @@ class RequestBatchItem(Struct):
# Write the contents of the batch item to the stream
self.operation.write(tstream, kmip_version=kmip_version)
if kmip_version >= enums.KMIPVersion.KMIP_2_0:
if self._ephemeral:
self._ephemeral.write(tstream, kmip_version=kmip_version)
if self.unique_batch_item_id is not None:
self.unique_batch_item_id.write(tstream, kmip_version=kmip_version)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from testtools import TestCase
import binascii
import six
@ -53,6 +54,108 @@ from kmip.core import utils
from kmip.core.utils import BytearrayStream
class TestRequestBatchItem(testtools.TestCase):
def setUp(self):
super(TestRequestBatchItem, self).setUp()
# Encoding obtained from the KMIP 1.1 testing document,
# Section 3.1.1.
#
# This encoding matches the following set of values:
# Request Batch Item
# Operation - Destroy
# Request Payload
# Unique Identifier - fb4b5b9c-6188-4c63-8142-fe9c328129fc
self.encoding_kmip_2_0 = utils.BytearrayStream(
b'\x42\x00\x0F\x01\x00\x00\x00\x58'
b'\x42\x00\x5C\x05\x00\x00\x00\x04\x00\x00\x00\x14\x00\x00\x00\x00'
b'\x42\x01\x54\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x42\x00\x79\x01\x00\x00\x00\x30'
b'\x42\x00\x94\x07\x00\x00\x00\x24'
b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63'
b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31'
b'\x32\x39\x66\x63\x00\x00\x00\x00'
)
def tearDown(self):
super(TestRequestBatchItem, self).tearDown()
def test_invalid_ephemeral(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the ephemeral value of a RequestBatchItem.
"""
kwargs = {"ephemeral": "invalid"}
self.assertRaisesRegex(
TypeError,
"The ephemeral value must be a boolean.",
messages.RequestBatchItem,
**kwargs
)
args = (
messages.RequestBatchItem(),
"ephemeral",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The ephemeral value must be a boolean.",
setattr,
*args
)
def test_read_kmip_2_0(self):
"""
Test that a RequestBatchItem structure can be correctly read in
from a data stream when including KMIP 2.0 fields.
"""
request_batch_item = messages.RequestBatchItem()
self.assertIsNone(request_batch_item.operation)
self.assertIsNone(request_batch_item.ephemeral)
request_batch_item.read(
self.encoding_kmip_2_0,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(
enums.Operation.DESTROY,
request_batch_item.operation.value
)
self.assertTrue(request_batch_item.ephemeral)
# self.assertEqual(
# payloads.DestroyRequestPayload(
# unique_identifier=attr.UniqueIdentifier(
# value="fb4b5b9c-6188-4c63-8142-fe9c328129fc"
# )
# ),
# request_batch_item.request_payload
# )
def test_write_kmip_2_0(self):
request_batch_item = messages.RequestBatchItem(
operation=contents.Operation(enums.Operation.DESTROY),
ephemeral=True,
request_payload=payloads.DestroyRequestPayload(
unique_identifier=attr.UniqueIdentifier(
"fb4b5b9c-6188-4c63-8142-fe9c328129fc"
)
)
)
buffer = utils.BytearrayStream()
request_batch_item.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(len(self.encoding_kmip_2_0), len(buffer))
self.assertEqual(str(self.encoding_kmip_2_0), str(buffer))
class TestRequestMessage(TestCase):
def setUp(self):