Adding the "activate" opperation

This commit is contained in:
Tim Kelsey 2015-06-23 17:10:53 +01:00
parent 8516c52820
commit b1ac997d38
9 changed files with 429 additions and 4 deletions

View File

@ -12,6 +12,7 @@ The PyKMIP library provides a KMIP client supporting the following operations:
* Create * Create
* CreateKeyPair * CreateKeyPair
* Activate
* Destroy * Destroy
* DiscoverVersions * DiscoverVersions
* Get * Get

View File

@ -15,6 +15,7 @@
from kmip.core.factories.payloads import PayloadFactory from kmip.core.factories.payloads import PayloadFactory
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -54,3 +55,6 @@ class RequestPayloadFactory(PayloadFactory):
def _create_discover_versions_payload(self): def _create_discover_versions_payload(self):
return discover_versions.DiscoverVersionsRequestPayload() return discover_versions.DiscoverVersionsRequestPayload()
def _create_activate_payload(self):
return activate.ActivateRequestPayload()

View File

@ -15,6 +15,7 @@
from kmip.core.factories.payloads import PayloadFactory from kmip.core.factories.payloads import PayloadFactory
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -54,3 +55,6 @@ class ResponsePayloadFactory(PayloadFactory):
def _create_discover_versions_payload(self): def _create_discover_versions_payload(self):
return discover_versions.DiscoverVersionsResponsePayload() return discover_versions.DiscoverVersionsResponsePayload()
def _create_activate_payload(self):
return activate.ActivateResponsePayload()

View File

