Merge pull request #3 from wyllys66/master

Added support for LOCATE operation
This commit is contained in:
Peter Hamilton 2014-09-22 11:52:02 -04:00
commit d40e12905b
12 changed files with 604 additions and 11 deletions

View File

@ -25,6 +25,7 @@ from kmip.core.primitives import Enumeration
from kmip.core.primitives import TextString
from kmip.core.utils import BytearrayStream
from enum import Enum
# 3.1
@ -54,6 +55,7 @@ class Name(Struct):
super(self.__class__, self).__init__(tag=Tags.NAME)
self.name_value = name_value
self.name_type = name_type
self.validate()
def read(self, istream):
super(self.__class__, self).read(istream)
@ -83,13 +85,49 @@ class Name(Struct):
self.__validate()
def __validate(self):
# TODO (peter-hamilton) Finish implementation.
pass
name = self.__class__.__name__
msg = ErrorStrings.BAD_EXP_RECV
if self.name_value and \
not isinstance(self.name_value, Name.NameValue) and \
not isinstance(self.name_value, str):
member = 'name_value'
raise TypeError(msg.format('{}.{}'.format(name, member),
'name_value', type(Name.NameValue),
type(self.name_value)))
if self.name_type and \
not isinstance(self.name_type, Name.NameType) and \
not isinstance(self.name_type, str):
member = 'name_type'
raise TypeError(msg.format('{}.{}'.format(name, member),
'name_type', type(Name.NameType),
type(self.name_type)))
@classmethod
def create(cls, name_value, name_type):
value = cls.NameValue(name_value)
n_type = cls.NameType(name_type)
if isinstance(name_value, Name.NameValue):
value = name_value
elif isinstance(name_value, str):
value = cls.NameValue(name_value)
else:
name = 'Name'
msg = ErrorStrings.BAD_EXP_RECV
member = 'name_value'
raise TypeError(msg.format('{}.{}'.format(name, member),
'name_value', type(Name.NameValue),
type(name_value)))
if isinstance(name_type, Name.NameType):
n_type = name_type
elif isinstance(name_type, Enum):
n_type = cls.NameType(name_type)
else:
name = 'Name'
msg = ErrorStrings.BAD_EXP_RECV
member = 'name_type'
raise TypeError(msg.format('{}.{}'.format(name, member),
'name_type', type(Name.NameType),
type(name_type)))
return Name(name_value=value,
name_type=n_type)

View File

@ -559,3 +559,13 @@ class CryptographicUsageMask(Enum):
TRANSLATE_DECRYPT = 0x00020000
TRANSLATE_WRAP = 0x00040000
TRANSLATE_UNWRAP = 0x00080000
# 9.1.3.2.33
class ObjectGroupMember(Enum):
GROUP_MEMBER_FRESH = 0x00000001
GROUP_MEMBER_DEFAULT = 0x00000002
# 9.1.3.3.2
class StorageStatusMask(Enum):
ONLINE_STORAGE = 0x00000001
ARCHIVAL_STORAGE = 0x00000002

View File

@ -24,6 +24,7 @@ from kmip.core.attributes import CustomAttribute
from kmip.core.attributes import Name
from kmip.core.attributes import ObjectGroup
from kmip.core.attributes import UniqueIdentifier
from kmip.core.attributes import ObjectType
from kmip.core import utils
@ -125,15 +126,15 @@ class AttributeValueFactory(object):
def _create_name(self, name):
if name is not None:
name_value = name.get('name_value')
name_type = name.get('name_type')
name_value = name.name_value
name_type = name.name_type
return Name.create(name_value, name_type)
else:
return Name()
def _create_object_type(self, obj):
raise NotImplementedError()
return ObjectType()
def _create_cryptographic_algorithm(self, alg):
return CryptographicAlgorithm(alg)

View File

