mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #394 from OpenKMIP/feat/update-credential
Update the Credential objects
This commit is contained in:
commit
f72995490f
|
@ -13,43 +13,41 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kmip.core.enums import CredentialType
|
||||
|
||||
from kmip.core.objects import Credential
|
||||
from kmip.core import enums
|
||||
from kmip.core import objects
|
||||
|
||||
|
||||
class CredentialFactory(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def _create_credential(self, credential_type, credential_value):
|
||||
credential_type = Credential.CredentialType(credential_type)
|
||||
return Credential(credential_type=credential_type,
|
||||
credential_value=credential_value)
|
||||
|
||||
def create_credential(self, cred_type, value):
|
||||
def create_credential(self, credential_type, credential_value):
|
||||
# Switch on the type of the credential
|
||||
if cred_type is CredentialType.USERNAME_AND_PASSWORD:
|
||||
value = self._create_username_password_credential(value)
|
||||
elif cred_type is CredentialType.DEVICE:
|
||||
value = self._create_device_credential(value)
|
||||
if credential_type is enums.CredentialType.USERNAME_AND_PASSWORD:
|
||||
credential_value = self.create_username_password_credential(
|
||||
credential_value
|
||||
)
|
||||
elif credential_type is enums.CredentialType.DEVICE:
|
||||
credential_value = self.create_device_credential(credential_value)
|
||||
else:
|
||||
msg = 'Unrecognized credential type: {0}'
|
||||
raise ValueError(msg.format(cred_type))
|
||||
raise ValueError(msg.format(credential_type))
|
||||
|
||||
return self._create_credential(cred_type, value)
|
||||
return objects.Credential(
|
||||
credential_type=credential_type,
|
||||
credential_value=credential_value
|
||||
)
|
||||
|
||||
def _create_username_password_credential(self, value):
|
||||
@staticmethod
|
||||
def create_username_password_credential(value):
|
||||
username = value.get('Username')
|
||||
password = value.get('Password')
|
||||
|
||||
username = Credential.UsernamePasswordCredential.Username(username)
|
||||
password = Credential.UsernamePasswordCredential.Password(password)
|
||||
return objects.UsernamePasswordCredential(
|
||||
username=username,
|
||||
password=password
|
||||
)
|
||||
|
||||
return Credential.UsernamePasswordCredential(username=username,
|
||||
password=password)
|
||||
|
||||
def _create_device_credential(self, value):
|
||||
@staticmethod
|
||||
def create_device_credential(value):
|
||||
dsn = value.get('Device Serial Number')
|
||||
password = value.get('Password')
|
||||
dev_id = value.get('Device Identifier')
|
||||
|
@ -57,16 +55,11 @@ class CredentialFactory(object):
|
|||
mach_id = value.get('Machine Identifier')
|
||||
med_id = value.get('Media Identifier')
|
||||
|
||||
dsn = Credential.DeviceCredential.DeviceSerialNumber(dsn)
|
||||
password = Credential.DeviceCredential.Password(password)
|
||||
dev_id = Credential.DeviceCredential.DeviceIdentifier(dev_id)
|
||||
net_id = Credential.DeviceCredential.NetworkIdentifier(net_id)
|
||||
mach_id = Credential.DeviceCredential.MachineIdentifier(mach_id)
|
||||
med_id = Credential.DeviceCredential.MediaIdentifier(med_id)
|
||||
|
||||
return Credential.DeviceCredential(device_serial_number=dsn,
|
||||
password=password,
|
||||
device_identifier=dev_id,
|
||||
network_identifier=net_id,
|
||||
machine_identifier=mach_id,
|
||||
media_identifier=med_id)
|
||||
return objects.DeviceCredential(
|
||||
device_serial_number=dsn,
|
||||
password=password,
|
||||
device_identifier=dev_id,
|
||||
network_identifier=net_id,
|
||||
machine_identifier=mach_id,
|
||||
media_identifier=med_id
|
||||
)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import six
|
||||
from six.moves import xrange
|
||||
|
||||
|
@ -25,7 +26,6 @@ from kmip.core import enums
|
|||
from kmip.core.enums import AttributeType
|
||||
from kmip.core.enums import Tags
|
||||
from kmip.core.enums import Types
|
||||
from kmip.core.enums import CredentialType
|
||||
from kmip.core.enums import RevocationReasonCode as RevocationReasonCodeEnum
|
||||
from kmip.core import exceptions
|
||||
|
||||
|
@ -172,224 +172,646 @@ class Attribute(Struct):
|
|||
return NotImplemented
|
||||
|
||||
|
||||
# 2.1.2
|
||||
class Credential(Struct):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CredentialValue(primitives.Struct):
|
||||
"""
|
||||
An empty, abstract base class to be used by Credential objects to easily
|
||||
group and type-check credential values.
|
||||
"""
|
||||
|
||||
class CredentialType(Enumeration):
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.CredentialType, self).__init__(
|
||||
CredentialType, value, Tags.CREDENTIAL_TYPE)
|
||||
class UsernamePasswordCredential(CredentialValue):
|
||||
"""
|
||||
A struct representing a UsernamePasswordCredential object.
|
||||
|
||||
class UsernamePasswordCredential(Struct):
|
||||
Attributes:
|
||||
username: The username identifying the credential.
|
||||
password: The password associated with the username.
|
||||
"""
|
||||
|
||||
class Username(TextString):
|
||||
def __init__(self, value=None):
|
||||
super(Credential.UsernamePasswordCredential.Username,
|
||||
self).__init__(
|
||||
value, Tags.USERNAME)
|
||||
def __init__(self, username=None, password=None):
|
||||
"""
|
||||
Construct a UsernamePasswordCredential struct.
|
||||
|
||||
class Password(TextString):
|
||||
def __init__(self, value=None):
|
||||
super(Credential.UsernamePasswordCredential.Password,
|
||||
self).__init__(
|
||||
value, Tags.PASSWORD)
|
||||
Args:
|
||||
username (string): The username identifying the credential.
|
||||
Optional, defaults to None. Required for encoding and decoding.
|
||||
password (string): The password associated with the username.
|
||||
Optional, defaults to None.
|
||||
"""
|
||||
super(UsernamePasswordCredential, self).__init__(
|
||||
tag=Tags.CREDENTIAL_VALUE
|
||||
)
|
||||
|
||||
def __init__(self, username=None, password=None):
|
||||
super(Credential.UsernamePasswordCredential, self).__init__(
|
||||
tag=Tags.CREDENTIAL_VALUE)
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.validate()
|
||||
self._username = None
|
||||
self._password = None
|
||||
|
||||
def read(self, istream):
|
||||
super(Credential.UsernamePasswordCredential, self).read(istream)
|
||||
tstream = BytearrayStream(istream.read(self.length))
|
||||
self.username = username
|
||||
self.password = password
|
||||
|
||||
# Read the username of the credential
|
||||
self.username = self.Username()
|
||||
self.username.read(tstream)
|
||||
@property
|
||||
def username(self):
|
||||
if self._username:
|
||||
return self._username.value
|
||||
else:
|
||||
return None
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.PASSWORD, tstream):
|
||||
self.password = self.Password()
|
||||
self.password.read(tstream)
|
||||
@username.setter
|
||||
def username(self, value):
|
||||
if value is None:
|
||||
self._username = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._username = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.USERNAME
|
||||
)
|
||||
else:
|
||||
raise TypeError("Username must be a string.")
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
@property
|
||||
def password(self):
|
||||
if self._password:
|
||||
return self._password.value
|
||||
else:
|
||||
return None
|
||||
|
||||
def write(self, ostream):
|
||||
tstream = BytearrayStream()
|
||||
@password.setter
|
||||
def password(self, value):
|
||||
if value is None:
|
||||
self._password = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._password = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.PASSWORD
|
||||
)
|
||||
else:
|
||||
raise TypeError("Password must be a string.")
|
||||
|
||||
self.username.write(tstream)
|
||||
if self.password is not None:
|
||||
self.password.write(tstream)
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the UsernamePasswordCredential struct and
|
||||
decode it into its constituent parts.
|
||||
|
||||
# Write the length and value of the credential
|
||||
self.length = tstream.length()
|
||||
super(Credential.UsernamePasswordCredential, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
Raises:
|
||||
ValueError: Raised if the username is missing from the encoding.
|
||||
"""
|
||||
super(UsernamePasswordCredential, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
class DeviceCredential(Struct):
|
||||
if self.is_tag_next(enums.Tags.USERNAME, local_stream):
|
||||
self._username = primitives.TextString(
|
||||
tag=enums.Tags.USERNAME
|
||||
)
|
||||
self._username.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Username/password credential encoding missing the username."
|
||||
)
|
||||
|
||||
class DeviceSerialNumber(TextString):
|
||||
if self.is_tag_next(enums.Tags.PASSWORD, local_stream):
|
||||
self._password = primitives.TextString(
|
||||
tag=enums.Tags.PASSWORD
|
||||
)
|
||||
self._password.read(local_stream)
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.DeviceCredential.DeviceSerialNumber, self).\
|
||||
__init__(value, Tags.DEVICE_SERIAL_NUMBER)
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
class Password(TextString):
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the UsernamePasswordCredential struct to a
|
||||
stream.
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.DeviceCredential.Password, self).\
|
||||
__init__(value, Tags.PASSWORD)
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
class DeviceIdentifier(TextString):
|
||||
Raises:
|
||||
ValueError: Raised if the username is not defined.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.DeviceCredential.DeviceIdentifier, self).\
|
||||
__init__(value, Tags.DEVICE_IDENTIFIER)
|
||||
if self._username:
|
||||
self._username.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Username/password credential struct missing the username."
|
||||
)
|
||||
|
||||
class NetworkIdentifier(TextString):
|
||||
if self._password:
|
||||
self._password.write(local_stream)
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.DeviceCredential.NetworkIdentifier, self).\
|
||||
__init__(value, Tags.NETWORK_IDENTIFIER)
|
||||
self.length = local_stream.length()
|
||||
super(UsernamePasswordCredential, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
class MachineIdentifier(TextString):
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, UsernamePasswordCredential):
|
||||
if self.username != other.username:
|
||||
return False
|
||||
elif self.password != other.password:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.DeviceCredential.MachineIdentifier, self).\
|
||||
__init__(value, Tags.MACHINE_IDENTIFIER)
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, UsernamePasswordCredential):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
class MediaIdentifier(TextString):
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"username='{}'".format(self.username),
|
||||
"password='{}'".format(self.password)
|
||||
])
|
||||
return "UsernamePasswordCredential({})".format(args)
|
||||
|
||||
def __init__(self, value=None):
|
||||
super(Credential.DeviceCredential.MediaIdentifier, self).\
|
||||
__init__(value, Tags.MEDIA_IDENTIFIER)
|
||||
def __str__(self):
|
||||
return str({
|
||||
"username": self.username,
|
||||
"password": self.password
|
||||
})
|
||||
|
||||
def __init__(self,
|
||||
device_serial_number=None,
|
||||
password=None,
|
||||
device_identifier=None,
|
||||
network_identifier=None,
|
||||
machine_identifier=None,
|
||||
media_identifier=None):
|
||||
super(Credential.DeviceCredential, self).__init__(
|
||||
tag=Tags.CREDENTIAL_VALUE)
|
||||
self.device_serial_number = device_serial_number
|
||||
self.password = password
|
||||
self.device_identifier = device_identifier
|
||||
self.network_identifier = network_identifier
|
||||
self.machine_identifier = machine_identifier
|
||||
self.media_identifier = media_identifier
|
||||
|
||||
def read(self, istream):
|
||||
super(Credential.DeviceCredential, self).read(istream)
|
||||
tstream = BytearrayStream(istream.read(self.length))
|
||||
class DeviceCredential(CredentialValue):
|
||||
"""
|
||||
A struct representing a DeviceCredential object.
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.DEVICE_SERIAL_NUMBER, tstream):
|
||||
self.device_serial_number = self.DeviceSerialNumber()
|
||||
self.device_serial_number.read(tstream)
|
||||
Attributes:
|
||||
device_serial_number: The device serial number for the credential.
|
||||
password: The password associated with the credential.
|
||||
device_identifier: The device identifier for the credential.
|
||||
network_identifier: The network identifier for the credential.
|
||||
machine_identifier: The machine identifier for the credential.
|
||||
media_identifier: The media identifier for the credential.
|
||||
"""
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.PASSWORD, tstream):
|
||||
self.password = self.Password()
|
||||
self.password.read(tstream)
|
||||
def __init__(self,
|
||||
device_serial_number=None,
|
||||
password=None,
|
||||
device_identifier=None,
|
||||
network_identifier=None,
|
||||
machine_identifier=None,
|
||||
media_identifier=None):
|
||||
"""
|
||||
Construct a DeviceCredential struct.
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.DEVICE_IDENTIFIER, tstream):
|
||||
self.device_identifier = self.DeviceIdentifier()
|
||||
self.device_identifier.read(tstream)
|
||||
Args:
|
||||
device_serial_number (string): The device serial number for the
|
||||
credential. Optional, defaults to None.
|
||||
password (string): The password associated with the credential.
|
||||
Optional, defaults to None.
|
||||
device_identifier (string): The device identifier for the
|
||||
credential. Optional, defaults to None.
|
||||
network_identifier (string): The network identifier for the
|
||||
credential. Optional, defaults to None.
|
||||
machine_identifier (string): The machine identifier for the
|
||||
credential. Optional, defaults to None.
|
||||
media_identifier (string): The media identifier for the
|
||||
credential. Optional, defaults to None.
|
||||
"""
|
||||
super(DeviceCredential, self).__init__(tag=Tags.CREDENTIAL_VALUE)
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.NETWORK_IDENTIFIER, tstream):
|
||||
self.network_identifier = self.NetworkIdentifier()
|
||||
self.network_identifier.read(tstream)
|
||||
self._device_serial_number = None
|
||||
self._password = None
|
||||
self._device_identifier = None
|
||||
self._network_identifier = None
|
||||
self._machine_identifier = None
|
||||
self._media_identifier = None
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.MACHINE_IDENTIFIER, tstream):
|
||||
self.machine_identifier = self.MachineIdentifier()
|
||||
self.machine_identifier.read(tstream)
|
||||
self.device_serial_number = device_serial_number
|
||||
self.password = password
|
||||
self.device_identifier = device_identifier
|
||||
self.network_identifier = network_identifier
|
||||
self.machine_identifier = machine_identifier
|
||||
self.media_identifier = media_identifier
|
||||
|
||||
# Read the password if it is next
|
||||
if self.is_tag_next(Tags.MEDIA_IDENTIFIER, tstream):
|
||||
self.media_identifier = self.MediaIdentifier()
|
||||
self.media_identifier.read(tstream)
|
||||
@property
|
||||
def device_serial_number(self):
|
||||
if self._device_serial_number:
|
||||
return self._device_serial_number.value
|
||||
else:
|
||||
return None
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
@device_serial_number.setter
|
||||
def device_serial_number(self, value):
|
||||
if value is None:
|
||||
self._device_serial_number = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._device_serial_number = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.DEVICE_SERIAL_NUMBER
|
||||
)
|
||||
else:
|
||||
raise TypeError("Device serial number must be a string.")
|
||||
|
||||
def write(self, ostream):
|
||||
tstream = BytearrayStream()
|
||||
@property
|
||||
def password(self):
|
||||
if self._password:
|
||||
return self._password.value
|
||||
else:
|
||||
return None
|
||||
|
||||
if self.device_serial_number is not None:
|
||||
self.device_serial_number.write(tstream)
|
||||
if self.password is not None:
|
||||
self.password.write(tstream)
|
||||
if self.device_identifier is not None:
|
||||
self.device_identifier.write(tstream)
|
||||
if self.network_identifier is not None:
|
||||
self.network_identifier.write(tstream)
|
||||
if self.machine_identifier is not None:
|
||||
self.machine_identifier.write(tstream)
|
||||
if self.media_identifier is not None:
|
||||
self.media_identifier.write(tstream)
|
||||
@password.setter
|
||||
def password(self, value):
|
||||
if value is None:
|
||||
self._password = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._password = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.PASSWORD
|
||||
)
|
||||
else:
|
||||
raise TypeError("Password must be a string.")
|
||||
|
||||
# Write the length and value of the credential
|
||||
self.length = tstream.length()
|
||||
super(Credential.DeviceCredential, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
@property
|
||||
def device_identifier(self):
|
||||
if self._device_identifier:
|
||||
return self._device_identifier.value
|
||||
else:
|
||||
return None
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
@device_identifier.setter
|
||||
def device_identifier(self, value):
|
||||
if value is None:
|
||||
self._device_identifier = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._device_identifier = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.DEVICE_IDENTIFIER
|
||||
)
|
||||
else:
|
||||
raise TypeError("Device identifier must be a string.")
|
||||
|
||||
@property
|
||||
def network_identifier(self):
|
||||
if self._network_identifier:
|
||||
return self._network_identifier.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@network_identifier.setter
|
||||
def network_identifier(self, value):
|
||||
if value is None:
|
||||
self._network_identifier = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._network_identifier = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.NETWORK_IDENTIFIER
|
||||
)
|
||||
else:
|
||||
raise TypeError("Network identifier must be a string.")
|
||||
|
||||
@property
|
||||
def machine_identifier(self):
|
||||
if self._machine_identifier:
|
||||
return self._machine_identifier.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@machine_identifier.setter
|
||||
def machine_identifier(self, value):
|
||||
if value is None:
|
||||
self._machine_identifier = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._machine_identifier = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.MACHINE_IDENTIFIER
|
||||
)
|
||||
else:
|
||||
raise TypeError("Machine identifier must be a string.")
|
||||
|
||||
@property
|
||||
def media_identifier(self):
|
||||
if self._media_identifier:
|
||||
return self._media_identifier.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@media_identifier.setter
|
||||
def media_identifier(self, value):
|
||||
if value is None:
|
||||
self._media_identifier = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._media_identifier = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.MEDIA_IDENTIFIER
|
||||
)
|
||||
else:
|
||||
raise TypeError("Media identifier must be a string.")
|
||||
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the DeviceCredential struct and decode it into
|
||||
its constituent parts.
|
||||
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object..
|
||||
"""
|
||||
super(DeviceCredential, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.DEVICE_SERIAL_NUMBER, local_stream):
|
||||
self._device_serial_number = primitives.TextString(
|
||||
tag=enums.Tags.DEVICE_SERIAL_NUMBER
|
||||
)
|
||||
self._device_serial_number.read(local_stream)
|
||||
|
||||
if self.is_tag_next(enums.Tags.PASSWORD, local_stream):
|
||||
self._password = primitives.TextString(
|
||||
tag=enums.Tags.PASSWORD
|
||||
)
|
||||
self._password.read(local_stream)
|
||||
|
||||
if self.is_tag_next(enums.Tags.DEVICE_IDENTIFIER, local_stream):
|
||||
self._device_identifier = primitives.TextString(
|
||||
tag=enums.Tags.DEVICE_IDENTIFIER
|
||||
)
|
||||
self._device_identifier.read(local_stream)
|
||||
|
||||
if self.is_tag_next(enums.Tags.NETWORK_IDENTIFIER, local_stream):
|
||||
self._network_identifier = primitives.TextString(
|
||||
tag=enums.Tags.NETWORK_IDENTIFIER
|
||||
)
|
||||
self._network_identifier.read(local_stream)
|
||||
|
||||
if self.is_tag_next(enums.Tags.MACHINE_IDENTIFIER, local_stream):
|
||||
self._machine_identifier = primitives.TextString(
|
||||
tag=enums.Tags.MACHINE_IDENTIFIER
|
||||
)
|
||||
self._machine_identifier.read(local_stream)
|
||||
|
||||
if self.is_tag_next(enums.Tags.MEDIA_IDENTIFIER, local_stream):
|
||||
self._media_identifier = primitives.TextString(
|
||||
tag=enums.Tags.MEDIA_IDENTIFIER
|
||||
)
|
||||
self._media_identifier.read(local_stream)
|
||||
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the DeviceCredential struct to a stream.
|
||||
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
if self._device_serial_number is not None:
|
||||
self._device_serial_number.write(local_stream)
|
||||
if self._password is not None:
|
||||
self._password.write(local_stream)
|
||||
if self._device_identifier is not None:
|
||||
self._device_identifier.write(local_stream)
|
||||
if self._network_identifier is not None:
|
||||
self._network_identifier.write(local_stream)
|
||||
if self._machine_identifier is not None:
|
||||
self._machine_identifier.write(local_stream)
|
||||
if self._media_identifier is not None:
|
||||
self._media_identifier.write(local_stream)
|
||||
|
||||
self.length = local_stream.length()
|
||||
super(DeviceCredential, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, DeviceCredential):
|
||||
if self.device_serial_number != other.device_serial_number:
|
||||
return False
|
||||
elif self.password != other.password:
|
||||
return False
|
||||
elif self.device_identifier != other.device_identifier:
|
||||
return False
|
||||
elif self.network_identifier != other.network_identifier:
|
||||
return False
|
||||
elif self.machine_identifier != other.machine_identifier:
|
||||
return False
|
||||
elif self.media_identifier != other.media_identifier:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, DeviceCredential):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"device_serial_number='{}'".format(self.device_serial_number),
|
||||
"password='{}'".format(self.password),
|
||||
"device_identifier='{}'".format(self.device_identifier),
|
||||
"network_identifier='{}'".format(self.network_identifier),
|
||||
"machine_identifier='{}'".format(self.machine_identifier),
|
||||
"media_identifier='{}'".format(self.media_identifier),
|
||||
])
|
||||
return "DeviceCredential({})".format(args)
|
||||
|
||||
def __str__(self):
|
||||
return str({
|
||||
"device_serial_number": self.device_serial_number,
|
||||
"password": self.password,
|
||||
"device_identifier": self.device_identifier,
|
||||
"network_identifier": self.network_identifier,
|
||||
"machine_identifier": self.machine_identifier,
|
||||
"media_identifier": self.media_identifier
|
||||
})
|
||||
|
||||
|
||||
class Credential(primitives.Struct):
|
||||
"""
|
||||
A struct representing a Credential object.
|
||||
|
||||
Attributes:
|
||||
credential_type: The credential type, a CredentialType enumeration.
|
||||
credential_value: The credential value, a CredentialValue instance.
|
||||
"""
|
||||
|
||||
def __init__(self, credential_type=None, credential_value=None):
|
||||
"""
|
||||
Construct a Credential struct.
|
||||
|
||||
Args:
|
||||
credential_type (CredentialType): An enumeration value that
|
||||
specifies the type of the credential struct. Optional,
|
||||
defaults to None. Required for encoding and decoding.
|
||||
credential_value (CredentialValue): The credential value
|
||||
corresponding to the credential type. Optional, defaults to
|
||||
None. Required for encoding and decoding.
|
||||
"""
|
||||
super(Credential, self).__init__(tag=Tags.CREDENTIAL)
|
||||
|
||||
self._credential_type = None
|
||||
self._credential_value = None
|
||||
|
||||
self.credential_type = credential_type
|
||||
self.credential_value = credential_value
|
||||
|
||||
def read(self, istream):
|
||||
super(Credential, self).read(istream)
|
||||
tstream = BytearrayStream(istream.read(self.length))
|
||||
|
||||
# Read the type of the credential
|
||||
self.credential_type = self.CredentialType()
|
||||
self.credential_type.read(tstream)
|
||||
|
||||
# Use the type to determine what credential value to read
|
||||
if self.credential_type.value is CredentialType.USERNAME_AND_PASSWORD:
|
||||
self.credential_value = self.UsernamePasswordCredential()
|
||||
elif self.credential_type.value is CredentialType.DEVICE:
|
||||
self.credential_value = self.DeviceCredential()
|
||||
@property
|
||||
def credential_type(self):
|
||||
if self._credential_type:
|
||||
return self._credential_type.value
|
||||
else:
|
||||
# TODO (peter-hamilton) Use more descriptive error here
|
||||
raise NotImplementedError()
|
||||
self.credential_value.read(tstream)
|
||||
return None
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
@credential_type.setter
|
||||
def credential_type(self, value):
|
||||
if value is None:
|
||||
self._credential_type = None
|
||||
elif isinstance(value, enums.CredentialType):
|
||||
self._credential_type = Enumeration(
|
||||
enums.CredentialType,
|
||||
value=value,
|
||||
tag=Tags.CREDENTIAL_TYPE
|
||||
)
|
||||
else:
|
||||
raise TypeError(
|
||||
"Credential type must be a CredentialType enumeration."
|
||||
)
|
||||
|
||||
def write(self, ostream):
|
||||
tstream = BytearrayStream()
|
||||
@property
|
||||
def credential_value(self):
|
||||
return self._credential_value
|
||||
|
||||
self.credential_type.write(tstream)
|
||||
self.credential_value.write(tstream)
|
||||
@credential_value.setter
|
||||
def credential_value(self, value):
|
||||
if value is None:
|
||||
self._credential_value = None
|
||||
elif isinstance(value, CredentialValue):
|
||||
self._credential_value = value
|
||||
else:
|
||||
raise TypeError(
|
||||
"Credential value must be a CredentialValue struct."
|
||||
)
|
||||
|
||||
# Write the length and value of the credential
|
||||
self.length = tstream.length()
|
||||
super(Credential, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the Credential struct and decode it into its
|
||||
constituent parts.
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if either the credential type or value are
|
||||
missing from the encoding.
|
||||
"""
|
||||
super(Credential, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.CREDENTIAL_TYPE, local_stream):
|
||||
self._credential_type = primitives.Enumeration(
|
||||
enum=enums.CredentialType,
|
||||
tag=enums.Tags.CREDENTIAL_TYPE
|
||||
)
|
||||
self._credential_type.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Credential encoding missing the credential type."
|
||||
)
|
||||
|
||||
if self.is_tag_next(enums.Tags.CREDENTIAL_VALUE, local_stream):
|
||||
if self.credential_type == \
|
||||
enums.CredentialType.USERNAME_AND_PASSWORD:
|
||||
self._credential_value = UsernamePasswordCredential()
|
||||
elif self.credential_type == enums.CredentialType.DEVICE:
|
||||
self._credential_value = DeviceCredential()
|
||||
else:
|
||||
# TODO (peter-hamilton) Add case for Attestation.
|
||||
raise ValueError(
|
||||
"Credential encoding includes unrecognized credential "
|
||||
"type."
|
||||
)
|
||||
self._credential_value.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Credential encoding missing the credential value."
|
||||
)
|
||||
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the Credential struct to a stream.
|
||||
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if either the credential type or value are not
|
||||
defined.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
if self._credential_type:
|
||||
self._credential_type.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Credential struct missing the credential type."
|
||||
)
|
||||
|
||||
if self._credential_value:
|
||||
self._credential_value.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Credential struct missing the credential value."
|
||||
)
|
||||
|
||||
self.length = local_stream.length()
|
||||
super(Credential, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Credential):
|
||||
if self.credential_type != other.credential_type:
|
||||
return False
|
||||
elif self.credential_value != other.credential_value:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, Credential):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"credential_type={}".format(self.credential_type),
|
||||
"credential_value={}".format(repr(self.credential_value))
|
||||
])
|
||||
return "Credential({})".format(args)
|
||||
|
||||
def __str__(self):
|
||||
return str({
|
||||
"credential_type": self.credential_type,
|
||||
"credential_value": str(self.credential_value)
|
||||
})
|
||||
|
||||
|
||||
# 2.1.3
|
||||
class KeyBlock(Struct):
|
||||
|
||||
class KeyCompressionType(Enumeration):
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -64,8 +64,6 @@ from kmip.services.results import OperationResult
|
|||
from kmip.services.results import QueryResult
|
||||
from kmip.services.results import RekeyKeyPairResult
|
||||
|
||||
import kmip.core.utils as utils
|
||||
|
||||
import mock
|
||||
import os
|
||||
import socket
|
||||
|
@ -144,31 +142,17 @@ class TestKMIPClient(TestCase):
|
|||
def test_build_credential(self):
|
||||
username = 'username'
|
||||
password = 'password'
|
||||
cred_type = CredentialType.USERNAME_AND_PASSWORD
|
||||
self.client.username = username
|
||||
self.client.password = password
|
||||
|
||||
credential = self.client._build_credential()
|
||||
|
||||
message = utils.build_er_error(credential.__class__, 'type',
|
||||
cred_type,
|
||||
credential.credential_type.value,
|
||||
'value')
|
||||
self.assertEqual(CredentialType.USERNAME_AND_PASSWORD,
|
||||
credential.credential_type.value,
|
||||
message)
|
||||
|
||||
message = utils.build_er_error(
|
||||
credential.__class__, 'type', username,
|
||||
credential.credential_value.username.value, 'value')
|
||||
self.assertEqual(username, credential.credential_value.username.value,
|
||||
message)
|
||||
|
||||
message = utils.build_er_error(
|
||||
credential.__class__, 'type', password,
|
||||
credential.credential_value.password.value, 'value')
|
||||
self.assertEqual(password, credential.credential_value.password.value,
|
||||
message)
|
||||
self.assertEqual(
|
||||
CredentialType.USERNAME_AND_PASSWORD,
|
||||
credential.credential_type
|
||||
)
|
||||
self.assertEqual(username, credential.credential_value.username)
|
||||
self.assertEqual(password, credential.credential_value.password)
|
||||
|
||||
def test_build_credential_no_username(self):
|
||||
username = None
|
||||
|
|
Loading…
Reference in New Issue