@ -0,0 +1,163 @@
# Copyright (c) 2015 Hewlett Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core import attributes
from kmip.core import enums
from kmip.core.primitives import Struct
from kmip.core.utils import BytearrayStream
class ActivateRequestPayload(Struct):
"""
A request payload for the Activate operation.
The payload contains a UUID of a cryptographic object that that server
should activate. See Section 4.19 of the KMIP 1.1 specification for more
information.
Attributes:
unique_identifier: The UUID of a managed cryptographic object
"""
def __init__(self,
unique_identifier=None):
"""
Construct a ActivateRequestPayload object.
Args:
unique_identifier (UniqueIdentifier): The UUID of a managed
cryptographic object.
"""
super(ActivateRequestPayload, self).__init__(
tag=enums.Tags.REQUEST_PAYLOAD)
self.unique_identifier = unique_identifier
self.validate()
def read(self, istream):
"""
Read the data encoding the ActivateRequestPayload object and decode it
into its constituent parts.
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(ActivateRequestPayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.unique_identifier = attributes.UniqueIdentifier()
self.unique_identifier.read(tstream)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
"""
Write the data encoding the ActivateRequestPayload object to a stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
# Write the contents of the request payload
if self.unique_identifier is not None:
self.unique_identifier.write(tstream)
# Write the length and value of the request payload
self.length = tstream.length()
super(ActivateRequestPayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the attributes of the ActivateRequestPayload object.
"""
if self.unique_identifier is not None:
if not isinstance(self.unique_identifier,
attributes.UniqueIdentifier):
msg = "invalid unique identifier"
raise TypeError(msg)
class ActivateResponsePayload(Struct):
"""
A response payload for the Activate operation.
The payload contains the server response to the initial Activate request.
See Section 4.19 of the KMIP 1.1 specification for more information.
Attributes:
unique_identifier: The UUID of a managed cryptographic object.
"""
def __init__(self,
unique_identifier=None):
"""
Construct a ActivateResponsePayload object.
Args:
unique_identifier (UniqueIdentifier): The UUID of a managed
cryptographic object.
"""
super(ActivateResponsePayload, self).__init__(
tag=enums.Tags.RESPONSE_PAYLOAD)
if unique_identifier is None:
self.unique_identifier = attributes.UniqueIdentifier()
else:
self.unique_identifier = unique_identifier
self.validate()
def read(self, istream):
"""
Read the data encoding the ActivateResponsePayload object and decode it
into its constituent parts.
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(ActivateResponsePayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.unique_identifier = attributes.UniqueIdentifier()
self.unique_identifier.read(tstream)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
"""
Write the data encoding the ActivateResponsePayload object to a stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
# Write the contents of the response payload
self.unique_identifier.write(tstream)
# Write the length and value of the request payload
self.length = tstream.length()
super(ActivateResponsePayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the attributes of the ActivateRequestPayload object.
"""
if not isinstance(self.unique_identifier, attributes.UniqueIdentifier):
msg = "invalid unique identifier"
raise TypeError(msg)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from kmip.services.results import ActivateResult
from kmip.services.results import CreateResult from kmip.services.results import CreateResult
from kmip.services.results import CreateKeyPairResult from kmip.services.results import CreateKeyPairResult
from kmip.services.results import DestroyResult from kmip.services.results import DestroyResult
@ -42,6 +43,7 @@ from kmip.core.messages.contents import ProtocolVersion
from kmip.core.messages import messages from kmip.core.messages import messages
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -239,6 +241,19 @@ class KMIPProxy(KMIP):
results = self._process_batch_items(response) results = self._process_batch_items(response)
return results[0] return results[0]
def activate(self, uuid=None, credential=None):
"""
Send an Activate request to the server.
Args:
uuid (string): The unique identifier of a managed cryptographic
object that should be activated.
credential (Credential): A Credential object containing
authentication information for the server. Optional, defaults
to None.
"""
return self._activate(uuid, credential=credential)
def get(self, uuid=None, key_format_type=None, key_compression_type=None, def get(self, uuid=None, key_format_type=None, key_compression_type=None,
key_wrapping_specification=None, credential=None): key_wrapping_specification=None, credential=None):
return self._get( return self._get(
@ -552,6 +567,37 @@ class KMIPProxy(KMIP):
payload_secret) payload_secret)
return result return result
def _activate(self, unique_identifier=None, credential=None):
operation = Operation(OperationEnum.ACTIVATE)
uuid = None
if unique_identifier is not None:
uuid = attr.UniqueIdentifier(unique_identifier)
payload = activate.ActivateRequestPayload(unique_identifier=uuid)
batch_item = messages.RequestBatchItem(operation=operation,
request_payload=payload)
message = self._build_request_message(credential, [batch_item])
self._send_message(message)
message = messages.ResponseMessage()
data = self._receive_message()
message.read(data)
batch_items = message.batch_items
batch_item = batch_items[0]
payload = batch_item.response_payload
if payload is None:
payload_unique_identifier = None
else:
payload_unique_identifier = payload.unique_identifier
result = ActivateResult(batch_item.result_status,
batch_item.result_reason,
batch_item.result_message,
payload_unique_identifier)
return result
def _destroy(self, def _destroy(self,
unique_identifier=None, unique_identifier=None,
credential=None): credential=None):

View File

@ -78,6 +78,22 @@ class CreateKeyPairResult(OperationResult):
self.public_key_template_attribute = public_key_template_attribute self.public_key_template_attribute = public_key_template_attribute
class ActivateResult(OperationResult):
def __init__(self,
result_status,
result_reason=None,
result_message=None,
uuid=None):
super(ActivateResult, self).__init__(
result_status, result_reason, result_message)
if uuid is not None:
self.uuid = uuid
else:
self.uuid = None
class RegisterResult(OperationResult): class RegisterResult(OperationResult):
def __init__(self, def __init__(self,

View File

@ -18,6 +18,7 @@ import testtools
from kmip.core.enums import Operation from kmip.core.enums import Operation
from kmip.core.factories.payloads.request import RequestPayloadFactory from kmip.core.factories.payloads.request import RequestPayloadFactory
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -115,8 +116,8 @@ class TestRequestPayloadFactory(testtools.TestCase):
self.factory.create, Operation.GET_USAGE_ALLOCATION) self.factory.create, Operation.GET_USAGE_ALLOCATION)
def test_create_activate_payload(self): def test_create_activate_payload(self):
self._test_not_implemented( payload = self.factory.create(Operation.ACTIVATE)
self.factory.create, Operation.ACTIVATE) self._test_payload_type(payload, activate.ActivateRequestPayload)
def test_create_revoke_payload(self): def test_create_revoke_payload(self):
self._test_not_implemented( self._test_not_implemented(

View File

@ -18,6 +18,7 @@ import testtools
from kmip.core.enums import Operation from kmip.core.enums import Operation
from kmip.core.factories.payloads.response import ResponsePayloadFactory from kmip.core.factories.payloads.response import ResponsePayloadFactory
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -115,8 +116,8 @@ class TestResponsePayloadFactory(testtools.TestCase):
self.factory.create, Operation.GET_USAGE_ALLOCATION) self.factory.create, Operation.GET_USAGE_ALLOCATION)
def test_create_activate_payload(self): def test_create_activate_payload(self):
self._test_not_implemented( payload = self.factory.create(Operation.ACTIVATE)
self.factory.create, Operation.ACTIVATE) self._test_payload_type(payload, activate.ActivateResponsePayload)
def test_create_revoke_payload(self): def test_create_revoke_payload(self):
self._test_not_implemented( self._test_not_implemented(

View File

@ -0,0 +1,189 @@
# Copyright (c) 2015 Hewlett Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core import utils
from kmip.core import attributes
from kmip.core.messages.payloads import activate
class TestActivateRequestPayload(TestCase):
"""
Test suite for the ActivateRequestPayload class.
Test encodings obtained from Sections 4.2 of the KMIP 1.1 Test
Cases documentation.
"""
def setUp(self):
super(TestActivateRequestPayload, self).setUp()
self.uuid = attributes.UniqueIdentifier(
'668eff89-3010-4258-bc0e-8c402309c746')
self.encoding_a = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
b'\x36\x36\x38\x65\x66\x66\x38\x39\x2D\x33\x30\x31\x30\x2D\x34\x32'
b'\x35\x38\x2D\x62\x63\x30\x65\x2D\x38\x63\x34\x30\x32\x33\x30\x39'
b'\x63\x37\x34\x36\x00\x00\x00\x00'))
def tearDown(self):
super(TestActivateRequestPayload, self).tearDown()
def test_init_with_none(self):
"""
Test that a ActivateRequestPayload object can be constructed with no
specified value.
"""
activate.ActivateRequestPayload()
def test_init_with_args(self):
"""
Test that a ActivateRequestPayload object can be constructed with valid
values.
"""
activate.ActivateRequestPayload(unique_identifier=self.uuid)
def test_validate_with_bad_uuid_type(self):
"""
Test that a TypeError exception is raised when an invalid UUID type
is used to construct a ActivateRequestPayload object.
"""
self.assertRaisesRegexp(
TypeError, "invalid unique identifier",
activate.ActivateRequestPayload, "not-a-uuid")
def test_read_with_known_uuid(self):
"""
Test that a ActivateRequestPayload object with known UUID can be read
from a data stream.
"""
payload = activate.ActivateRequestPayload()
payload.read(self.encoding_a)
expected = '668eff89-3010-4258-bc0e-8c402309c746'
observed = payload.unique_identifier.value
msg = "activate UUID value mismatch"
msg += "; expected {0}, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
def test_write_with_known_uuid(self):
"""
Test that a ActivateRequestPayload object with a known UUID can be
written to a data stream.
"""
stream = utils.BytearrayStream()
payload = activate.ActivateRequestPayload(self.uuid)
payload.write(stream)
length_expected = len(self.encoding_a)
length_received = len(stream)
msg = "encoding lengths not equal"
msg += "; expected {0}, received {1}".format(
length_expected, length_received)
self.assertEqual(length_expected, length_received, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(self.encoding_a,
stream)
self.assertEqual(self.encoding_a, stream, msg)
class TestActivateResponsePayload(TestCase):
"""
Test encodings obtained from Sections 4.2 of the KMIP 1.1 Test
Cases documentation.
"""
def setUp(self):
super(TestActivateResponsePayload, self).setUp()
self.uuid = attributes.UniqueIdentifier(
'668eff89-3010-4258-bc0e-8c402309c746')
self.encoding_a = utils.BytearrayStream((
b'\x42\x00\x7C\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
b'\x36\x36\x38\x65\x66\x66\x38\x39\x2D\x33\x30\x31\x30\x2D\x34\x32'
b'\x35\x38\x2D\x62\x63\x30\x65\x2D\x38\x63\x34\x30\x32\x33\x30\x39'
b'\x63\x37\x34\x36\x00\x00\x00\x00'))
def tearDown(self):
super(TestActivateResponsePayload, self).tearDown()
def test_init_with_none(self):
"""
Test that a ActivateResponsePayload object can be constructed with no
specified value.
"""
activate.ActivateResponsePayload()
def test_init_with_args(self):
"""
Test that a ActivateResponsePayload object can be constructed with
valid values.
"""
activate.ActivateResponsePayload(unique_identifier=self.uuid)
def test_validate_with_invalid_uuid(self):
"""
Test that a TypeError exception is raised when an invalid Operations
list is used to construct a ActivateResponsePayload object.
"""
self.assertRaisesRegexp(
TypeError, "invalid unique identifier",
activate.ActivateResponsePayload, "not-a-uuid")
def test_read_with_known_uuid(self):
"""
Test that a ActivateResponsePayload object with known UUID can be read
from a data stream.
"""
payload = activate.ActivateResponsePayload()
payload.read(self.encoding_a)
expected = '668eff89-3010-4258-bc0e-8c402309c746'
observed = payload.unique_identifier.value
msg = "activate UUID value mismatch"
msg += "; expected {0}, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
def test_write_with_known_uuid(self):
"""
Test that a ActivateResponsePayload object with a known UUID can be
written to a data stream.
"""
stream = utils.BytearrayStream()
payload = activate.ActivateResponsePayload(self.uuid)
payload.write(stream)
length_expected = len(self.encoding_a)
length_received = len(stream)
msg = "encoding lengths not equal"
msg += "; expected {0}, received {1}".format(
length_expected, length_received)
self.assertEqual(length_expected, length_received, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(self.encoding_a,
stream)
self.assertEqual(self.encoding_a, stream, msg)