mirror of https://github.com/OpenKMIP/PyKMIP.git
Add AttestationCredential support
This change adds an implementation of the AttestationCredential introduced in KMIP 1.2. The underlying Nonce struct is included. Unit test suites for both objects are provided. Finally, the Credential struct has also been updated to support the new credential type.
This commit is contained in:
parent
c9c72e172e
commit
5c5012f524
|
@ -172,6 +172,172 @@ class Attribute(Struct):
|
|||
return NotImplemented
|
||||
|
||||
|
||||
class Nonce(primitives.Struct):
|
||||
"""
|
||||
A struct representing a Nonce object.
|
||||
|
||||
Attributes:
|
||||
nonce_id (bytes): A binary string representing the ID of the nonce
|
||||
value.
|
||||
nonce_value (bytes): A binary string representing a random value.
|
||||
"""
|
||||
|
||||
def __init__(self, nonce_id=None, nonce_value=None):
|
||||
"""
|
||||
Construct a Nonce struct.
|
||||
|
||||
Args:
|
||||
nonce_id (bytes): A binary string representing the ID of the nonce
|
||||
value. Optional, defaults to None. Required for encoding and
|
||||
decoding.
|
||||
nonce_value (bytes): A binary string representing a random value.
|
||||
Optional, defaults to None. Required for encoding and decoding.
|
||||
"""
|
||||
super(Nonce, self).__init__(tag=enums.Tags.NONCE)
|
||||
|
||||
self._nonce_id = None
|
||||
self._nonce_value = None
|
||||
|
||||
self.nonce_id = nonce_id
|
||||
self.nonce_value = nonce_value
|
||||
|
||||
@property
|
||||
def nonce_id(self):
|
||||
if self._nonce_id:
|
||||
return self._nonce_id.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@nonce_id.setter
|
||||
def nonce_id(self, value):
|
||||
if value is None:
|
||||
self._nonce_id = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._nonce_id = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.NONCE_ID
|
||||
)
|
||||
else:
|
||||
raise TypeError("Nonce ID must be bytes.")
|
||||
|
||||
@property
|
||||
def nonce_value(self):
|
||||
if self._nonce_value:
|
||||
return self._nonce_value.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@nonce_value.setter
|
||||
def nonce_value(self, value):
|
||||
if value is None:
|
||||
self._nonce_value = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._nonce_value = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.NONCE_VALUE
|
||||
)
|
||||
else:
|
||||
raise TypeError("Nonce value must be bytes.")
|
||||
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the Nonce struct and decode it into its
|
||||
constituent parts.
|
||||
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if the nonce ID or nonce value is missing from
|
||||
the encoding.
|
||||
"""
|
||||
super(Nonce, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.NONCE_ID, local_stream):
|
||||
self._nonce_id = primitives.ByteString(
|
||||
tag=enums.Tags.NONCE_ID
|
||||
)
|
||||
self._nonce_id.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Nonce encoding missing the nonce ID."
|
||||
)
|
||||
|
||||
if self.is_tag_next(enums.Tags.NONCE_VALUE, local_stream):
|
||||
self._nonce_value = primitives.ByteString(
|
||||
tag=enums.Tags.NONCE_VALUE
|
||||
)
|
||||
self._nonce_value.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Nonce encoding missing the nonce value."
|
||||
)
|
||||
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the Nonce struct to a stream.
|
||||
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if the nonce ID or nonce value is not defined.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
if self._nonce_id:
|
||||
self._nonce_id.write(local_stream)
|
||||
else:
|
||||
raise ValueError("Nonce struct is missing the nonce ID.")
|
||||
|
||||
if self._nonce_value:
|
||||
self._nonce_value.write(local_stream)
|
||||
else:
|
||||
raise ValueError("Nonce struct is missing the nonce value.")
|
||||
|
||||
self.length = local_stream.length()
|
||||
super(Nonce, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Nonce):
|
||||
if self.nonce_id != other.nonce_id:
|
||||
return False
|
||||
elif self.nonce_value != other.nonce_value:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, Nonce):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"nonce_id={}".format(self.nonce_id),
|
||||
"nonce_value={}".format(self.nonce_value)
|
||||
])
|
||||
return "Nonce({})".format(args)
|
||||
|
||||
def __str__(self):
|
||||
body = ", ".join([
|
||||
"'nonce_id': {}".format(self.nonce_id),
|
||||
"'nonce_value': {}".format(self.nonce_value)
|
||||
])
|
||||
return "{" + body + "}"
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CredentialValue(primitives.Struct):
|
||||
"""
|
||||
|
@ -634,6 +800,274 @@ class DeviceCredential(CredentialValue):
|
|||
})
|
||||
|
||||
|
||||
class AttestationCredential(CredentialValue):
|
||||
"""
|
||||
A struct representing an AttestationCredential object.
|
||||
|
||||
Attributes:
|
||||
nonce: A nonce value obtained from the key management server.
|
||||
attestation_type: The type of attestation being used.
|
||||
attestation_measurement: The attestation measurement of the client.
|
||||
attestation_assertion: The attestation assertion from a third party.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
nonce=None,
|
||||
attestation_type=None,
|
||||
attestation_measurement=None,
|
||||
attestation_assertion=None):
|
||||
"""
|
||||
Construct an AttestationCredential struct.
|
||||
|
||||
Args:
|
||||
nonce (Nonce): A Nonce structure containing nonce data obtained
|
||||
from the key management server. Optional, defaults to None.
|
||||
Required for encoding and decoding.
|
||||
attestation_type (enum): An AttestationType enumeration specifying
|
||||
the type of attestation being used. Optional, defaults to None.
|
||||
Required for encoding and decoding.
|
||||
attestation_measurement (bytes): The device identifier for the
|
||||
credential. Optional, defaults to None. Required for encoding
|
||||
and decoding if the attestation assertion is not provided.
|
||||
attestation_assertion (bytes): The network identifier for the
|
||||
credential. Optional, defaults to None. Required for encoding
|
||||
and decoding if the attestation measurement is not provided.
|
||||
"""
|
||||
super(AttestationCredential, self).__init__(tag=Tags.CREDENTIAL_VALUE)
|
||||
|
||||
self._nonce = None
|
||||
self._attestation_type = None
|
||||
self._attestation_measurement = None
|
||||
self._attestation_assertion = None
|
||||
|
||||
self.nonce = nonce
|
||||
self.attestation_type = attestation_type
|
||||
self.attestation_measurement = attestation_measurement
|
||||
self.attestation_assertion = attestation_assertion
|
||||
|
||||
@property
|
||||
def nonce(self):
|
||||
return self._nonce
|
||||
|
||||
@nonce.setter
|
||||
def nonce(self, value):
|
||||
if value is None:
|
||||
self._nonce = None
|
||||
elif isinstance(value, Nonce):
|
||||
self._nonce = value
|
||||
else:
|
||||
raise TypeError("Nonce must be a Nonce struct.")
|
||||
|
||||
@property
|
||||
def attestation_type(self):
|
||||
if self._attestation_type:
|
||||
return self._attestation_type.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@attestation_type.setter
|
||||
def attestation_type(self, value):
|
||||
if value is None:
|
||||
self._attestation_type = None
|
||||
elif isinstance(value, enums.AttestationType):
|
||||
self._attestation_type = Enumeration(
|
||||
enums.AttestationType,
|
||||
value=value,
|
||||
tag=Tags.ATTESTATION_TYPE
|
||||
)
|
||||
else:
|
||||
raise TypeError(
|
||||
"Attestation type must be an AttestationType enumeration."
|
||||
)
|
||||
|
||||
@property
|
||||
def attestation_measurement(self):
|
||||
if self._attestation_measurement:
|
||||
return self._attestation_measurement.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@attestation_measurement.setter
|
||||
def attestation_measurement(self, value):
|
||||
if value is None:
|
||||
self._attestation_measurement = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._attestation_measurement = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.ATTESTATION_MEASUREMENT
|
||||
)
|
||||
else:
|
||||
raise TypeError("Attestation measurement must be bytes.")
|
||||
|
||||
@property
|
||||
def attestation_assertion(self):
|
||||
if self._attestation_assertion:
|
||||
return self._attestation_assertion.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@attestation_assertion.setter
|
||||
def attestation_assertion(self, value):
|
||||
if value is None:
|
||||
self._attestation_assertion = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._attestation_assertion = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.ATTESTATION_ASSERTION
|
||||
)
|
||||
else:
|
||||
raise TypeError("Attestation assertion must be bytes.")
|
||||
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the AttestationCredential struct and decode it
|
||||
into its constituent parts.
|
||||
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if either the nonce or attestation type are
|
||||
missing from the encoding. Also raised if neither the
|
||||
attestation measurement nor the attestation assertion are
|
||||
included in the encoding.
|
||||
|
||||
"""
|
||||
super(AttestationCredential, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.NONCE, local_stream):
|
||||
self._nonce = Nonce()
|
||||
self._nonce.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential encoding is missing the nonce."
|
||||
)
|
||||
|
||||
if self.is_tag_next(enums.Tags.ATTESTATION_TYPE, local_stream):
|
||||
self._attestation_type = primitives.Enumeration(
|
||||
enums.AttestationType,
|
||||
tag=enums.Tags.ATTESTATION_TYPE
|
||||
)
|
||||
self._attestation_type.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential encoding is missing the attestation "
|
||||
"type."
|
||||
)
|
||||
|
||||
self._attestation_measurement = None
|
||||
if self.is_tag_next(enums.Tags.ATTESTATION_MEASUREMENT, local_stream):
|
||||
self._attestation_measurement = primitives.ByteString(
|
||||
tag=enums.Tags.ATTESTATION_MEASUREMENT
|
||||
)
|
||||
self._attestation_measurement.read(local_stream)
|
||||
|
||||
self._attestation_assertion = None
|
||||
if self.is_tag_next(enums.Tags.ATTESTATION_ASSERTION, local_stream):
|
||||
self._attestation_assertion = primitives.ByteString(
|
||||
tag=enums.Tags.ATTESTATION_ASSERTION
|
||||
)
|
||||
self._attestation_assertion.read(local_stream)
|
||||
|
||||
if ((self._attestation_measurement is None) and
|
||||
(self._attestation_assertion is None)):
|
||||
raise ValueError(
|
||||
"Attestation credential encoding is missing either the "
|
||||
"attestation measurement or the attestation assertion."
|
||||
)
|
||||
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the AttestationCredential struct to a stream.
|
||||
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if either the nonce or attestation type are
|
||||
not defined. Also raised if neither the attestation measurement
|
||||
nor the attestation assertion are defined.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
if self._nonce:
|
||||
self._nonce.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential struct is missing the nonce."
|
||||
)
|
||||
|
||||
if self._attestation_type:
|
||||
self._attestation_type.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential struct is missing the attestation "
|
||||
"type."
|
||||
)
|
||||
|
||||
if self._attestation_measurement:
|
||||
self._attestation_measurement.write(local_stream)
|
||||
if self._attestation_assertion:
|
||||
self._attestation_assertion.write(local_stream)
|
||||
|
||||
if ((self._attestation_measurement is None) and
|
||||
(self._attestation_assertion is None)):
|
||||
raise ValueError(
|
||||
"Attestation credential struct is missing either the "
|
||||
"attestation measurement or the attestation assertion."
|
||||
)
|
||||
|
||||
self.length = local_stream.length()
|
||||
super(AttestationCredential, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, AttestationCredential):
|
||||
if self.nonce != other.nonce:
|
||||
return False
|
||||
elif self.attestation_type != other.attestation_type:
|
||||
return False
|
||||
elif self.attestation_measurement != other.attestation_measurement:
|
||||
return False
|
||||
elif self.attestation_assertion != other.attestation_assertion:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, AttestationCredential):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"nonce={}".format(repr(self.nonce)),
|
||||
"attestation_type={}".format(self.attestation_type),
|
||||
"attestation_measurement={}".format(self.attestation_measurement),
|
||||
"attestation_assertion={}".format(self.attestation_assertion)
|
||||
])
|
||||
return "AttestationCredential({})".format(args)
|
||||
|
||||
def __str__(self):
|
||||
return "{" \
|
||||
"'nonce': " + str(self.nonce) + ", " \
|
||||
"'attestation_type': " + str(self.attestation_type) + ", " \
|
||||
"'attestation_measurement': " + \
|
||||
str(self.attestation_measurement) + ", " \
|
||||
"'attestation_assertion': " + \
|
||||
str(self.attestation_assertion) + "}"
|
||||
|
||||
|
||||
class Credential(primitives.Struct):
|
||||
"""
|
||||
A struct representing a Credential object.
|
||||
|
@ -734,8 +1168,9 @@ class Credential(primitives.Struct):
|
|||
self._credential_value = UsernamePasswordCredential()
|
||||
elif self.credential_type == enums.CredentialType.DEVICE:
|
||||
self._credential_value = DeviceCredential()
|
||||
elif self.credential_type == enums.CredentialType.ATTESTATION:
|
||||
self._credential_value = AttestationCredential()
|
||||
else:
|
||||
# TODO (peter-hamilton) Add case for Attestation.
|
||||
raise ValueError(
|
||||
"Credential encoding includes unrecognized credential "
|
||||
"type."
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue