mirror of https://github.com/OpenKMIP/PyKMIP.git
Adding payloads for the GetAttributeList operation
This change adds request and response payloads for the GetAttributeList operation. It updates the payload factories to support these new objects and updates and adds all associated or required test suites. A new exception is also included that is thrown by objects attempting to parse invalid KMIP encodings.
This commit is contained in:
parent
35602c9b5c
commit
a21f0b66c6
|
@ -0,0 +1,21 @@
|
|||
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class InvalidKmipEncoding(Exception):
|
||||
"""
|
||||
An exception raised when processing invalid KMIP message encodings.
|
||||
"""
|
||||
pass
|
|
@ -21,6 +21,7 @@ from kmip.core.messages.payloads import create_key_pair
|
|||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import locate
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
|
@ -48,6 +49,9 @@ class RequestPayloadFactory(PayloadFactory):
|
|||
def _create_get_payload(self):
|
||||
return get.GetRequestPayload()
|
||||
|
||||
def _create_get_attribute_list_payload(self):
|
||||
return get_attribute_list.GetAttributeListRequestPayload()
|
||||
|
||||
def _create_destroy_payload(self):
|
||||
return destroy.DestroyRequestPayload()
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ from kmip.core.messages.payloads import create_key_pair
|
|||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import locate
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
|
@ -48,6 +49,9 @@ class ResponsePayloadFactory(PayloadFactory):
|
|||
def _create_get_payload(self):
|
||||
return get.GetResponsePayload()
|
||||
|
||||
def _create_get_attribute_list_payload(self):
|
||||
return get_attribute_list.GetAttributeListResponsePayload()
|
||||
|
||||
def _create_destroy_payload(self):
|
||||
return destroy.DestroyResponsePayload()
|
||||
|
||||
|
|
|
@ -0,0 +1,269 @@
|
|||
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core import exceptions
|
||||
from kmip.core import primitives
|
||||
from kmip.core import utils
|
||||
|
||||
|
||||
class GetAttributeListRequestPayload(primitives.Struct):
|
||||
"""
|
||||
A request payload for the GetAttributeList operation.
|
||||
|
||||
The payload can contain the ID of the managed object the attributes should
|
||||
belong too. If omitted, the server will use the ID placeholder by default.
|
||||
See Section 4.13 of the KMIP 1.1 specification for more information.
|
||||
|
||||
Attributes:
|
||||
uid: The unique ID of the managed object with which the retrieved
|
||||
attributes should be associated.
|
||||
"""
|
||||
def __init__(self, uid=None):
|
||||
"""
|
||||
Construct a GetAttributeList request payload.
|
||||
|
||||
Args:
|
||||
uid (string): The ID of the managed object with which the retrieved
|
||||
attributes should be associated. Optional, defaults to None.
|
||||
"""
|
||||
super(GetAttributeListRequestPayload, self).__init__(
|
||||
enums.Tags.REQUEST_PAYLOAD)
|
||||
|
||||
self.uid = uid
|
||||
|
||||
self.validate()
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
Read the data encoding the GetAttributeList request payload and decode
|
||||
it into its constituent parts.
|
||||
|
||||
Args:
|
||||
istream (stream): A data stream containing encoded object data,
|
||||
supporting a read method; usually a BytearrayStream object.
|
||||
"""
|
||||
super(GetAttributeListRequestPayload, self).read(istream)
|
||||
tstream = utils.BytearrayStream(istream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream):
|
||||
uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.read(tstream)
|
||||
self.uid = uid.value
|
||||
else:
|
||||
self.uid = None
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
Write the data encoding the GetAttributeList request payload to a
|
||||
stream.
|
||||
|
||||
Args:
|
||||
ostream (stream): A data stream in which to encode object data,
|
||||
supporting a write method; usually a BytearrayStream object.
|
||||
"""
|
||||
tstream = utils.BytearrayStream()
|
||||
|
||||
if self.uid:
|
||||
uid = primitives.TextString(
|
||||
value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.write(tstream)
|
||||
|
||||
self.length = tstream.length()
|
||||
super(GetAttributeListRequestPayload, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Error check the attributes of the GetAttributeList request payload.
|
||||
"""
|
||||
if self.uid is not None:
|
||||
if not isinstance(self.uid, six.string_types):
|
||||
raise TypeError(
|
||||
"uid must be a string; "
|
||||
"expected (one of): {0}, observed: {1}".format(
|
||||
six.string_types, type(self.uid)))
|
||||
|
||||
def __repr__(self):
|
||||
uid = "uid={0}".format(self.uid)
|
||||
return "GetAttributeListRequestPayload({0})".format(uid)
|
||||
|
||||
def __str__(self):
|
||||
return str({'uid': self.uid})
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, GetAttributeListRequestPayload):
|
||||
if self.uid == other.uid:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, GetAttributeListRequestPayload):
|
||||
return not self.__eq__(other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
|
||||
class GetAttributeListResponsePayload(primitives.Struct):
|
||||
"""
|
||||
A response payload for the GetAttributeList operation.
|
||||
|
||||
The payload will contain the ID of the managed object with which the
|
||||
attributes are associated. It will also contain a list of attribute names
|
||||
identifying the types of attributes associated with the aforementioned
|
||||
managed object. See Section 4.13 of the KMIP 1.1 specification for more
|
||||
information.
|
||||
|
||||
Attributes:
|
||||
uid: The unique ID of the managed object with which the retrieved
|
||||
attributes should be associated.
|
||||
attribute_names: The list of attribute names of the attributes
|
||||
associated with managed object identified by the uid above.
|
||||
"""
|
||||
def __init__(self, uid=None, attribute_names=None):
|
||||
"""
|
||||
Construct a GetAttributeList response payload.
|
||||
|
||||
Args:
|
||||
uid (string): The ID of the managed object with which the retrieved
|
||||
attributes should be associated. Optional, defaults to None.
|
||||
attribute_names (list): A list of strings identifying the names of
|
||||
the attributes associated with the managed object. Optional,
|
||||
defaults to None.
|
||||
"""
|
||||
super(GetAttributeListResponsePayload, self).__init__(
|
||||
enums.Tags.RESPONSE_PAYLOAD)
|
||||
|
||||
self.uid = uid
|
||||
|
||||
if attribute_names:
|
||||
self.attribute_names = attribute_names
|
||||
else:
|
||||
self.attribute_names = list()
|
||||
|
||||
self.validate()
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
Read the data encoding the GetAttributeList response payload and decode
|
||||
it into its constituent parts.
|
||||
|
||||
Args:
|
||||
istream (stream): A data stream containing encoded object data,
|
||||
supporting a read method; usually a BytearrayStream object.
|
||||
"""
|
||||
super(GetAttributeListResponsePayload, self).read(istream)
|
||||
tstream = utils.BytearrayStream(istream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream):
|
||||
uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.read(tstream)
|
||||
self.uid = uid.value
|
||||
else:
|
||||
raise exceptions.InvalidKmipEncoding(
|
||||
"expected uid encoding not found")
|
||||
|
||||
names = list()
|
||||
while(self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream)):
|
||||
name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME)
|
||||
name.read(tstream)
|
||||
names.append(name.value)
|
||||
self.attribute_names = names
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
Write the data encoding the GetAttributeList response payload to a
|
||||
stream.
|
||||
|
||||
Args:
|
||||
ostream (stream): A data stream in which to encode object data,
|
||||
supporting a write method; usually a BytearrayStream object.
|
||||
"""
|
||||
tstream = utils.BytearrayStream()
|
||||
|
||||
uid = primitives.TextString(
|
||||
value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.write(tstream)
|
||||
|
||||
for name in self.attribute_names:
|
||||
name = primitives.TextString(
|
||||
value=name, tag=enums.Tags.ATTRIBUTE_NAME)
|
||||
name.write(tstream)
|
||||
|
||||
self.length = tstream.length()
|
||||
super(GetAttributeListResponsePayload, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Error check the attributes of the GetAttributeList response payload.
|
||||
"""
|
||||
if self.uid is not None:
|
||||
if not isinstance(self.uid, six.string_types):
|
||||
raise TypeError(
|
||||
"uid must be a string; "
|
||||
"expected (one of): {0}, observed: {1}".format(
|
||||
six.string_types, type(self.uid)))
|
||||
|
||||
if self.attribute_names:
|
||||
if not isinstance(self.attribute_names, list):
|
||||
raise TypeError("attribute names must be a list")
|
||||
for i in range(len(self.attribute_names)):
|
||||
name = self.attribute_names[i]
|
||||
if not isinstance(name, six.string_types):
|
||||
raise TypeError(
|
||||
"attribute name ({0} of {1}) must be a string".format(
|
||||
i + 1, len(self.attribute_names)))
|
||||
|
||||
def __repr__(self):
|
||||
uid = "uid={0}".format(self.uid)
|
||||
names = "attribute_names={0}".format(self.attribute_names)
|
||||
return "GetAttributeListResponsePayload({0}, {1})".format(uid, names)
|
||||
|
||||
def __str__(self):
|
||||
return str({'uid': self.uid, 'attribute_names': self.attribute_names})
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, GetAttributeListResponsePayload):
|
||||
if self.uid != other.uid:
|
||||
return False
|
||||
elif ((isinstance(self.attribute_names, list) and
|
||||
isinstance(other.attribute_names, list)) and
|
||||
len(self.attribute_names) == len(other.attribute_names)):
|
||||
for name in self.attribute_names:
|
||||
if name not in other.attribute_names:
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, GetAttributeListResponsePayload):
|
||||
return not self.__eq__(other)
|
||||
else:
|
||||
return NotImplemented
|
|
@ -24,6 +24,7 @@ from kmip.core.messages.payloads import create_key_pair
|
|||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import locate
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
|
@ -93,8 +94,9 @@ class TestRequestPayloadFactory(testtools.TestCase):
|
|||
self.factory.create, Operation.GET_ATTRIBUTES)
|
||||
|
||||
def test_create_get_attributes_list_payload(self):
|
||||
self._test_not_implemented(
|
||||
self.factory.create, Operation.GET_ATTRIBUTE_LIST)
|
||||
payload = self.factory.create(Operation.GET_ATTRIBUTE_LIST)
|
||||
self._test_payload_type(
|
||||
payload, get_attribute_list.GetAttributeListRequestPayload)
|
||||
|
||||
def test_create_add_attribute_payload(self):
|
||||
self._test_not_implemented(
|
||||
|
|
|
@ -24,6 +24,7 @@ from kmip.core.messages.payloads import create_key_pair
|
|||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import locate
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
|
@ -93,8 +94,9 @@ class TestResponsePayloadFactory(testtools.TestCase):
|
|||
self.factory.create, Operation.GET_ATTRIBUTES)
|
||||
|
||||
def test_create_get_attributes_list_payload(self):
|
||||
self._test_not_implemented(
|
||||
self.factory.create, Operation.GET_ATTRIBUTE_LIST)
|
||||
payload = self.factory.create(Operation.GET_ATTRIBUTE_LIST)
|
||||
self._test_payload_type(
|
||||
payload, get_attribute_list.GetAttributeListResponsePayload)
|
||||
|
||||
def test_create_add_attribute_payload(self):
|
||||
self._test_not_implemented(
|
||||
|
|
|
@ -0,0 +1,544 @@
|
|||
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import testtools
|
||||
|
||||
from kmip.core import exceptions
|
||||
from kmip.core import utils
|
||||
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
|
||||
|
||||
class TestGetAttributeListRequestPayload(testtools.TestCase):
|
||||
"""
|
||||
Test suite for the GetAttributeList request payload.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestGetAttributeListRequestPayload, self).setUp()
|
||||
|
||||
# Encodings taken from Sections 3.1.4 of the KMIP 1.1 testing
|
||||
# documentation.
|
||||
self.encoding_with_uid = utils.BytearrayStream((
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34'
|
||||
b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32'
|
||||
b'\x32\x39\x35\x39\x00\x00\x00\x00'))
|
||||
|
||||
self.encoding_without_uid = utils.BytearrayStream((
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\x00'))
|
||||
|
||||
self.uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959'
|
||||
|
||||
def tearDown(self):
|
||||
super(TestGetAttributeListRequestPayload, self).tearDown()
|
||||
|
||||
def test_init(self):
|
||||
"""
|
||||
Test that a GetAttributeList request payload can be constructed with
|
||||
no arguments.
|
||||
"""
|
||||
get_attribute_list.GetAttributeListRequestPayload()
|
||||
|
||||
def test_init_with_args(self):
|
||||
"""
|
||||
Test that a GetAttributeList request payload can be constructed with a
|
||||
valid value.
|
||||
"""
|
||||
get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
|
||||
def test_validate_with_invalid_uid(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid ID is used
|
||||
to construct a GetAttributeList request payload.
|
||||
"""
|
||||
kwargs = {'uid': 0}
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "uid must be a string",
|
||||
get_attribute_list.GetAttributeListRequestPayload, **kwargs)
|
||||
|
||||
def test_read(self):
|
||||
"""
|
||||
Test that a GetAttributeList request payload can be read from a data
|
||||
stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload()
|
||||
payload.read(self.encoding_with_uid)
|
||||
self.assertEqual(self.uid, payload.uid)
|
||||
|
||||
def test_read_with_no_uid(self):
|
||||
"""
|
||||
Test that a GetAttributeList request payload with no ID can be read
|
||||
from a data stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload()
|
||||
payload.read(self.encoding_without_uid)
|
||||
self.assertEqual(None, payload.uid)
|
||||
|
||||
def test_write(self):
|
||||
"""
|
||||
Test that a GetAttributeList request payload can be written to a data
|
||||
stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
stream = utils.BytearrayStream()
|
||||
payload.write(stream)
|
||||
|
||||
self.assertEqual(len(self.encoding_with_uid), len(stream))
|
||||
self.assertEqual(self.encoding_with_uid, stream)
|
||||
|
||||
def test_write_with_no_uid(self):
|
||||
"""
|
||||
Test that a GetAttributeList request payload with no ID can be written
|
||||
to a data stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload()
|
||||
stream = utils.BytearrayStream()
|
||||
payload.write(stream)
|
||||
|
||||
self.assertEqual(len(self.encoding_without_uid), len(stream))
|
||||
self.assertEqual(self.encoding_without_uid, stream)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that repr can be applied to a GetAttributeList request payload.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
args = "uid={0}".format(payload.uid)
|
||||
expected = "GetAttributeListRequestPayload({0})".format(args)
|
||||
observed = repr(payload)
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def test_str(self):
|
||||
"""
|
||||
Test that str can be applied to a GetAttributeList request payload.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
expected = str({'uid': payload.uid})
|
||||
observed = str(payload)
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def test_equal_on_equal(self):
|
||||
"""
|
||||
Test that the equality operator returns True when comparing two
|
||||
GetAttributeList request payloads with the same data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
b = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
|
||||
self.assertTrue(a == b)
|
||||
self.assertTrue(b == a)
|
||||
|
||||
def test_equal_on_not_equal_uid(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
GetAttributeList request payloads with different data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
b = get_attribute_list.GetAttributeListRequestPayload('invalid')
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_type_mismatch(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing a
|
||||
GetAttributeList request payload to a non-GetAttributeList request
|
||||
payload.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
b = "invalid"
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_not_equal_on_equal(self):
|
||||
"""
|
||||
Test that the inequality operator returns False when comparing
|
||||
two GetAttributeList request payloads with the same internal data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
b = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
|
||||
self.assertFalse(a != b)
|
||||
self.assertFalse(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_uid(self):
|
||||
"""
|
||||
Test that the inequality operator returns True when comparing two
|
||||
GetAttributeList request payloads with different data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
b = get_attribute_list.GetAttributeListRequestPayload('invalid')
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_type_mismatch(self):
|
||||
"""
|
||||
Test that the equality operator returns True when comparing a
|
||||
GetAttributeList request payload to a non-GetAttributeList request
|
||||
payload.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListRequestPayload(self.uid)
|
||||
b = "invalid"
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
|
||||
class TestGetAttributeListResponsePayload(testtools.TestCase):
|
||||
"""
|
||||
Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test
|
||||
Cases documentation.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestGetAttributeListResponsePayload, self).setUp()
|
||||
|
||||
# Encodings taken from Sections 3.1.4 of the KMIP 1.1 testing
|
||||
# documentation.
|
||||
self.encoding_with_uid_with_names = utils.BytearrayStream((
|
||||
b'\x42\x00\x7C\x01\x00\x00\x01\x60\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34'
|
||||
b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32'
|
||||
b'\x32\x39\x35\x39\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x14'
|
||||
b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65'
|
||||
b'\x6E\x67\x74\x68\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x17'
|
||||
b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C'
|
||||
b'\x67\x6F\x72\x69\x74\x68\x6D\x00\x42\x00\x0A\x07\x00\x00\x00\x05'
|
||||
b'\x53\x74\x61\x74\x65\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x06'
|
||||
b'\x44\x69\x67\x65\x73\x74\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x0A'
|
||||
b'\x4C\x65\x61\x73\x65\x20\x54\x69\x6D\x65\x00\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x49\x6E\x69\x74\x69\x61\x6C\x20'
|
||||
b'\x44\x61\x74\x65\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x11'
|
||||
b'\x55\x6E\x69\x71\x75\x65\x20\x49\x64\x65\x6E\x74\x69\x66\x69\x65'
|
||||
b'\x72\x00\x00\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x04'
|
||||
b'\x4E\x61\x6D\x65\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x18'
|
||||
b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x55\x73'
|
||||
b'\x61\x67\x65\x20\x4D\x61\x73\x6B\x42\x00\x0A\x07\x00\x00\x00\x0B'
|
||||
b'\x4F\x62\x6A\x65\x63\x74\x20\x54\x79\x70\x65\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20'
|
||||
b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x10\x4C\x61\x73\x74\x20\x43\x68\x61'
|
||||
b'\x6E\x67\x65\x20\x44\x61\x74\x65'))
|
||||
self.encoding_without_uid_with_names = utils.BytearrayStream((
|
||||
b'\x42\x00\x7C\x01\x00\x00\x01\x60\x42\x00\x0A\x07\x00\x00\x00\x14'
|
||||
b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65'
|
||||
b'\x6E\x67\x74\x68\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x17'
|
||||
b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C'
|
||||
b'\x67\x6F\x72\x69\x74\x68\x6D\x00\x42\x00\x0A\x07\x00\x00\x00\x05'
|
||||
b'\x53\x74\x61\x74\x65\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x06'
|
||||
b'\x44\x69\x67\x65\x73\x74\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x0A'
|
||||
b'\x4C\x65\x61\x73\x65\x20\x54\x69\x6D\x65\x00\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x49\x6E\x69\x74\x69\x61\x6C\x20'
|
||||
b'\x44\x61\x74\x65\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x11'
|
||||
b'\x55\x6E\x69\x71\x75\x65\x20\x49\x64\x65\x6E\x74\x69\x66\x69\x65'
|
||||
b'\x72\x00\x00\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x04'
|
||||
b'\x4E\x61\x6D\x65\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x18'
|
||||
b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x55\x73'
|
||||
b'\x61\x67\x65\x20\x4D\x61\x73\x6B\x42\x00\x0A\x07\x00\x00\x00\x0B'
|
||||
b'\x4F\x62\x6A\x65\x63\x74\x20\x54\x79\x70\x65\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20'
|
||||
b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x10\x4C\x61\x73\x74\x20\x43\x68\x61'
|
||||
b'\x6E\x67\x65\x20\x44\x61\x74\x65'))
|
||||
self.encoding_with_uid_without_names = utils.BytearrayStream((
|
||||
b'\x42\x00\x7C\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x62\x34\x66\x61\x65\x65\x31\x30\x2D\x61\x61\x32\x61\x2D\x34\x34'
|
||||
b'\x34\x36\x2D\x38\x61\x64\x34\x2D\x30\x38\x38\x31\x66\x33\x34\x32'
|
||||
b'\x32\x39\x35\x39\x00\x00\x00\x00'))
|
||||
|
||||
self.uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959'
|
||||
self.attribute_names = list((
|
||||
'Cryptographic Length',
|
||||
'Cryptographic Algorithm',
|
||||
'State',
|
||||
'Digest',
|
||||
'Lease Time',
|
||||
'Initial Date',
|
||||
'Unique Identifier',
|
||||
'Name',
|
||||
'Cryptographic Usage Mask',
|
||||
'Object Type',
|
||||
'Contact Information',
|
||||
'Last Change Date'))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestGetAttributeListResponsePayload, self).tearDown()
|
||||
|
||||
def test_init(self):
|
||||
"""
|
||||
Test that a GetAttributeList response payload can be constructed.
|
||||
"""
|
||||
get_attribute_list.GetAttributeListResponsePayload()
|
||||
|
||||
def test_init_with_args(self):
|
||||
"""
|
||||
Test that a GetAttributeList response payload can be constructed with
|
||||
valid values.
|
||||
"""
|
||||
get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
|
||||
def test_validate_with_invalid_uid(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid ID is used
|
||||
to construct a GetAttributeList response payload.
|
||||
"""
|
||||
kwargs = {'uid': 0, 'attribute_names': self.attribute_names}
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "uid must be a string",
|
||||
get_attribute_list.GetAttributeListResponsePayload, **kwargs)
|
||||
|
||||
def test_validate_with_invalid_attribute_names(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid attribute
|
||||
name list is used to construct a GetAttributeList response payload.
|
||||
"""
|
||||
kwargs = {'uid': self.uid, 'attribute_names': 'invalid'}
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "attribute names must be a list",
|
||||
get_attribute_list.GetAttributeListResponsePayload, **kwargs)
|
||||
|
||||
def test_validate_with_invalid_attribute_name(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid attribute
|
||||
name is used to construct a GetAttributeList response payload object.
|
||||
"""
|
||||
kwargs = {'uid': self.uid, 'attribute_names': [0]}
|
||||
self.assertRaises(
|
||||
TypeError, get_attribute_list.GetAttributeListResponsePayload,
|
||||
**kwargs)
|
||||
|
||||
kwargs = {'uid': self.uid, 'attribute_names': ['', 0, '']}
|
||||
self.assertRaises(
|
||||
TypeError, get_attribute_list.GetAttributeListResponsePayload,
|
||||
**kwargs)
|
||||
|
||||
def test_read(self):
|
||||
"""
|
||||
Test that a GetAttributeList response payload can be read from a data
|
||||
stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload()
|
||||
payload.read(self.encoding_with_uid_with_names)
|
||||
|
||||
self.assertEqual(self.uid, payload.uid)
|
||||
self.assertEqual(self.attribute_names, payload.attribute_names)
|
||||
|
||||
def test_read_with_no_uid(self):
|
||||
"""
|
||||
Test that an InvalidKmipEncoding error gets raised when attempting to
|
||||
read a GetAttributeList response encoding with no ID data.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload()
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidKmipEncoding, "expected uid encoding not found",
|
||||
payload.read, self.encoding_without_uid_with_names)
|
||||
|
||||
def test_read_with_no_attribute_names(self):
|
||||
"""
|
||||
Test that a GetAttributeList response payload without attribute name
|
||||
data can be read from a data stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload()
|
||||
payload.read(self.encoding_with_uid_without_names)
|
||||
|
||||
self.assertEqual(self.uid, payload.uid)
|
||||
self.assertEqual(list(), payload.attribute_names)
|
||||
|
||||
def test_write(self):
|
||||
"""
|
||||
Test that a GetAttributeList response payload can be written to a data
|
||||
stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
stream = utils.BytearrayStream()
|
||||
payload.write(stream)
|
||||
|
||||
self.assertEqual(len(self.encoding_with_uid_with_names), len(stream))
|
||||
self.assertEqual(self.encoding_with_uid_with_names, stream)
|
||||
|
||||
def test_write_with_no_attribute_names(self):
|
||||
"""
|
||||
Test that a GetAttributeList response payload with no attribute name
|
||||
data can be written to a data stream.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload(self.uid)
|
||||
stream = utils.BytearrayStream()
|
||||
payload.write(stream)
|
||||
|
||||
self.assertEqual(
|
||||
len(self.encoding_with_uid_without_names), len(stream))
|
||||
self.assertEqual(self.encoding_with_uid_without_names, stream)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that repr can be applied to a GetAttributeList response payload.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
args = "uid={0}, attribute_names={1}".format(
|
||||
payload.uid, payload.attribute_names)
|
||||
expected = "GetAttributeListResponsePayload({0})".format(args)
|
||||
observed = repr(payload)
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def test_str(self):
|
||||
"""
|
||||
Test that str can be applied to a GetAttributeList response payload.
|
||||
"""
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload(self.uid)
|
||||
expected = str({'uid': payload.uid,
|
||||
'attribute_names': payload.attribute_names})
|
||||
observed = str(payload)
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def test_equal_on_equal(self):
|
||||
"""
|
||||
Test that the equality operator returns True when comparing two
|
||||
GetAttributeList response payloads with the same data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
|
||||
self.assertTrue(a == b)
|
||||
self.assertTrue(b == a)
|
||||
|
||||
def test_equal_on_not_equal_uid(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
GetAttributeList response payloads with different data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
'invalid', self.attribute_names)
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_not_equal_attribute_names(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
GetAttributeList response payloads with different data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, list())
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_not_equal_attribute_name(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
GetAttributeList response payloads with different data.
|
||||
"""
|
||||
alt_names = copy.deepcopy(self.attribute_names)
|
||||
alt_names[0] = 'invalid'
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, alt_names)
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_type_mismatch(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing a
|
||||
GetAttributeList response payload to a non-GetAttributeList response
|
||||
payload.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = 'invalid'
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_not_equal_on_equal(self):
|
||||
"""
|
||||
Test that the inequality operator returns False when comparing
|
||||
two GetAttributeList response payloads with the same internal data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
|
||||
self.assertFalse(a != b)
|
||||
self.assertFalse(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_uid(self):
|
||||
"""
|
||||
Test that the inequality operator returns True when comparing two
|
||||
GetAttributeList request payloads with different data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
'invalid', self.attribute_names)
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_attribute_names(self):
|
||||
"""
|
||||
Test that the inequality operator returns False when comparing two
|
||||
GetAttributeList response payloads with different data.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, list())
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_attribute_name(self):
|
||||
"""
|
||||
Test that the inequality operator returns False when comparing two
|
||||
GetAttributeList response payloads with different data.
|
||||
"""
|
||||
alt_names = copy.deepcopy(self.attribute_names)
|
||||
alt_names[0] = 'Operation Policy Name'
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, alt_names)
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_type_mismatch(self):
|
||||
"""
|
||||
Test that the inequality operator returns True when comparing a
|
||||
GetAttributeList response payload to a non-GetAttributeList response
|
||||
payload.
|
||||
"""
|
||||
a = get_attribute_list.GetAttributeListResponsePayload(
|
||||
self.uid, self.attribute_names)
|
||||
b = "invalid"
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
Loading…
Reference in New Issue