@ -21,9 +21,11 @@ from kmip.core.enums import Tags
from kmip.core.objects import KeyWrappingSpecification
from kmip.core.objects import TemplateAttribute
from kmip.core.objects import Attribute
from kmip.core.primitives import Struct
from kmip.core.primitives import Enumeration
from kmip.core.primitives import Integer
from kmip.core.utils import BytearrayStream
@ -429,11 +431,125 @@ class DestroyResponsePayload(Struct):
pass
class LocateRequestPayload(Struct):
# 9.1.3.2.33
class ObjectGroupMember(Enumeration):
ENUM_TYPE = enums.ObjectGroupMember
def __init__(self, value=None):
super(self.__class__, self).__init__(value,
Tags.OBJECT_GROUP_MEMBER)
class MaximumItems(Integer):
def __init__(self, value=None):
super(self.__class__, self).__init__(value,
Tags.MAXIMUM_ITEMS)
# 9.1.3.3.2
class StorageStatusMask(Enumeration):
ENUM_TYPE = enums.StorageStatusMask
def __init__(self, value=None):
super(self.__class__, self).__init__(value,
Tags.STORAGE_STATUS_MASK)
def __init__(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None):
super(self.__class__, self).__init__(enums.Tags.REQUEST_PAYLOAD)
self.maximum_items = maximum_items
self.storage_status_mask = storage_status_mask
self.object_group_member = object_group_member
self.attributes = attributes or []
self.validate()
def read(self, istream):
super(self.__class__, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
if self.is_tag_next(Tags.MAXIMUM_ITEMS, tstream):
self.maximum_items = LocateRequestPayload.MaximumItems()
self.maximum_items.read()
if self.is_tag_next(Tags.STORAGE_STATUS_MASK, tstream):
self.storage_status_mask = LocateRequestPayload.StorageStatusMask()
self.storage_status_mask.read()
if self.is_tag_next(Tags.OBJECT_GROUP_MEMBER, tstream):
self.object_group_member = LocateRequestPayload.ObjectGroupMember()
self.object_group_member.read(tstream)
while self.is_tag_next(Tags.ATTRIBUTE, tstream):
attr = Attribute()
attr.read(tstream)
self.attributes.append(attr)
self.validate()
def write(self, ostream):
tstream = BytearrayStream()
if self.maximum_items is not None:
self.maximum_items.write(tstream)
if self.storage_status_mask is not None:
self.storage_status_mask.write(tstream)
if self.attributes is not None:
for a in self.attributes:
a.write(tstream)
# Write the length and value of the request payload
self.length = tstream.length()
super(self.__class__, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
self._validate()
def _validate(self):
# TODO Finish implementation.
pass
class LocateResponsePayload(Struct):
def __init__(self, unique_identifiers=[]):
super(self.__class__, self).__init__(enums.Tags.RESPONSE_PAYLOAD)
self.unique_identifiers = unique_identifiers or []
self.validate()
def read(self, istream):
super(self.__class__, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
while self.is_tag_next(Tags.UNIQUE_IDENTIFIER, tstream):
ui = attributes.UniqueIdentifier()
ui.read(tstream)
self.unique_identifiers.append(ui)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
tstream = BytearrayStream()
for ui in self.unique_identifiers:
ui.write(tstream)
# Write the length and value of the request payload
self.length = tstream.length()
super(self.__class__, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
self.__validate()
def __validate(self):
# TODO Finish implementation.
pass
REQUEST_MAP = {enums.Operation.CREATE: CreateRequestPayload,
enums.Operation.GET: GetRequestPayload,
enums.Operation.DESTROY: DestroyRequestPayload,
enums.Operation.REGISTER: RegisterRequestPayload}
enums.Operation.REGISTER: RegisterRequestPayload,
enums.Operation.LOCATE: LocateRequestPayload}
RESPONSE_MAP = {enums.Operation.CREATE: CreateResponsePayload,
enums.Operation.GET: GetResponsePayload,
enums.Operation.REGISTER: RegisterResponsePayload,
enums.Operation.DESTROY: DestroyResponsePayload,
enums.Operation.REGISTER: RegisterResponsePayload}
enums.Operation.LOCATE: LocateResponsePayload}

View File

@ -45,3 +45,7 @@ class MemRepo(ManagedObjectRepo):
return False
del self.repo[uuid]
return True
def locate(self, maximum_items, storage_status_mask,
object_group_member, attributes):
raise NotImplementedError

View File

@ -47,6 +47,7 @@ from kmip.services.results import DestroyResult
from kmip.services.results import GetResult
from kmip.services.results import OperationResult
from kmip.services.results import RegisterResult
from kmip.services.results import LocateResult
class KMIP(object):
@ -68,6 +69,11 @@ class KMIP(object):
def destroy(self, uuid, credential=None):
raise NotImplementedError
def locate(self, maximum_items=None, storate_status_mask=None,
object_group_member=None, attributes=None,
credential=None):
raise NotImplementedError
class KMIPImpl(KMIP):
@ -248,6 +254,23 @@ class KMIPImpl(KMIP):
ret_value = RS.SUCCESS
return DestroyResult(ResultStatus(ret_value), uuid=uuid)
def locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None,
credential=None):
self.logger.debug('locate() called')
msg = 'locating object(s) from repo'
self.logger.debug(msg)
try:
uuids = self.repo.locate(maximum_items, storage_status_mask,
object_group_member, attributes)
return LocateResult(ResultStatus(RS.SUCCESS), uuids=uuids)
except NotImplementedError:
msg = ResultMessage('Locate Operation Not Supported')
reason = ResultReason(ResultReasonEnum.OPERATION_NOT_SUPPORTED)
return LocateResult(ResultStatus(RS.OPERATION_FAILED),
result_reason=reason,
result_message=msg)
def _validate_req_field(self, attrs, name, expected, msg, required=True):
self.logger.debug('Validating attribute %s' % name)
seen = False

80
kmip/demos/locate.py Normal file
View File

@ -0,0 +1,80 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.enums import AttributeType
from kmip.core.enums import CredentialType
from kmip.core.enums import ObjectType
from kmip.core.enums import ResultStatus
from kmip.core.enums import CryptographicAlgorithm
from kmip.core.enums import CryptographicUsageMask
from kmip.core.enums import NameType
from kmip.core.attributes import Name
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.factories.credentials import CredentialFactory
from kmip.core.objects import TemplateAttribute, Attribute
from kmip.services.kmip_client import KMIPProxy
import logging
import os
if __name__ == '__main__':
f_log = os.path.join(os.path.dirname(__file__), '..', 'logconfig.ini')
logging.config.fileConfig(f_log)
logger = logging.getLogger(__name__)
attribute_factory = AttributeFactory()
credential_factory = CredentialFactory()
credential_type = CredentialType.USERNAME_AND_PASSWORD
credential_value = {'Username': 'Peter', 'Password': 'abc123'}
credential = credential_factory.create_credential(credential_type,
credential_value)
client = KMIPProxy()
client.open()
object_type = ObjectType.SYMMETRIC_KEY
attribute_type = AttributeType.CRYPTOGRAPHIC_ALGORITHM
algorithm = attribute_factory.create_attribute(attribute_type,
CryptographicAlgorithm.AES)
mask_flags = [CryptographicUsageMask.ENCRYPT,
CryptographicUsageMask.DECRYPT]
attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
usage_mask = attribute_factory.create_attribute(attribute_type,
mask_flags)
name = Attribute.AttributeName('Name')
name_value = Name.NameValue('FOOBAR')
name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
value = Name(name_value=name_value, name_type=name_type)
nameattr = Attribute(attribute_name=name, attribute_value=value)
attributes = [algorithm, usage_mask, nameattr]
template_attribute = TemplateAttribute(attributes=attributes)
result = client.create(object_type, template_attribute,
credential)
attrs = [nameattr]
result = client.locate(attributes=attrs, credential=credential)
client.close()
logger.debug('locate() result status: {}'.
format(result.result_status.enum))
if result.result_status.enum == ResultStatus.SUCCESS:
logger.debug('retrieved object type: {}'.
format(result.object_type.enum))
logger.debug('Located UUIDs: {}'.format(','.join([u.value for u in
result.uuids])))

View File

@ -17,6 +17,7 @@ from kmip.services.results import CreateResult
from kmip.services.results import GetResult
from kmip.services.results import DestroyResult
from kmip.services.results import RegisterResult
from kmip.services.results import LocateResult
from kmip.core import attributes as attr
@ -93,6 +94,13 @@ class KMIPProxy(KMIP):
secret=secret,
credential=credential)
def locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None, credential=None):
return self._locate(maximum_items=maximum_items,
storage_status_mask=storage_status_mask,
object_group_member=object_group_member,
attributes=attributes, credential=credential)
def _create(self,
object_type=None,
template_attribute=None,
@ -263,6 +271,53 @@ class KMIPProxy(KMIP):
payload_template_attribute)
return result
def _locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=[], credential=None):
operation = Operation(OperationEnum.LOCATE)
mxi = None
ssmask = None
objgrp = None
if maximum_items is not None:
mxi = operations.LocateRequestPayload.MaximumItems(maximum_items)
if storage_status_mask is not None:
m = storage_status_mask
ssmask = operations.LocateRequestPayload.StorageStatusMask(m)
if object_group_member is not None:
o = object_group_member
objgrp = operations.LocateRequestPayload.ObjectGroupMember(o)
payload = operations.LocateRequestPayload(maximum_items=mxi,
storage_status_mask=ssmask,
object_group_member=objgrp,
attributes=attributes)
batch_item = messages.RequestBatchItem(operation=operation,
request_payload=payload)
message = self._build_request_message(credential, [batch_item])
self._send_message(message)
message = messages.ResponseMessage()
data = self._receive_message()
message.read(data)
batch_items = message.batch_items
batch_item = batch_items[0]
payload = batch_item.response_payload
if payload is None:
uuids = None
else:
uuids = payload.unique_identifiers
result = LocateResult(batch_item.result_status,
batch_item.result_reason,
batch_item.result_message,
uuids)
return result
def _build_request_message(self, credential, batch_items):
protocol_version = ProtocolVersion.create(1, 1)

