mirror of https://github.com/OpenKMIP/PyKMIP.git
Adding ProxyKmipClient support for the GetAttributeList operation
This change adds support for the GetAttributeList operation to the ProxyKmipClient. It updates the Pie client API and provides a demo showing how to use the operation. All relevant test suites are updated accordingly.
This commit is contained in:
parent
c2e6ffa048
commit
3970c0f211
|
@ -0,0 +1,53 @@
|
|||
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.demos import utils
|
||||
from kmip.pie import client
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Build and parse arguments
|
||||
parser = utils.build_cli_parser(enums.Operation.GET_ATTRIBUTE_LIST)
|
||||
opts, args = parser.parse_args(sys.argv[1:])
|
||||
|
||||
config = opts.config
|
||||
uid = opts.uuid
|
||||
|
||||
# Exit early if the UUID is not specified
|
||||
if uid is None:
|
||||
logging.debug('No ID provided, exiting early from demo')
|
||||
sys.exit()
|
||||
|
||||
# Build and setup logging and needed factories
|
||||
f_log = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
|
||||
'logconfig.ini')
|
||||
logging.config.fileConfig(f_log)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Build the client and connect to the server
|
||||
with client.ProxyKmipClient(config=config) as client:
|
||||
try:
|
||||
attribute_names = client.get_attribute_list(uid)
|
||||
logger.info("Successfully retrieved {0} attribute names:".format(
|
||||
len(attribute_names)))
|
||||
for attribute_name in attribute_names:
|
||||
logger.info("Attribute name: {0}".format(attribute_name))
|
||||
except Exception as e:
|
||||
logger.error(e)
|
|
@ -161,6 +161,15 @@ def build_cli_parser(operation=None):
|
|||
dest="format",
|
||||
help=("Format in which to retrieve the secret. Supported formats "
|
||||
"include: RAW, PKCS_1, PKCS_8, X_509"))
|
||||
elif operation is Operation.GET_ATTRIBUTE_LIST:
|
||||
parser.add_option(
|
||||
"-i",
|
||||
"--uuid",
|
||||
action="store",
|
||||
type="str",
|
||||
default=None,
|
||||
dest="uuid",
|
||||
help="UID of a managed object")
|
||||
elif operation is Operation.LOCATE:
|
||||
parser.add_option(
|
||||
"-n",
|
||||
|
|
|
@ -70,6 +70,17 @@ class KmipClient:
|
|||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_attribute_list(self, uid):
|
||||
"""
|
||||
Get a list of attribute names for a managed object on a KMIP appliance.
|
||||
|
||||
Args:
|
||||
uid (string): The unique ID of the managed object whose attribute
|
||||
names should be retrieved.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def destroy(self, uid):
|
||||
"""
|
||||
|
|
|
@ -309,6 +309,39 @@ class ProxyKmipClient(api.KmipClient):
|
|||
message = result.result_message.value
|
||||
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||
|
||||
def get_attribute_list(self, uid=None):
|
||||
"""
|
||||
Get the names of the attributes associated with a managed object.
|
||||
|
||||
If the uid is not specified, the appliance will use the ID placeholder
|
||||
by default.
|
||||
|
||||
Args:
|
||||
uid (string): The unique ID of the managed object with which the
|
||||
retrieved attribute names should be associated. Optional,
|
||||
defaults to None.
|
||||
"""
|
||||
# Check input
|
||||
if uid is not None:
|
||||
if not isinstance(uid, six.string_types):
|
||||
raise TypeError("uid must be a string")
|
||||
|
||||
# Verify that operations can be given at this time
|
||||
if not self._is_open:
|
||||
raise exceptions.ClientConnectionNotOpen()
|
||||
|
||||
# Get the list of attribute names for a managed object.
|
||||
result = self.proxy.get_attribute_list(uid)
|
||||
|
||||
status = result.result_status.enum
|
||||
if status == enums.ResultStatus.SUCCESS:
|
||||
attribute_names = sorted(result.names)
|
||||
return attribute_names
|
||||
else:
|
||||
reason = result.result_reason.enum
|
||||
message = result.result_message.value
|
||||
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||
|
||||
def destroy(self, uid):
|
||||
"""
|
||||
Destroy a managed object stored by a KMIP appliance.
|
||||
|
|
|
@ -38,6 +38,9 @@ class DummyKmipClient(api.KmipClient):
|
|||
def get(self, uid, *args, **kwargs):
|
||||
super(DummyKmipClient, self).get(uid)
|
||||
|
||||
def get_attribute_list(self, uid, *args, **kwargs):
|
||||
super(DummyKmipClient, self).get_attribute_list(uid)
|
||||
|
||||
def destroy(self, uid):
|
||||
super(DummyKmipClient, self).destroy(uid)
|
||||
|
||||
|
@ -90,6 +93,13 @@ class TestKmipClient(testtools.TestCase):
|
|||
dummy = DummyKmipClient()
|
||||
dummy.get('uid')
|
||||
|
||||
def test_get_attribute_list(self):
|
||||
"""
|
||||
Test that the get_attribute_list method can be called without error.
|
||||
"""
|
||||
dummy = DummyKmipClient()
|
||||
dummy.get_attribute_list('uid')
|
||||
|
||||
def test_destroy(self):
|
||||
"""
|
||||
Test that the destroy method can be called without error.
|
||||
|
|
|
@ -450,6 +450,89 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||
self.assertRaisesRegexp(
|
||||
KmipOperationFailure, error_msg, client.get, *args)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_get_attribute_list(self):
|
||||
"""
|
||||
Test that the attribute names of a managed object can be retrieved
|
||||
with proper input.
|
||||
"""
|
||||
uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959'
|
||||
attribute_names = [
|
||||
'Cryptographic Length',
|
||||
'Cryptographic Algorithm',
|
||||
'State',
|
||||
'Digest',
|
||||
'Lease Time',
|
||||
'Initial Date',
|
||||
'Unique Identifier',
|
||||
'Name',
|
||||
'Cryptographic Usage Mask',
|
||||
'Object Type',
|
||||
'Contact Information',
|
||||
'Last Change Date']
|
||||
result = results.GetAttributeListResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
uid=uid,
|
||||
names=attribute_names)
|
||||
|
||||
with ProxyKmipClient() as client:
|
||||
client.proxy.get_attribute_list.return_value = result
|
||||
|
||||
result = client.get_attribute_list(uid)
|
||||
client.proxy.get_attribute_list.assert_called_with(uid)
|
||||
self.assertIsInstance(result, list)
|
||||
self.assertItemsEqual(attribute_names, result)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_get_attribute_list_on_invalid_uid(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when trying to retrieve the
|
||||
attribute names of a managed object with an invalid ID.
|
||||
"""
|
||||
args = [0]
|
||||
with ProxyKmipClient() as client:
|
||||
self.assertRaises(TypeError, client.get_attribute_list, *args)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_get_attribute_list_on_closed(self):
|
||||
"""
|
||||
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||
to retrieve the attribute names of a managed object on an unopened
|
||||
client connection.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
args = ['aaaaaaaa-1111-2222-3333-ffffffffffff']
|
||||
self.assertRaises(
|
||||
ClientConnectionNotOpen, client.get_attribute_list, *args)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_get_attribute_list_on_operation_failure(self):
|
||||
"""
|
||||
Test that a KmipOperationFailure exception is raised when the
|
||||
backend fails to retrieve the attribute names of a managed object.
|
||||
"""
|
||||
status = enums.ResultStatus.OPERATION_FAILED
|
||||
reason = enums.ResultReason.GENERAL_FAILURE
|
||||
message = "Test failure message"
|
||||
|
||||
result = results.OperationResult(
|
||||
contents.ResultStatus(status),
|
||||
contents.ResultReason(reason),
|
||||
contents.ResultMessage(message))
|
||||
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.get_attribute_list.return_value = result
|
||||
args = ['id']
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
KmipOperationFailure, error_msg, client.get_attribute_list, *args)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_destroy(self):
|
||||
|
|
Loading…
Reference in New Issue