Updating application attributes

This change updates the various Application attributes, adding inline
documentation, and reorganizing the different related test suites. Minor
updates to supporting primitive objects are included.
This commit is contained in:
Peter Hamilton 2015-02-23 14:13:21 -05:00
parent 0cd2d3dab6
commit 0c4f9cd9d0
9 changed files with 576 additions and 164 deletions

View File

@ -299,36 +299,97 @@ class ObjectGroup(TextString):
# 3.36
class ApplicationNamespace(TextString):
"""
The name of a namespace supported by the KMIP server.
A part of ApplicationSpecificInformation, sets of these are also potential
responses to a Query request. See Sections 3.36 and 4.25 of the KMIP v1.1
specification for more information.
"""
def __init__(self, value=None):
"""
Construct an ApplicationNamespace object.
Args:
value (str): A string representing a namespace. Optional, defaults
to None.
"""
super(ApplicationNamespace, self).__init__(
value, Tags.APPLICATION_NAMESPACE)
class ApplicationData(TextString):
"""
A string representing data specific to an application namespace.
A part of ApplicationSpecificInformation. See Section 3.36 of the KMIP v1.1
specification for more information.
"""
def __init__(self, value=None):
"""
Construct an ApplicationData object.
Args:
value (str): A string representing data for a particular namespace.
Optional, defaults to None.
"""
super(ApplicationData, self).__init__(value, Tags.APPLICATION_DATA)
class ApplicationSpecificInformation(Struct):
"""
A structure used to store data specific to the applications that use a
Managed Object.
class ApplicationNamespace(TextString):
An attribute of Managed Objects, it may be specified during the creation or
modification of any server Managed Object.
def __init__(self, value=None):
super(self.__class__,
self).__init__(value, Tags.APPLICATION_NAMESPACE)
Attributes:
application_namespace: The name of a namespace supported by the server.
application_data: String data relevant to the specified namespace.
class ApplicationData(TextString):
See Section 3.36 of the KMIP v1.1 specification for more information.
"""
def __init__(self, value=None):
super(self.__class__,
self).__init__(value, Tags.APPLICATION_DATA)
def __init__(self, application_namespace=None, application_data=None):
"""
Construct an ApplicationSpecificInformation object.
def __init__(self, application_namespace=None,
application_data=None):
super(self.__class__,
self).__init__(Tags.APPLICATION_SPECIFIC_INFORMATION)
Args:
application_namespace (ApplicationNamespace): The name of a
namespace supported by the server. Optional, defaults to None.
application_data (ApplicationData): String data relevant to the
specified namespace. Optional, defaults to None.
"""
super(ApplicationSpecificInformation, self).__init__(
Tags.APPLICATION_SPECIFIC_INFORMATION)
self.application_namespace = application_namespace
self.application_data = application_data
if application_namespace is None:
self.application_namespace = ApplicationNamespace()
else:
self.application_namespace = application_namespace
if application_data is None:
self.application_data = ApplicationData()
else:
self.application_data = application_data
self.validate()
def read(self, istream):
super(self.__class__, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
"""
Read the data encoding the ApplicationSpecificInformation object and
decode it into its constituent parts.
self.application_namespace = self.ApplicationNamespace()
self.application_data = self.ApplicationData()
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(ApplicationSpecificInformation, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.application_namespace.read(tstream)
self.application_data.read(tstream)
@ -337,57 +398,68 @@ class ApplicationSpecificInformation(Struct):
self.validate()
def write(self, ostream):
"""
Write the data encoding the ApplicationSpecificInformation object to a
stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
self.application_namespace.write(tstream)
self.application_data.write(tstream)
# Write the length and value of the request payload
self.length = tstream.length()
super(self.__class__, self).write(ostream)
super(ApplicationSpecificInformation, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the types of the different attributes of the
ApplicationSpecificInformation object.
"""
self.__validate()
def __validate(self):
name = self.__class__.__name__
msg = ErrorStrings.BAD_EXP_RECV
if not isinstance(self.application_namespace, ApplicationNamespace):
msg = "invalid application namespace"
msg += "; expected {0}, received {1}".format(
ApplicationNamespace, self.application_namespace)
raise TypeError(msg)
if self.application_namespace is not None:
if self.application_data is None:
member = 'application_data'
raise ValueError(msg.format('{0}.{1}'.format(name, member),
'value', 'not None', 'None'))
else:
member = 'application_namespace'
exp_type = self.ApplicationNamespace
if not isinstance(self.application_namespace, exp_type):
rcv_type = type(self.application_namespace)
raise TypeError(msg.format('{0}.{1}'.format(name, member),
'type', exp_type, rcv_type))
if self.application_data is not None:
if self.application_namespace is None:
member = 'application_namespace'
raise ValueError(msg.format('{0}.{1}'.format(name, member),
'value', 'not None', 'None'))
else:
member = 'application_data'
exp_type = self.ApplicationData
if not isinstance(self.application_data, exp_type):
rcv_type = type(self.application_data)
raise TypeError(msg.format('{0}.{1}'.format(name, member),
'type', exp_type, rcv_type))
if not isinstance(self.application_data, ApplicationData):
msg = "invalid application data"
msg += "; expected {0}, received {1}".format(
ApplicationData, self.application_data)
raise TypeError(msg)
@classmethod
def create(cls, application_namespace, application_data):
namespace = ApplicationSpecificInformation.\
ApplicationNamespace(application_namespace)
data = ApplicationSpecificInformation.\
ApplicationData(application_data)
return ApplicationSpecificInformation(application_namespace=namespace,
application_data=data)
"""
Construct an ApplicationSpecificInformation object from provided data
and namespace values.
Args:
application_namespace (str): The name of the application namespace.
application_data (str): Application data related to the namespace.
Returns:
ApplicationSpecificInformation: The newly created set of
application information.
Example:
>>> x = ApplicationSpecificInformation.create('namespace', 'data')
>>> x.application_namespace.value
'namespace'
>>> x.application_data.value
'data'
"""
namespace = ApplicationNamespace(application_namespace)
data = ApplicationData(application_data)
return ApplicationSpecificInformation(
application_namespace=namespace, application_data=data)
# 3.37

View File

@ -81,15 +81,7 @@ class Base(object):
min_bytes = 'a minimum of {0} bytes'.format(self.LENGTH_SIZE)
raise errors.ReadValueError(Base.__name__, 'length', min_bytes,
'{0} bytes'.format(num_bytes))
length = unpack('!I', lst)[0]
# Verify that the length matches the expected length, if one exists
if self.length is not None:
if length is not self.length:
raise errors.ReadValueError(Base.__name__, 'length',
self.length, length)
else:
self.length = length
self.length = unpack('!I', lst)[0]
def read_value(self, istream):
raise NotImplementedError()
@ -487,7 +479,11 @@ class TextString(Base):
def __init__(self, value=None, tag=Tags.DEFAULT):
super(TextString, self).__init__(tag, type=Types.TEXT_STRING)
self.value = value
if value is None:
self.value = ''
else:
self.value = value
self.validate()

View File

@ -0,0 +1,14 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,224 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core.attributes import ApplicationData
from kmip.core.attributes import ApplicationNamespace
from kmip.core.attributes import ApplicationSpecificInformation
from kmip.core.utils import BytearrayStream
class TestApplicationSpecificInformation(TestCase):
"""
A test suite for the ApplicationSpecificInformation class.
"""
def setUp(self):
super(TestApplicationSpecificInformation, self).setUp()
self.encoding_default = BytearrayStream((
b'\x42\x00\x04\x01\x00\x00\x00\x10\x42\x00\x03\x07\x00\x00\x00\x00'
b'\x42\x00\x02\x07\x00\x00\x00\x00'))
self.encoding = BytearrayStream((
b'\x42\x00\x04\x01\x00\x00\x00\x28\x42\x00\x03\x07\x00\x00\x00\x03'
b'\x73\x73\x6C\x00\x00\x00\x00\x00\x42\x00\x02\x07\x00\x00\x00\x0F'
b'\x77\x77\x77\x2E\x65\x78\x61\x6D\x70\x6C\x65\x2E\x63\x6F\x6D'
b'\x00'))
def tearDown(self):
super(TestApplicationSpecificInformation, self).tearDown()
def _test_init(self, application_namespace, application_data):
application_specific_information = ApplicationSpecificInformation(
application_namespace=application_namespace,
application_data=application_data)
if application_namespace is None:
self.assertEqual(
ApplicationNamespace(),
application_specific_information.application_namespace)
else:
self.assertEqual(
application_namespace,
application_specific_information.application_namespace)
if application_data is None:
self.assertEqual(
ApplicationData(),
application_specific_information.application_data)
else:
self.assertEqual(
application_data,
application_specific_information.application_data)
def test_init_with_none(self):
"""
Test that an ApplicationSpecificInformation object can be constructed
with no specified values.
"""
self._test_init(None, None)
def test_init_with_args(self):
"""
Test that an ApplicationSpecificInformation object can be constructed
with valid values.
"""
application_namespace = ApplicationNamespace("namespace")
application_data = ApplicationData("data")
self._test_init(application_namespace, application_data)
def test_validate_on_invalid_application_namespace(self):
"""
Test that a TypeError exception is raised when an invalid
ApplicationNamespace value is used to construct an
ApplicationSpecificInformation object.
"""
application_namespace = "invalid"
application_data = ApplicationData()
args = [application_namespace, application_data]
self.assertRaisesRegexp(
TypeError, "invalid application namespace",
ApplicationSpecificInformation, *args)
def test_validate_on_invalid_application_data(self):
"""
Test that a TypeError exception is raised when an invalid
ApplicationData value is used to construct an
ApplicationSpecificInformation object.
"""
application_namespace = ApplicationNamespace()
application_data = "invalid"
args = [application_namespace, application_data]
self.assertRaisesRegexp(
TypeError, "invalid application data",
ApplicationSpecificInformation, *args)
def _test_read(self, stream, application_namespace, application_data):
application_specific_information = ApplicationSpecificInformation()
application_specific_information.read(stream)
if application_namespace is None:
application_namespace = ApplicationNamespace()
if application_data is None:
application_data = ApplicationData()
msg = "application namespace encoding mismatch"
msg += "; expected {0}, observed {1}".format(
application_namespace,
application_specific_information.application_namespace)
self.assertEqual(
application_namespace,
application_specific_information.application_namespace, msg)
msg = "application data encoding mismatch"
msg += "; expected {0}, observed {1}".format(
application_data,
application_specific_information.application_data)
self.assertEqual(
application_data,
application_specific_information.application_data, msg)
def test_read_with_none(self):
"""
Test that an ApplicationSpecificInformation object with no data can be
read from a data stream.
"""
self._test_read(self.encoding_default, None, None)
def test_read_with_args(self):
"""
Test that an ApplicationSpecificInformation object with data can be
read from a data stream.
"""
application_namespace = ApplicationNamespace("ssl")
application_data = ApplicationData("www.example.com")
self._test_read(self.encoding, application_namespace, application_data)
def _test_write(self, stream_expected, application_namespace,
application_data):
stream_observed = BytearrayStream()
application_specific_information = ApplicationSpecificInformation(
application_namespace=application_namespace,
application_data=application_data)
application_specific_information.write(stream_observed)
length_expected = len(stream_expected)
length_observed = len(stream_observed)
msg = "encoding lengths not equal"
msg += "; expected {0}, observed {1}".format(
length_expected, length_observed)
self.assertEqual(length_expected, length_observed, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nobserved:\n{1}".format(
stream_expected, stream_observed)
self.assertEqual(stream_expected, stream_observed, msg)
def test_write_with_none(self):
"""
Test that an ApplicationSpecificInformation object with no data can be
written to a data stream.
"""
self._test_write(self.encoding_default, None, None)
def test_write_with_args(self):
"""
Test that an ApplicationSpecificInformation object with data can be
written to a data stream.
"""
application_namespace = ApplicationNamespace("ssl")
application_data = ApplicationData("www.example.com")
self._test_write(self.encoding, application_namespace,
application_data)
def _test_create(self, application_namespace, application_data):
application_specific_info = ApplicationSpecificInformation.create(
application_namespace, application_data)
self.assertIsInstance(
application_specific_info, ApplicationSpecificInformation)
expected = ApplicationNamespace(application_namespace)
observed = application_specific_info.application_namespace
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = ApplicationData(application_data)
observed = application_specific_info.application_data
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_create_with_none(self):
"""
Test that an ApplicationSpecificInformation object with no data can be
created using the create class method.
"""
self._test_create(None, None)
def test_create_with_args(self):
"""
Test that an ApplicationSpecificInformation object with data can be
created using the create class method.
"""
application_namespace = "ssl"
application_data = "www.example.com"
self._test_create(application_namespace, application_data)

View File

@ -0,0 +1,188 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core.attributes import ApplicationData
from kmip.core.attributes import ApplicationNamespace
from kmip.core.attributes import OperationPolicyName
from kmip.core.utils import BytearrayStream
class TestNameValue(TestCase):
def setUp(self):
super(TestNameValue, self).setUp()
self.stream = BytearrayStream()
def tearDown(self):
super(TestNameValue, self).tearDown()
def test_write_no_padding(self):
self.skip('Not implemented')
def test_write_with_padding(self):
self.skip('Not implemented')
def test_read_no_padding(self):
self.skip('Not implemented')
def test_read_with_padding(self):
self.skip('Not implemented')
class TestName(TestCase):
def setUp(self):
super(TestName, self).setUp()
self.stream = BytearrayStream()
def tearDown(self):
super(TestName, self).tearDown()
def test_minimum_write(self):
self.skip('Not implemented')
def test_maximum_write(self):
self.skip('Not implemented')
def test_minimum_read(self):
self.skip('Not implemented')
def test_maximum_read(self):
self.skip('Not implemented')
class TestOperationPolicyName(TestCase):
def setUp(self):
super(TestOperationPolicyName, self).setUp()
def tearDown(self):
super(TestOperationPolicyName, self).tearDown()
def _test_operation_policy_name(self, value):
opn = OperationPolicyName(value)
if value is None:
value = ''
msg = "expected {0}, received {1}".format(value, opn.value)
self.assertEqual(value, opn.value, msg)
def test_operation_policy_name(self):
self._test_operation_policy_name('test')
def test_operation_policy_name_on_none(self):
self._test_operation_policy_name(None)
class TestApplicationNamespace(TestCase):
"""
A test suite for the ApplicationNamespace class.
Since ApplicationNamespace is a simple wrapper for the TextString
primitive, only a few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestApplicationNamespace, self).setUp()
def tearDown(self):
super(TestApplicationNamespace, self).tearDown()
def _test_init(self, value):
if (isinstance(value, str)) or (value is None):
application_namespace = ApplicationNamespace(value)
if value is None:
value = ''
msg = "expected {0}, observed {1}".format(
value, application_namespace.value)
self.assertEqual(value, application_namespace.value, msg)
else:
self.assertRaises(TypeError, ApplicationNamespace, value)
def test_init_with_none(self):
"""
Test that an ApplicationNamespace object can be constructed with no
specified value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that an ApplicationNamespace object can be constructed with a
valid, string-type value.
"""
self._test_init("valid")
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non-string value is
used to construct an ApplicationNamespace object.
"""
self._test_init(0)
class TestApplicationData(TestCase):
"""
A test suite for the ApplicationData class.
Since ApplicationData is a simple wrapper for the TextString primitive,
only a few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestApplicationData, self).setUp()
def tearDown(self):
super(TestApplicationData, self).tearDown()
def _test_init(self, value):
if (isinstance(value, str)) or (value is None):
application_data = ApplicationData(value)
if value is None:
value = ''
msg = "expected {0}, observed {1}".format(
value, application_data.value)
self.assertEqual(value, application_data.value, msg)
else:
self.assertRaises(TypeError, ApplicationData, value)
def test_init_with_none(self):
"""
Test that an ApplicationData object can be constructed with no
specified value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that an ApplicationData object can be constructed with a
valid, string-type value.
"""
self._test_init("valid")
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non-string value is
used to construct an ApplicationData object.
"""
self._test_init(0)

View File

@ -31,6 +31,9 @@ class TestAttributeValueFactory(TestCase):
# TODO (peter-hamilton) Consider even further modularity
def _test_operation_policy_name(self, opn, value):
if value is None:
value = ''
msg = "expected {0}, received {1}".format(OperationPolicyName, opn)
self.assertIsInstance(opn, OperationPolicyName, msg)

View File

@ -20,6 +20,8 @@ from kmip.core.factories.secrets import SecretFactory
from kmip.core.factories.attributes import AttributeFactory
from kmip.core import attributes as attr
from kmip.core.attributes import ApplicationData
from kmip.core.attributes import ApplicationNamespace
from kmip.core.attributes import ApplicationSpecificInformation
from kmip.core.attributes import ContactInformation
from kmip.core.attributes import Name
@ -769,8 +771,8 @@ class TestRequestMessage(TestCase):
'Information')
ap_n_name = 'ssl'
ap_n_value = 'www.example.com'
ap_n = ApplicationSpecificInformation.ApplicationNamespace(ap_n_name)
ap_d = ApplicationSpecificInformation.ApplicationData(ap_n_value)
ap_n = ApplicationNamespace(ap_n_name)
ap_d = ApplicationData(ap_n_value)
value = ApplicationSpecificInformation(application_namespace=ap_n,
application_data=ap_d)
attribute = objects.Attribute(attribute_name=name,

View File

@ -1,85 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core.attributes import OperationPolicyName
from kmip.core.utils import BytearrayStream
class TestNameValue(TestCase):
def setUp(self):
super(TestNameValue, self).setUp()
self.stream = BytearrayStream()
def tearDown(self):
super(TestNameValue, self).tearDown()
def test_write_no_padding(self):
self.skip('Not implemented')
def test_write_with_padding(self):
self.skip('Not implemented')
def test_read_no_padding(self):
self.skip('Not implemented')
def test_read_with_padding(self):
self.skip('Not implemented')
class TestName(TestCase):
def setUp(self):
super(TestName, self).setUp()
self.stream = BytearrayStream()
def tearDown(self):
super(TestName, self).tearDown()
def test_minimum_write(self):
self.skip('Not implemented')
def test_maximum_write(self):
self.skip('Not implemented')
def test_minimum_read(self):
self.skip('Not implemented')
def test_maximum_read(self):
self.skip('Not implemented')
class TestOperationPolicyName(TestCase):
def setUp(self):
super(TestOperationPolicyName, self).setUp()
def tearDown(self):
super(TestOperationPolicyName, self).tearDown()
def _test_operation_policy_name(self, value):
opn = OperationPolicyName(value)
msg = "expected {0}, received {1}".format(value, opn.value)
self.assertEqual(value, opn.value, msg)
def test_operation_policy_name(self):
self._test_operation_policy_name('test')
def test_operation_policy_name_on_none(self):
self._test_operation_policy_name(None)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from six import string_types
from testtools import TestCase
from kmip.core.enums import Tags
@ -111,14 +112,6 @@ class TestBase(TestCase):
# Check no exception thrown
base.read_length(self.stream)
def test_read_length_error(self):
self.stream.write(b'\x00\x00\x00\x00')
base = Base()
base.length = 4
self.assertRaises(errors.ReadValueError, base.read_length,
self.stream)
def test_read_length_underflow(self):
self.stream.write(b'\x00')
base = Base()
@ -985,13 +978,18 @@ class TestTextString(TestCase):
self.bad_value.format('value', value, ts.value))
def test_init_unset(self):
ts = TextString()
text_string = TextString()
self.assertIsInstance(ts.value, type(None),
self.bad_type.format('value', type(None),
type(ts.value)))
self.assertEqual(None, ts.value,
self.bad_value.format('value', None, ts.value))
expected = string_types
observed = text_string.value
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertIsInstance(observed, expected, msg)
expected = ''
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_validate_on_valid(self):
ts = TextString()