22
kmip/services/processor.py Executable file → Normal file
View File

@ -31,6 +31,7 @@ from kmip.core.messages.operations import CreateResponsePayload
from kmip.core.messages.operations import GetResponsePayload
from kmip.core.messages.operations import DestroyResponsePayload
from kmip.core.messages.operations import RegisterResponsePayload
from kmip.core.messages.operations import LocateResponsePayload
from kmip.core.enums import Operation
from kmip.core.enums import ResultStatus as RS
@ -168,6 +169,8 @@ class Processor(object):
return self._process_destroy_request(payload)
elif op is Operation.REGISTER:
return self._process_register_request(payload)
elif op is Operation.LOCATE:
return self._process_locate_request(payload)
else:
raise NotImplementedError()
@ -252,3 +255,22 @@ class Processor(object):
template_attribute=template_attr)
return (result_status, result_reason, result_message, resp_pl)
def _process_locate_request(self, payload):
max_items = payload.maximum_items
storage_mask = payload.status_storage_mask
objgrp_member = payload.object_group_member
attributes = payload.attributes
result = self._handler.locate(max_items, storage_mask,
objgrp_member, attributes)
result_status = result.result_status
result_reason = result.result_reason
result_message = result.result_message
uuids = result.uuids
resp_pl = LocateResponsePayload(unique_identifiers=uuids)
return (result_status, result_reason, result_message, resp_pl)

View File

@ -125,3 +125,16 @@ class DestroyResult(OperationResult):
self.uuid = uuid
else:
self.uuid = None
class LocateResult(OperationResult):
def __init__(self,
result_status,
result_reason=None,
result_message=None,
uuids=None):
super(self.__class__, self).__init__(result_status,
result_reason,
result_message)
self.uuids = uuids

View File

@ -47,6 +47,7 @@ from kmip.core.messages.operations import DestroyRequestPayload
from kmip.core.messages.operations import RegisterRequestPayload
from kmip.core.messages.operations import DestroyResponsePayload
from kmip.core.messages.operations import RegisterResponsePayload
from kmip.core.messages.operations import LocateResponsePayload
from kmip.core.primitives import TextString
@ -136,7 +137,24 @@ class TestRequestMessage(TestCase):
b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63'
b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31'
b'\x32\x39\x66\x63\x00\x00\x00\x00')
b'\x32\x39\x66\x63\x00\x00\x00\x00'
)
# kmip-testcases-v1.1 section 3.1.3
self.locate = (
b'\x42\x00\x78\x01\x00\x00\x00\xd0\x42\x00\x77\x01\x00\x00\x00\x38'
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0d\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x88'
b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x08\x00\x00\x00\x00'
b'\x42\x00\x79\x01\x00\x00\x00\x70\x42\x00\x08\x01\x00\x00\x00\x28'
b'\x42\x00\x0a\x07\x00\x00\x00\x0b\x4f\x62\x6a\x65\x63\x74\x20\x54'
b'\x79\x70\x65\x00\x00\x00\x00\x00\x42\x00\x0b\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x08\x01\x00\x00\x00\x38'
b'\x42\x00\x0a\x07\x00\x00\x00\x04\x4e\x61\x6d\x65\x00\x00\x00\x00'
b'\x42\x00\x0b\x01\x00\x00\x00\x20\x42\x00\x55\x07\x00\x00\x00\x04'
b'\x4b\x65\x79\x31\x00\x00\x00\x00\x42\x00\x54\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00')
def tearDown(self):
super(TestRequestMessage, self).tearDown()
@ -802,6 +820,149 @@ class TestRequestMessage(TestCase):
msg = "Bad request message write: encoding mismatch"
self.assertEqual(self.register, result, msg)
def test_locate_request_read(self):
self.stream = BytearrayStream(self.locate)
request_message = messages.RequestMessage()
request_message.read(self.stream)
request_header = request_message.request_header
msg = "Bad request header type: expected {0}, received{0}"
self.assertIsInstance(request_header, messages.RequestHeader,
msg.format(messages.RequestHeader,
type(request_header)))
protocol_version = request_header.protocol_version
msg = "Bad protocol version type: expected {0}, received {1}"
self.assertIsInstance(protocol_version, contents.ProtocolVersion,
msg.format(contents.ProtocolVersion,
type(protocol_version)))
protocol_version_major = protocol_version.protocol_version_major
msg = "Bad protocol version major type: expected {0}, received {1}"
exp_type = contents.ProtocolVersion.ProtocolVersionMajor
rcv_type = type(protocol_version_major)
self.assertIsInstance(protocol_version_major, exp_type,
msg.format(exp_type, rcv_type))
msg = "Bad protocol version major value: expected {0}, received {1}"
self.assertEqual(1, protocol_version_major.value,
msg.format(1, protocol_version_major.value))
protocol_version_minor = protocol_version.protocol_version_minor
msg = "Bad protocol version minor type: expected {0}, received {1}"
exp_type = contents.ProtocolVersion.ProtocolVersionMinor
rcv_type = type(protocol_version_minor)
self.assertIsInstance(protocol_version_minor, exp_type,
msg.format(exp_type, rcv_type))
msg = "Bad protocol version minor value: expected {0}, received {1}"
self.assertEqual(1, protocol_version_minor.value,
msg.format(1, protocol_version_minor.value))
batch_count = request_header.batch_count
msg = "Bad batch count type: expected {0}, received {1}"
self.assertIsInstance(batch_count, contents.BatchCount,
msg.format(contents.BatchCount,
type(batch_count)))
msg = "Bad batch count value: expected {0}, received {1}"
self.assertEqual(1, batch_count.value,
msg.format(1, batch_count.value))
batch_items = request_message.batch_items
msg = "Bad batch items type: expected {0}, received {1}"
self.assertEquals(1, len(batch_items),
self.msg.format('batch items', 'length',
1, len(batch_items)))
batch_item = batch_items[0]
msg = "Bad batch item type: expected {0}, received {1}"
self.assertIsInstance(batch_item, messages.RequestBatchItem,
msg.format(messages.RequestBatchItem,
type(batch_item)))
operation = batch_item.operation
msg = "Bad operation type: expected {0}, received {1}"
self.assertIsInstance(operation, contents.Operation,
msg.format(contents.Operation,
type(operation)))
msg = "Bad operation value: expected {0}, received {1}"
exp_value = enums.Operation.LOCATE
rcv_value = operation.enum
self.assertEqual(exp_value, rcv_value,
msg.format(exp_value, rcv_value))
request_payload = batch_item.request_payload
msg = "Bad request payload type: expected {0}, received {1}"
exp_type = operations.LocateRequestPayload
rcv_type = type(request_payload)
self.assertIsInstance(request_payload, exp_type,
msg.format(exp_type, rcv_type))
attributes = request_payload.attributes
msg = "Bad attributes type: expected {0}, received {1}"
exp_type = list
rcv_type = type(attributes)
self.assertIsInstance(attributes, exp_type,
msg.format(exp_type, rcv_type))
self.assertEqual(2, len(attributes),
self.msg.format('attribute', 'length',
2, len(attributes)))
attribute_a = attributes[0]
self.assertIsInstance(attribute_a, objects.Attribute,
self.msg.format('attribute', 'type',
objects.Attribute,
type(attribute_a)))
attribute_name = attribute_a.attribute_name
self.assertIsInstance(attribute_name, objects.Attribute.AttributeName,
self.msg.format('attribute name', 'type',
objects.Attribute.AttributeName,
type(attribute_name)))
self.assertEquals('Object Type', attribute_name.value,
self.msg.format('attribute name', 'value',
'Object Type',
attribute_name.value))
attribute_value = attribute_a.attribute_value
exp_type = attr.Enumeration
rcv_type = type(attribute_value)
self.assertIsInstance(attribute_value, exp_type,
self.msg.format('attribute value', 'type',
exp_type, rcv_type))
self.assertEquals(attribute_value.enum, enums.ObjectType.SYMMETRIC_KEY,
self.msg.format('ObjectType', 'value',
enums.ObjectType.SYMMETRIC_KEY,
attribute_value.enum))
attribute_b = attributes[1]
self.assertIsInstance(attribute_b, objects.Attribute,
self.msg.format('attribute', 'type',
objects.Attribute,
type(attribute_a)))
attribute_name = attribute_b.attribute_name
self.assertIsInstance(attribute_name, objects.Attribute.AttributeName,
self.msg.format('attribute name', 'type',
objects.Attribute.AttributeName,
type(attribute_name)))
self.assertEquals('Name', attribute_name.value,
self.msg.format('attribute name', 'value',
'Name',
attribute_name.value))
attribute_value = attribute_b.attribute_value
exp_type = Name
rcv_type = type(attribute_value)
self.assertIsInstance(attribute_value, exp_type,
self.msg.format('attribute value', 'type',
exp_type, rcv_type))
self.assertEquals('Key1', attribute_value.name_value.value,
self.msg.format('name value', 'value',
'Key1',
attribute_value.name_value.value))
class TestResponseMessage(TestCase):
@ -872,6 +1033,20 @@ class TestResponseMessage(TestCase):
b'\x66\x62\x34\x62\x35\x62\x39\x63\x2D\x36\x31\x38\x38\x2D\x34\x63'
b'\x36\x33\x2D\x38\x31\x34\x32\x2D\x66\x65\x39\x63\x33\x32\x38\x31'
b'\x32\x39\x66\x63\x00\x00\x00\x00')
# kmip-testcases-v1.1 section 3.1.3
self.locate = (
b'\x42\x00\x7b\x01\x00\x00\x00\xb0\x42\x00\x7a\x01\x00\x00\x00\x48'
b'\x42\x00\x69\x01\x00\x00\x00\x20\x42\x00\x6a\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x6b\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x92\x09\x00\x00\x00\x08'
b'\x00\x00\x00\x00\x4f\x9a\x54\xe6\x42\x00\x0d\x02\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x0f\x01\x00\x00\x00\x58'
b'\x42\x00\x5c\x05\x00\x00\x00\x04\x00\x00\x00\x08\x00\x00\x00\x00'
b'\x42\x00\x7f\x05\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\x7c\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
b'\x34\x39\x61\x31\x63\x61\x38\x38\x2d\x36\x62\x65\x61\x2d\x34\x66'
b'\x62\x32\x2d\x62\x34\x35\x30\x2d\x37\x65\x35\x38\x38\x30\x32\x63'
b'\x33\x30\x33\x38\x00\x00\x00\x00')
def tearDown(self):
super(TestResponseMessage, self).tearDown()
@ -1537,3 +1712,38 @@ class TestResponseMessage(TestCase):
msg = "Bad response message write: encoding mismatch"
self.assertEqual(self.register, result, msg)
def test_locate_response_write(self):
prot_ver = contents.ProtocolVersion.create(1, 1)
# Fri Apr 27 10:12:22 CEST 2012
time_stamp = contents.TimeStamp(0x4f9a54e6)
batch_count = contents.BatchCount(1)
resp_hdr = messages.ResponseHeader(protocol_version=prot_ver,
time_stamp=time_stamp,
batch_count=batch_count)
operation = contents.Operation(enums.Operation.LOCATE)
result_status = contents.ResultStatus(enums.ResultStatus.SUCCESS)
uuid = attr.UniqueIdentifier('49a1ca88-6bea-4fb2-b450-7e58802c3038')
resp_pl = LocateResponsePayload(unique_identifiers=[uuid])
batch_item = messages.ResponseBatchItem(operation=operation,
result_status=result_status,
response_payload=resp_pl)
response_message = messages.ResponseMessage(response_header=resp_hdr,
batch_items=[batch_item])
response_message.write(self.stream)
result = self.stream.read()
len_exp = len(self.locate)
len_rcv = len(result)
self.assertEqual(len_exp, len_rcv,
self.msg.format('response message', 'write',
len_exp, len_rcv))
msg = "Bad response message write: encoding mismatch"
self.assertEqual(self.locate, result, msg)

