mirror of https://github.com/OpenKMIP/PyKMIP.git
Update the Credential objects
This change updates the implementation of the Credential objects. The UsernamePassword and Device credentials are now first-class objects and, along with the base Credential, have been restructured to match the current struct style. Comprehensive unit test suites for each class have been added. Additionally, the credential factory code and its usage in the KMIPProxy class and associated test suites have been updated to reflect this change.
This commit is contained in:
parent
08f22b424c
commit
86b23a9d53
|
@ -13,43 +13,41 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from kmip.core.enums import CredentialType
|
from kmip.core import enums
|
||||||
|
from kmip.core import objects
|
||||||
from kmip.core.objects import Credential
|
|
||||||
|
|
||||||
|
|
||||||
class CredentialFactory(object):
|
class CredentialFactory(object):
|
||||||
def __init__(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _create_credential(self, credential_type, credential_value):
|
def create_credential(self, credential_type, credential_value):
|
||||||
credential_type = Credential.CredentialType(credential_type)
|
|
||||||
return Credential(credential_type=credential_type,
|
|
||||||
credential_value=credential_value)
|
|
||||||
|
|
||||||
def create_credential(self, cred_type, value):
|
|
||||||
# Switch on the type of the credential
|
# Switch on the type of the credential
|
||||||
if cred_type is CredentialType.USERNAME_AND_PASSWORD:
|
if credential_type is enums.CredentialType.USERNAME_AND_PASSWORD:
|
||||||
value = self._create_username_password_credential(value)
|
credential_value = self.create_username_password_credential(
|
||||||
elif cred_type is CredentialType.DEVICE:
|
credential_value
|
||||||
value = self._create_device_credential(value)
|
)
|
||||||
|
elif credential_type is enums.CredentialType.DEVICE:
|
||||||
|
credential_value = self.create_device_credential(credential_value)
|
||||||
else:
|
else:
|
||||||
msg = 'Unrecognized credential type: {0}'
|
msg = 'Unrecognized credential type: {0}'
|
||||||
raise ValueError(msg.format(cred_type))
|
raise ValueError(msg.format(credential_type))
|
||||||
|
|
||||||
return self._create_credential(cred_type, value)
|
return objects.Credential(
|
||||||
|
credential_type=credential_type,
|
||||||
|
credential_value=credential_value
|
||||||
|
)
|
||||||
|
|
||||||
def _create_username_password_credential(self, value):
|
@staticmethod
|
||||||
|
def create_username_password_credential(value):
|
||||||
username = value.get('Username')
|
username = value.get('Username')
|
||||||
password = value.get('Password')
|
password = value.get('Password')
|
||||||
|
|
||||||
username = Credential.UsernamePasswordCredential.Username(username)
|
return objects.UsernamePasswordCredential(
|
||||||
password = Credential.UsernamePasswordCredential.Password(password)
|
username=username,
|
||||||
|
password=password
|
||||||
|
)
|
||||||
|
|
||||||
return Credential.UsernamePasswordCredential(username=username,
|
@staticmethod
|
||||||
password=password)
|
def create_device_credential(value):
|
||||||
|
|
||||||
def _create_device_credential(self, value):
|
|
||||||
dsn = value.get('Device Serial Number')
|
dsn = value.get('Device Serial Number')
|
||||||
password = value.get('Password')
|
password = value.get('Password')
|
||||||
dev_id = value.get('Device Identifier')
|
dev_id = value.get('Device Identifier')
|
||||||
|
@ -57,16 +55,11 @@ class CredentialFactory(object):
|
||||||
mach_id = value.get('Machine Identifier')
|
mach_id = value.get('Machine Identifier')
|
||||||
med_id = value.get('Media Identifier')
|
med_id = value.get('Media Identifier')
|
||||||
|
|
||||||
dsn = Credential.DeviceCredential.DeviceSerialNumber(dsn)
|
return objects.DeviceCredential(
|
||||||
password = Credential.DeviceCredential.Password(password)
|
device_serial_number=dsn,
|
||||||
dev_id = Credential.DeviceCredential.DeviceIdentifier(dev_id)
|
password=password,
|
||||||
net_id = Credential.DeviceCredential.NetworkIdentifier(net_id)
|
device_identifier=dev_id,
|
||||||
mach_id = Credential.DeviceCredential.MachineIdentifier(mach_id)
|
network_identifier=net_id,
|
||||||
med_id = Credential.DeviceCredential.MediaIdentifier(med_id)
|
machine_identifier=mach_id,
|
||||||
|
media_identifier=med_id
|
||||||
return Credential.DeviceCredential(device_serial_number=dsn,
|
)
|
||||||
password=password,
|
|
||||||
device_identifier=dev_id,
|
|
||||||
network_identifier=net_id,
|
|
||||||
machine_identifier=mach_id,
|
|
||||||
media_identifier=med_id)
|
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
import six
|
import six
|
||||||
from six.moves import xrange
|
from six.moves import xrange
|
||||||
|
|
||||||
|
@ -25,7 +26,6 @@ from kmip.core import enums
|
||||||
from kmip.core.enums import AttributeType
|
from kmip.core.enums import AttributeType
|
||||||
from kmip.core.enums import Tags
|
from kmip.core.enums import Tags
|
||||||
from kmip.core.enums import Types
|
from kmip.core.enums import Types
|
||||||
from kmip.core.enums import CredentialType
|
|
||||||
from kmip.core.enums import RevocationReasonCode as RevocationReasonCodeEnum
|
from kmip.core.enums import RevocationReasonCode as RevocationReasonCodeEnum
|
||||||
from kmip.core import exceptions
|
from kmip.core import exceptions
|
||||||
|
|
||||||
|
@ -172,224 +172,646 @@ class Attribute(Struct):
|
||||||
return NotImplemented
|
return NotImplemented
|
||||||
|
|
||||||
|
|
||||||
# 2.1.2
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class Credential(Struct):
|
class CredentialValue(primitives.Struct):
|
||||||
|
"""
|
||||||
|
An empty, abstract base class to be used by Credential objects to easily
|
||||||
|
group and type-check credential values.
|
||||||
|
"""
|
||||||
|
|
||||||
class CredentialType(Enumeration):
|
|
||||||
|
|
||||||
def __init__(self, value=None):
|
class UsernamePasswordCredential(CredentialValue):
|
||||||
super(Credential.CredentialType, self).__init__(
|
"""
|
||||||
CredentialType, value, Tags.CREDENTIAL_TYPE)
|
A struct representing a UsernamePasswordCredential object.
|
||||||
|
|
||||||
class UsernamePasswordCredential(Struct):
|
Attributes:
|
||||||
|
username: The username identifying the credential.
|
||||||
|
password: The password associated with the username.
|
||||||
|
"""
|
||||||
|
|
||||||
class Username(TextString):
|
def __init__(self, username=None, password=None):
|
||||||
def __init__(self, value=None):
|
"""
|
||||||
super(Credential.UsernamePasswordCredential.Username,
|
Construct a UsernamePasswordCredential struct.
|
||||||
self).__init__(
|
|
||||||
value, Tags.USERNAME)
|
|
||||||
|
|
||||||
class Password(TextString):
|
Args:
|
||||||
def __init__(self, value=None):
|
username (string): The username identifying the credential.
|
||||||
super(Credential.UsernamePasswordCredential.Password,
|
Optional, defaults to None. Required for encoding and decoding.
|
||||||
self).__init__(
|
password (string): The password associated with the username.
|
||||||
value, Tags.PASSWORD)
|
Optional, defaults to None.
|
||||||
|
"""
|
||||||
|
super(UsernamePasswordCredential, self).__init__(
|
||||||
|
tag=Tags.CREDENTIAL_VALUE
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(self, username=None, password=None):
|
self._username = None
|
||||||
super(Credential.UsernamePasswordCredential, self).__init__(
|
self._password = None
|
||||||
tag=Tags.CREDENTIAL_VALUE)
|
|
||||||
self.username = username
|
|
||||||
self.password = password
|
|
||||||
self.validate()
|
|
||||||
|
|
||||||
def read(self, istream):
|
self.username = username
|
||||||
super(Credential.UsernamePasswordCredential, self).read(istream)
|
self.password = password
|
||||||
tstream = BytearrayStream(istream.read(self.length))
|
|
||||||
|
|
||||||
# Read the username of the credential
|
@property
|
||||||
self.username = self.Username()
|
def username(self):
|
||||||
self.username.read(tstream)
|
if self._username:
|
||||||
|
return self._username.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
# Read the password if it is next
|
@username.setter
|
||||||
if self.is_tag_next(Tags.PASSWORD, tstream):
|
def username(self, value):
|
||||||
self.password = self.Password()
|
if value is None:
|
||||||
self.password.read(tstream)
|
self._username = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._username = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.USERNAME
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Username must be a string.")
|
||||||
|
|
||||||
self.is_oversized(tstream)
|
@property
|
||||||
self.validate()
|
def password(self):
|
||||||
|
if self._password:
|
||||||
|
return self._password.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def write(self, ostream):
|
@password.setter
|
||||||
tstream = BytearrayStream()
|
def password(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._password = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._password = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.PASSWORD
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Password must be a string.")
|
||||||
|
|
||||||
self.username.write(tstream)
|
def read(self, input_stream):
|
||||||
if self.password is not None:
|
"""
|
||||||
self.password.write(tstream)
|
Read the data encoding the UsernamePasswordCredential struct and
|
||||||
|
decode it into its constituent parts.
|
||||||
|
|
||||||
# Write the length and value of the credential
|
Args:
|
||||||
self.length = tstream.length()
|
input_stream (stream): A data stream containing encoded object
|
||||||
super(Credential.UsernamePasswordCredential, self).write(ostream)
|
data, supporting a read method; usually a BytearrayStream
|
||||||
ostream.write(tstream.buffer)
|
object.
|
||||||
|
|
||||||
def validate(self):
|
Raises:
|
||||||
pass
|
ValueError: Raised if the username is missing from the encoding.
|
||||||
|
"""
|
||||||
|
super(UsernamePasswordCredential, self).read(input_stream)
|
||||||
|
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||||
|
|
||||||
class DeviceCredential(Struct):
|
if self.is_tag_next(enums.Tags.USERNAME, local_stream):
|
||||||
|
self._username = primitives.TextString(
|
||||||
|
tag=enums.Tags.USERNAME
|
||||||
|
)
|
||||||
|
self._username.read(local_stream)
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Username/password credential encoding missing the username."
|
||||||
|
)
|
||||||
|
|
||||||
class DeviceSerialNumber(TextString):
|
if self.is_tag_next(enums.Tags.PASSWORD, local_stream):
|
||||||
|
self._password = primitives.TextString(
|
||||||
|
tag=enums.Tags.PASSWORD
|
||||||
|
)
|
||||||
|
self._password.read(local_stream)
|
||||||
|
|
||||||
def __init__(self, value=None):
|
self.is_oversized(local_stream)
|
||||||
super(Credential.DeviceCredential.DeviceSerialNumber, self).\
|
|
||||||
__init__(value, Tags.DEVICE_SERIAL_NUMBER)
|
|
||||||
|
|
||||||
class Password(TextString):
|
def write(self, output_stream):
|
||||||
|
"""
|
||||||
|
Write the data encoding the UsernamePasswordCredential struct to a
|
||||||
|
stream.
|
||||||
|
|
||||||
def __init__(self, value=None):
|
Args:
|
||||||
super(Credential.DeviceCredential.Password, self).\
|
output_stream (stream): A data stream in which to encode object
|
||||||
__init__(value, Tags.PASSWORD)
|
data, supporting a write method; usually a BytearrayStream
|
||||||
|
object.
|
||||||
|
|
||||||
class DeviceIdentifier(TextString):
|
Raises:
|
||||||
|
ValueError: Raised if the username is not defined.
|
||||||
|
"""
|
||||||
|
local_stream = BytearrayStream()
|
||||||
|
|
||||||
def __init__(self, value=None):
|
if self._username:
|
||||||
super(Credential.DeviceCredential.DeviceIdentifier, self).\
|
self._username.write(local_stream)
|
||||||
__init__(value, Tags.DEVICE_IDENTIFIER)
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Username/password credential struct missing the username."
|
||||||
|
)
|
||||||
|
|
||||||
class NetworkIdentifier(TextString):
|
if self._password:
|
||||||
|
self._password.write(local_stream)
|
||||||
|
|
||||||
def __init__(self, value=None):
|
self.length = local_stream.length()
|
||||||
super(Credential.DeviceCredential.NetworkIdentifier, self).\
|
super(UsernamePasswordCredential, self).write(output_stream)
|
||||||
__init__(value, Tags.NETWORK_IDENTIFIER)
|
output_stream.write(local_stream.buffer)
|
||||||
|
|
||||||
class MachineIdentifier(TextString):
|
def __eq__(self, other):
|
||||||
|
if isinstance(other, UsernamePasswordCredential):
|
||||||
|
if self.username != other.username:
|
||||||
|
return False
|
||||||
|
elif self.password != other.password:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
def __init__(self, value=None):
|
def __ne__(self, other):
|
||||||
super(Credential.DeviceCredential.MachineIdentifier, self).\
|
if isinstance(other, UsernamePasswordCredential):
|
||||||
__init__(value, Tags.MACHINE_IDENTIFIER)
|
return not (self == other)
|
||||||
|
else:
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
class MediaIdentifier(TextString):
|
def __repr__(self):
|
||||||
|
args = ", ".join([
|
||||||
|
"username='{}'".format(self.username),
|
||||||
|
"password='{}'".format(self.password)
|
||||||
|
])
|
||||||
|
return "UsernamePasswordCredential({})".format(args)
|
||||||
|
|
||||||
def __init__(self, value=None):
|
def __str__(self):
|
||||||
super(Credential.DeviceCredential.MediaIdentifier, self).\
|
return str({
|
||||||
__init__(value, Tags.MEDIA_IDENTIFIER)
|
"username": self.username,
|
||||||
|
"password": self.password
|
||||||
|
})
|
||||||
|
|
||||||
def __init__(self,
|
|
||||||
device_serial_number=None,
|
|
||||||
password=None,
|
|
||||||
device_identifier=None,
|
|
||||||
network_identifier=None,
|
|
||||||
machine_identifier=None,
|
|
||||||
media_identifier=None):
|
|
||||||
super(Credential.DeviceCredential, self).__init__(
|
|
||||||
tag=Tags.CREDENTIAL_VALUE)
|
|
||||||
self.device_serial_number = device_serial_number
|
|
||||||
self.password = password
|
|
||||||
self.device_identifier = device_identifier
|
|
||||||
self.network_identifier = network_identifier
|
|
||||||
self.machine_identifier = machine_identifier
|
|
||||||
self.media_identifier = media_identifier
|
|
||||||
|
|
||||||
def read(self, istream):
|
class DeviceCredential(CredentialValue):
|
||||||
super(Credential.DeviceCredential, self).read(istream)
|
"""
|
||||||
tstream = BytearrayStream(istream.read(self.length))
|
A struct representing a DeviceCredential object.
|
||||||
|
|
||||||
# Read the password if it is next
|
Attributes:
|
||||||
if self.is_tag_next(Tags.DEVICE_SERIAL_NUMBER, tstream):
|
device_serial_number: The device serial number for the credential.
|
||||||
self.device_serial_number = self.DeviceSerialNumber()
|
password: The password associated with the credential.
|
||||||
self.device_serial_number.read(tstream)
|
device_identifier: The device identifier for the credential.
|
||||||
|
network_identifier: The network identifier for the credential.
|
||||||
|
machine_identifier: The machine identifier for the credential.
|
||||||
|
media_identifier: The media identifier for the credential.
|
||||||
|
"""
|
||||||
|
|
||||||
# Read the password if it is next
|
def __init__(self,
|
||||||
if self.is_tag_next(Tags.PASSWORD, tstream):
|
device_serial_number=None,
|
||||||
self.password = self.Password()
|
password=None,
|
||||||
self.password.read(tstream)
|
device_identifier=None,
|
||||||
|
network_identifier=None,
|
||||||
|
machine_identifier=None,
|
||||||
|
media_identifier=None):
|
||||||
|
"""
|
||||||
|
Construct a DeviceCredential struct.
|
||||||
|
|
||||||
# Read the password if it is next
|
Args:
|
||||||
if self.is_tag_next(Tags.DEVICE_IDENTIFIER, tstream):
|
device_serial_number (string): The device serial number for the
|
||||||
self.device_identifier = self.DeviceIdentifier()
|
credential. Optional, defaults to None.
|
||||||
self.device_identifier.read(tstream)
|
password (string): The password associated with the credential.
|
||||||
|
Optional, defaults to None.
|
||||||
|
device_identifier (string): The device identifier for the
|
||||||
|
credential. Optional, defaults to None.
|
||||||
|
network_identifier (string): The network identifier for the
|
||||||
|
credential. Optional, defaults to None.
|
||||||
|
machine_identifier (string): The machine identifier for the
|
||||||
|
credential. Optional, defaults to None.
|
||||||
|
media_identifier (string): The media identifier for the
|
||||||
|
credential. Optional, defaults to None.
|
||||||
|
"""
|
||||||
|
super(DeviceCredential, self).__init__(tag=Tags.CREDENTIAL_VALUE)
|
||||||
|
|
||||||
# Read the password if it is next
|
self._device_serial_number = None
|
||||||
if self.is_tag_next(Tags.NETWORK_IDENTIFIER, tstream):
|
self._password = None
|
||||||
self.network_identifier = self.NetworkIdentifier()
|
self._device_identifier = None
|
||||||
self.network_identifier.read(tstream)
|
self._network_identifier = None
|
||||||
|
self._machine_identifier = None
|
||||||
|
self._media_identifier = None
|
||||||
|
|
||||||
# Read the password if it is next
|
self.device_serial_number = device_serial_number
|
||||||
if self.is_tag_next(Tags.MACHINE_IDENTIFIER, tstream):
|
self.password = password
|
||||||
self.machine_identifier = self.MachineIdentifier()
|
self.device_identifier = device_identifier
|
||||||
self.machine_identifier.read(tstream)
|
self.network_identifier = network_identifier
|
||||||
|
self.machine_identifier = machine_identifier
|
||||||
|
self.media_identifier = media_identifier
|
||||||
|
|
||||||
# Read the password if it is next
|
@property
|
||||||
if self.is_tag_next(Tags.MEDIA_IDENTIFIER, tstream):
|
def device_serial_number(self):
|
||||||
self.media_identifier = self.MediaIdentifier()
|
if self._device_serial_number:
|
||||||
self.media_identifier.read(tstream)
|
return self._device_serial_number.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
self.is_oversized(tstream)
|
@device_serial_number.setter
|
||||||
self.validate()
|
def device_serial_number(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._device_serial_number = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._device_serial_number = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.DEVICE_SERIAL_NUMBER
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Device serial number must be a string.")
|
||||||
|
|
||||||
def write(self, ostream):
|
@property
|
||||||
tstream = BytearrayStream()
|
def password(self):
|
||||||
|
if self._password:
|
||||||
|
return self._password.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
if self.device_serial_number is not None:
|
@password.setter
|
||||||
self.device_serial_number.write(tstream)
|
def password(self, value):
|
||||||
if self.password is not None:
|
if value is None:
|
||||||
self.password.write(tstream)
|
self._password = None
|
||||||
if self.device_identifier is not None:
|
elif isinstance(value, six.string_types):
|
||||||
self.device_identifier.write(tstream)
|
self._password = primitives.TextString(
|
||||||
if self.network_identifier is not None:
|
value=value,
|
||||||
self.network_identifier.write(tstream)
|
tag=enums.Tags.PASSWORD
|
||||||
if self.machine_identifier is not None:
|
)
|
||||||
self.machine_identifier.write(tstream)
|
else:
|
||||||
if self.media_identifier is not None:
|
raise TypeError("Password must be a string.")
|
||||||
self.media_identifier.write(tstream)
|
|
||||||
|
|
||||||
# Write the length and value of the credential
|
@property
|
||||||
self.length = tstream.length()
|
def device_identifier(self):
|
||||||
super(Credential.DeviceCredential, self).write(ostream)
|
if self._device_identifier:
|
||||||
ostream.write(tstream.buffer)
|
return self._device_identifier.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def validate(self):
|
@device_identifier.setter
|
||||||
pass
|
def device_identifier(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._device_identifier = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._device_identifier = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.DEVICE_IDENTIFIER
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Device identifier must be a string.")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def network_identifier(self):
|
||||||
|
if self._network_identifier:
|
||||||
|
return self._network_identifier.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@network_identifier.setter
|
||||||
|
def network_identifier(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._network_identifier = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._network_identifier = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.NETWORK_IDENTIFIER
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Network identifier must be a string.")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def machine_identifier(self):
|
||||||
|
if self._machine_identifier:
|
||||||
|
return self._machine_identifier.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@machine_identifier.setter
|
||||||
|
def machine_identifier(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._machine_identifier = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._machine_identifier = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.MACHINE_IDENTIFIER
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Machine identifier must be a string.")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def media_identifier(self):
|
||||||
|
if self._media_identifier:
|
||||||
|
return self._media_identifier.value
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
@media_identifier.setter
|
||||||
|
def media_identifier(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._media_identifier = None
|
||||||
|
elif isinstance(value, six.string_types):
|
||||||
|
self._media_identifier = primitives.TextString(
|
||||||
|
value=value,
|
||||||
|
tag=enums.Tags.MEDIA_IDENTIFIER
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError("Media identifier must be a string.")
|
||||||
|
|
||||||
|
def read(self, input_stream):
|
||||||
|
"""
|
||||||
|
Read the data encoding the DeviceCredential struct and decode it into
|
||||||
|
its constituent parts.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
input_stream (stream): A data stream containing encoded object
|
||||||
|
data, supporting a read method; usually a BytearrayStream
|
||||||
|
object..
|
||||||
|
"""
|
||||||
|
super(DeviceCredential, self).read(input_stream)
|
||||||
|
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.DEVICE_SERIAL_NUMBER, local_stream):
|
||||||
|
self._device_serial_number = primitives.TextString(
|
||||||
|
tag=enums.Tags.DEVICE_SERIAL_NUMBER
|
||||||
|
)
|
||||||
|
self._device_serial_number.read(local_stream)
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.PASSWORD, local_stream):
|
||||||
|
self._password = primitives.TextString(
|
||||||
|
tag=enums.Tags.PASSWORD
|
||||||
|
)
|
||||||
|
self._password.read(local_stream)
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.DEVICE_IDENTIFIER, local_stream):
|
||||||
|
self._device_identifier = primitives.TextString(
|
||||||
|
tag=enums.Tags.DEVICE_IDENTIFIER
|
||||||
|
)
|
||||||
|
self._device_identifier.read(local_stream)
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.NETWORK_IDENTIFIER, local_stream):
|
||||||
|
self._network_identifier = primitives.TextString(
|
||||||
|
tag=enums.Tags.NETWORK_IDENTIFIER
|
||||||
|
)
|
||||||
|
self._network_identifier.read(local_stream)
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.MACHINE_IDENTIFIER, local_stream):
|
||||||
|
self._machine_identifier = primitives.TextString(
|
||||||
|
tag=enums.Tags.MACHINE_IDENTIFIER
|
||||||
|
)
|
||||||
|
self._machine_identifier.read(local_stream)
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.MEDIA_IDENTIFIER, local_stream):
|
||||||
|
self._media_identifier = primitives.TextString(
|
||||||
|
tag=enums.Tags.MEDIA_IDENTIFIER
|
||||||
|
)
|
||||||
|
self._media_identifier.read(local_stream)
|
||||||
|
|
||||||
|
self.is_oversized(local_stream)
|
||||||
|
|
||||||
|
def write(self, output_stream):
|
||||||
|
"""
|
||||||
|
Write the data encoding the DeviceCredential struct to a stream.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_stream (stream): A data stream in which to encode object
|
||||||
|
data, supporting a write method; usually a BytearrayStream
|
||||||
|
object.
|
||||||
|
"""
|
||||||
|
local_stream = BytearrayStream()
|
||||||
|
|
||||||
|
if self._device_serial_number is not None:
|
||||||
|
self._device_serial_number.write(local_stream)
|
||||||
|
if self._password is not None:
|
||||||
|
self._password.write(local_stream)
|
||||||
|
if self._device_identifier is not None:
|
||||||
|
self._device_identifier.write(local_stream)
|
||||||
|
if self._network_identifier is not None:
|
||||||
|
self._network_identifier.write(local_stream)
|
||||||
|
if self._machine_identifier is not None:
|
||||||
|
self._machine_identifier.write(local_stream)
|
||||||
|
if self._media_identifier is not None:
|
||||||
|
self._media_identifier.write(local_stream)
|
||||||
|
|
||||||
|
self.length = local_stream.length()
|
||||||
|
super(DeviceCredential, self).write(output_stream)
|
||||||
|
output_stream.write(local_stream.buffer)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if isinstance(other, DeviceCredential):
|
||||||
|
if self.device_serial_number != other.device_serial_number:
|
||||||
|
return False
|
||||||
|
elif self.password != other.password:
|
||||||
|
return False
|
||||||
|
elif self.device_identifier != other.device_identifier:
|
||||||
|
return False
|
||||||
|
elif self.network_identifier != other.network_identifier:
|
||||||
|
return False
|
||||||
|
elif self.machine_identifier != other.machine_identifier:
|
||||||
|
return False
|
||||||
|
elif self.media_identifier != other.media_identifier:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
|
def __ne__(self, other):
|
||||||
|
if isinstance(other, DeviceCredential):
|
||||||
|
return not (self == other)
|
||||||
|
else:
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
args = ", ".join([
|
||||||
|
"device_serial_number='{}'".format(self.device_serial_number),
|
||||||
|
"password='{}'".format(self.password),
|
||||||
|
"device_identifier='{}'".format(self.device_identifier),
|
||||||
|
"network_identifier='{}'".format(self.network_identifier),
|
||||||
|
"machine_identifier='{}'".format(self.machine_identifier),
|
||||||
|
"media_identifier='{}'".format(self.media_identifier),
|
||||||
|
])
|
||||||
|
return "DeviceCredential({})".format(args)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return str({
|
||||||
|
"device_serial_number": self.device_serial_number,
|
||||||
|
"password": self.password,
|
||||||
|
"device_identifier": self.device_identifier,
|
||||||
|
"network_identifier": self.network_identifier,
|
||||||
|
"machine_identifier": self.machine_identifier,
|
||||||
|
"media_identifier": self.media_identifier
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class Credential(primitives.Struct):
|
||||||
|
"""
|
||||||
|
A struct representing a Credential object.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
credential_type: The credential type, a CredentialType enumeration.
|
||||||
|
credential_value: The credential value, a CredentialValue instance.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, credential_type=None, credential_value=None):
|
def __init__(self, credential_type=None, credential_value=None):
|
||||||
|
"""
|
||||||
|
Construct a Credential struct.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
credential_type (CredentialType): An enumeration value that
|
||||||
|
specifies the type of the credential struct. Optional,
|
||||||
|
defaults to None. Required for encoding and decoding.
|
||||||
|
credential_value (CredentialValue): The credential value
|
||||||
|
corresponding to the credential type. Optional, defaults to
|
||||||
|
None. Required for encoding and decoding.
|
||||||
|
"""
|
||||||
super(Credential, self).__init__(tag=Tags.CREDENTIAL)
|
super(Credential, self).__init__(tag=Tags.CREDENTIAL)
|
||||||
|
|
||||||
|
self._credential_type = None
|
||||||
|
self._credential_value = None
|
||||||
|
|
||||||
self.credential_type = credential_type
|
self.credential_type = credential_type
|
||||||
self.credential_value = credential_value
|
self.credential_value = credential_value
|
||||||
|
|
||||||
def read(self, istream):
|
@property
|
||||||
super(Credential, self).read(istream)
|
def credential_type(self):
|
||||||
tstream = BytearrayStream(istream.read(self.length))
|
if self._credential_type:
|
||||||
|
return self._credential_type.value
|
||||||
# Read the type of the credential
|
|
||||||
self.credential_type = self.CredentialType()
|
|
||||||
self.credential_type.read(tstream)
|
|
||||||
|
|
||||||
# Use the type to determine what credential value to read
|
|
||||||
if self.credential_type.value is CredentialType.USERNAME_AND_PASSWORD:
|
|
||||||
self.credential_value = self.UsernamePasswordCredential()
|
|
||||||
elif self.credential_type.value is CredentialType.DEVICE:
|
|
||||||
self.credential_value = self.DeviceCredential()
|
|
||||||
else:
|
else:
|
||||||
# TODO (peter-hamilton) Use more descriptive error here
|
return None
|
||||||
raise NotImplementedError()
|
|
||||||
self.credential_value.read(tstream)
|
|
||||||
|
|
||||||
self.is_oversized(tstream)
|
@credential_type.setter
|
||||||
self.validate()
|
def credential_type(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._credential_type = None
|
||||||
|
elif isinstance(value, enums.CredentialType):
|
||||||
|
self._credential_type = Enumeration(
|
||||||
|
enums.CredentialType,
|
||||||
|
value=value,
|
||||||
|
tag=Tags.CREDENTIAL_TYPE
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise TypeError(
|
||||||
|
"Credential type must be a CredentialType enumeration."
|
||||||
|
)
|
||||||
|
|
||||||
def write(self, ostream):
|
@property
|
||||||
tstream = BytearrayStream()
|
def credential_value(self):
|
||||||
|
return self._credential_value
|
||||||
|
|
||||||
self.credential_type.write(tstream)
|
@credential_value.setter
|
||||||
self.credential_value.write(tstream)
|
def credential_value(self, value):
|
||||||
|
if value is None:
|
||||||
|
self._credential_value = None
|
||||||
|
elif isinstance(value, CredentialValue):
|
||||||
|
self._credential_value = value
|
||||||
|
else:
|
||||||
|
raise TypeError(
|
||||||
|
"Credential value must be a CredentialValue struct."
|
||||||
|
)
|
||||||
|
|
||||||
# Write the length and value of the credential
|
def read(self, input_stream):
|
||||||
self.length = tstream.length()
|
"""
|
||||||
super(Credential, self).write(ostream)
|
Read the data encoding the Credential struct and decode it into its
|
||||||
ostream.write(tstream.buffer)
|
constituent parts.
|
||||||
|
|
||||||
def validate(self):
|
Args:
|
||||||
pass
|
input_stream (stream): A data stream containing encoded object
|
||||||
|
data, supporting a read method; usually a BytearrayStream
|
||||||
|
object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: Raised if either the credential type or value are
|
||||||
|
missing from the encoding.
|
||||||
|
"""
|
||||||
|
super(Credential, self).read(input_stream)
|
||||||
|
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.CREDENTIAL_TYPE, local_stream):
|
||||||
|
self._credential_type = primitives.Enumeration(
|
||||||
|
enum=enums.CredentialType,
|
||||||
|
tag=enums.Tags.CREDENTIAL_TYPE
|
||||||
|
)
|
||||||
|
self._credential_type.read(local_stream)
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Credential encoding missing the credential type."
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.is_tag_next(enums.Tags.CREDENTIAL_VALUE, local_stream):
|
||||||
|
if self.credential_type == \
|
||||||
|
enums.CredentialType.USERNAME_AND_PASSWORD:
|
||||||
|
self._credential_value = UsernamePasswordCredential()
|
||||||
|
elif self.credential_type == enums.CredentialType.DEVICE:
|
||||||
|
self._credential_value = DeviceCredential()
|
||||||
|
else:
|
||||||
|
# TODO (peter-hamilton) Add case for Attestation.
|
||||||
|
raise ValueError(
|
||||||
|
"Credential encoding includes unrecognized credential "
|
||||||
|
"type."
|
||||||
|
)
|
||||||
|
self._credential_value.read(local_stream)
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Credential encoding missing the credential value."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.is_oversized(local_stream)
|
||||||
|
|
||||||
|
def write(self, output_stream):
|
||||||
|
"""
|
||||||
|
Write the data encoding the Credential struct to a stream.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_stream (stream): A data stream in which to encode object
|
||||||
|
data, supporting a write method; usually a BytearrayStream
|
||||||
|
object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: Raised if either the credential type or value are not
|
||||||
|
defined.
|
||||||
|
"""
|
||||||
|
local_stream = BytearrayStream()
|
||||||
|
|
||||||
|
if self._credential_type:
|
||||||
|
self._credential_type.write(local_stream)
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Credential struct missing the credential type."
|
||||||
|
)
|
||||||
|
|
||||||
|
if self._credential_value:
|
||||||
|
self._credential_value.write(local_stream)
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Credential struct missing the credential value."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.length = local_stream.length()
|
||||||
|
super(Credential, self).write(output_stream)
|
||||||
|
output_stream.write(local_stream.buffer)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if isinstance(other, Credential):
|
||||||
|
if self.credential_type != other.credential_type:
|
||||||
|
return False
|
||||||
|
elif self.credential_value != other.credential_value:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
|
def __ne__(self, other):
|
||||||
|
if isinstance(other, Credential):
|
||||||
|
return not (self == other)
|
||||||
|
else:
|
||||||
|
return NotImplemented
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
args = ", ".join([
|
||||||
|
"credential_type={}".format(self.credential_type),
|
||||||
|
"credential_value={}".format(repr(self.credential_value))
|
||||||
|
])
|
||||||
|
return "Credential({})".format(args)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return str({
|
||||||
|
"credential_type": self.credential_type,
|
||||||
|
"credential_value": str(self.credential_value)
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
# 2.1.3
|
|
||||||
class KeyBlock(Struct):
|
class KeyBlock(Struct):
|
||||||
|
|
||||||
class KeyCompressionType(Enumeration):
|
class KeyCompressionType(Enumeration):
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -64,8 +64,6 @@ from kmip.services.results import OperationResult
|
||||||
from kmip.services.results import QueryResult
|
from kmip.services.results import QueryResult
|
||||||
from kmip.services.results import RekeyKeyPairResult
|
from kmip.services.results import RekeyKeyPairResult
|
||||||
|
|
||||||
import kmip.core.utils as utils
|
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
|
@ -144,31 +142,17 @@ class TestKMIPClient(TestCase):
|
||||||
def test_build_credential(self):
|
def test_build_credential(self):
|
||||||
username = 'username'
|
username = 'username'
|
||||||
password = 'password'
|
password = 'password'
|
||||||
cred_type = CredentialType.USERNAME_AND_PASSWORD
|
|
||||||
self.client.username = username
|
self.client.username = username
|
||||||
self.client.password = password
|
self.client.password = password
|
||||||
|
|
||||||
credential = self.client._build_credential()
|
credential = self.client._build_credential()
|
||||||
|
|
||||||
message = utils.build_er_error(credential.__class__, 'type',
|
self.assertEqual(
|
||||||
cred_type,
|
CredentialType.USERNAME_AND_PASSWORD,
|
||||||
credential.credential_type.value,
|
credential.credential_type
|
||||||
'value')
|
)
|
||||||
self.assertEqual(CredentialType.USERNAME_AND_PASSWORD,
|
self.assertEqual(username, credential.credential_value.username)
|
||||||
credential.credential_type.value,
|
self.assertEqual(password, credential.credential_value.password)
|
||||||
message)
|
|
||||||
|
|
||||||
message = utils.build_er_error(
|
|
||||||
credential.__class__, 'type', username,
|
|
||||||
credential.credential_value.username.value, 'value')
|
|
||||||
self.assertEqual(username, credential.credential_value.username.value,
|
|
||||||
message)
|
|
||||||
|
|
||||||
message = utils.build_er_error(
|
|
||||||
credential.__class__, 'type', password,
|
|
||||||
credential.credential_value.password.value, 'value')
|
|
||||||
self.assertEqual(password, credential.credential_value.password.value,
|
|
||||||
message)
|
|
||||||
|
|
||||||
def test_build_credential_no_username(self):
|
def test_build_credential_no_username(self):
|
||||||
username = None
|
username = None
|
||||||
|
|
Loading…
Reference in New Issue