View File

@ -20,6 +20,7 @@ from kmip.core.attributes import CryptographicLength
from kmip.core.attributes import CryptographicUsageMask
from kmip.core.attributes import UniqueIdentifier
from kmip.core.attributes import ObjectType
from kmip.core.attributes import Name
from kmip.core.enums import AttributeType
from kmip.core.enums import CryptographicAlgorithm as CryptoAlgorithmEnum
from kmip.core.enums import CryptographicUsageMask as CryptoUsageMaskEnum
@ -28,6 +29,7 @@ from kmip.core.enums import KeyFormatType as KeyFormatTypeEnum
from kmip.core.enums import ObjectType as ObjectTypeEnum
from kmip.core.enums import ResultReason
from kmip.core.enums import ResultStatus
from kmip.core.enums import NameType
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.keys import RawKey
from kmip.core.messages.contents import KeyCompressionType
@ -483,7 +485,11 @@ class TestKMIPServer(TestCase):
CryptoUsageMaskEnum.DECRYPT]
usage_mask = attr_factory.create_attribute(attribute_type,
mask_flags)
return [algorithm, usage_mask, length]
name_value = Name.NameValue(value='TESTNAME')
name_type = Name.NameType(value=NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value, name_type)
nameattr = attr_factory.create_attribute(AttributeType.NAME, value)
return [algorithm, usage_mask, length, nameattr]
def _get_alg_attr(self, alg=None):
if alg is None:
@ -506,3 +512,18 @@ class TestKMIPServer(TestCase):
return attribute.attribute_value.value ==\
attr_expected.attribute_value.value
return False
def test_locate(self):
self._create()
name_value = Name.NameValue(value='TESTNAME')
name_type = Name.NameType(value=NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value, name_type)
attr_factory = AttributeFactory()
nameattr = attr_factory.create_attribute(AttributeType.NAME, value)
attrs = [nameattr]
res = self.kmip.locate(attributes=attrs)
self.assertEqual(ResultStatus.OPERATION_FAILED, res.result_status.enum,
'locate result status did not return success')