mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #397 from OpenKMIP/feat/add-attestation-credential
Add AttestationCredential support
This commit is contained in:
commit
3bc9a610c6
|
@ -172,6 +172,172 @@ class Attribute(Struct):
|
|||
return NotImplemented
|
||||
|
||||
|
||||
class Nonce(primitives.Struct):
|
||||
"""
|
||||
A struct representing a Nonce object.
|
||||
|
||||
Attributes:
|
||||
nonce_id (bytes): A binary string representing the ID of the nonce
|
||||
value.
|
||||
nonce_value (bytes): A binary string representing a random value.
|
||||
"""
|
||||
|
||||
def __init__(self, nonce_id=None, nonce_value=None):
|
||||
"""
|
||||
Construct a Nonce struct.
|
||||
|
||||
Args:
|
||||
nonce_id (bytes): A binary string representing the ID of the nonce
|
||||
value. Optional, defaults to None. Required for encoding and
|
||||
decoding.
|
||||
nonce_value (bytes): A binary string representing a random value.
|
||||
Optional, defaults to None. Required for encoding and decoding.
|
||||
"""
|
||||
super(Nonce, self).__init__(tag=enums.Tags.NONCE)
|
||||
|
||||
self._nonce_id = None
|
||||
self._nonce_value = None
|
||||
|
||||
self.nonce_id = nonce_id
|
||||
self.nonce_value = nonce_value
|
||||
|
||||
@property
|
||||
def nonce_id(self):
|
||||
if self._nonce_id:
|
||||
return self._nonce_id.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@nonce_id.setter
|
||||
def nonce_id(self, value):
|
||||
if value is None:
|
||||
self._nonce_id = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._nonce_id = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.NONCE_ID
|
||||
)
|
||||
else:
|
||||
raise TypeError("Nonce ID must be bytes.")
|
||||
|
||||
@property
|
||||
def nonce_value(self):
|
||||
if self._nonce_value:
|
||||
return self._nonce_value.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@nonce_value.setter
|
||||
def nonce_value(self, value):
|
||||
if value is None:
|
||||
self._nonce_value = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._nonce_value = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.NONCE_VALUE
|
||||
)
|
||||
else:
|
||||
raise TypeError("Nonce value must be bytes.")
|
||||
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the Nonce struct and decode it into its
|
||||
constituent parts.
|
||||
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if the nonce ID or nonce value is missing from
|
||||
the encoding.
|
||||
"""
|
||||
super(Nonce, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.NONCE_ID, local_stream):
|
||||
self._nonce_id = primitives.ByteString(
|
||||
tag=enums.Tags.NONCE_ID
|
||||
)
|
||||
self._nonce_id.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Nonce encoding missing the nonce ID."
|
||||
)
|
||||
|
||||
if self.is_tag_next(enums.Tags.NONCE_VALUE, local_stream):
|
||||
self._nonce_value = primitives.ByteString(
|
||||
tag=enums.Tags.NONCE_VALUE
|
||||
)
|
||||
self._nonce_value.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Nonce encoding missing the nonce value."
|
||||
)
|
||||
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the Nonce struct to a stream.
|
||||
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if the nonce ID or nonce value is not defined.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
if self._nonce_id:
|
||||
self._nonce_id.write(local_stream)
|
||||
else:
|
||||
raise ValueError("Nonce struct is missing the nonce ID.")
|
||||
|
||||
if self._nonce_value:
|
||||
self._nonce_value.write(local_stream)
|
||||
else:
|
||||
raise ValueError("Nonce struct is missing the nonce value.")
|
||||
|
||||
self.length = local_stream.length()
|
||||
super(Nonce, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Nonce):
|
||||
if self.nonce_id != other.nonce_id:
|
||||
return False
|
||||
elif self.nonce_value != other.nonce_value:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, Nonce):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"nonce_id={}".format(self.nonce_id),
|
||||
"nonce_value={}".format(self.nonce_value)
|
||||
])
|
||||
return "Nonce({})".format(args)
|
||||
|
||||
def __str__(self):
|
||||
body = ", ".join([
|
||||
"'nonce_id': {}".format(self.nonce_id),
|
||||
"'nonce_value': {}".format(self.nonce_value)
|
||||
])
|
||||
return "{" + body + "}"
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CredentialValue(primitives.Struct):
|
||||
"""
|
||||
|
@ -634,6 +800,274 @@ class DeviceCredential(CredentialValue):
|
|||
})
|
||||
|
||||
|
||||
class AttestationCredential(CredentialValue):
|
||||
"""
|
||||
A struct representing an AttestationCredential object.
|
||||
|
||||
Attributes:
|
||||
nonce: A nonce value obtained from the key management server.
|
||||
attestation_type: The type of attestation being used.
|
||||
attestation_measurement: The attestation measurement of the client.
|
||||
attestation_assertion: The attestation assertion from a third party.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
nonce=None,
|
||||
attestation_type=None,
|
||||
attestation_measurement=None,
|
||||
attestation_assertion=None):
|
||||
"""
|
||||
Construct an AttestationCredential struct.
|
||||
|
||||
Args:
|
||||
nonce (Nonce): A Nonce structure containing nonce data obtained
|
||||
from the key management server. Optional, defaults to None.
|
||||
Required for encoding and decoding.
|
||||
attestation_type (enum): An AttestationType enumeration specifying
|
||||
the type of attestation being used. Optional, defaults to None.
|
||||
Required for encoding and decoding.
|
||||
attestation_measurement (bytes): The device identifier for the
|
||||
credential. Optional, defaults to None. Required for encoding
|
||||
and decoding if the attestation assertion is not provided.
|
||||
attestation_assertion (bytes): The network identifier for the
|
||||
credential. Optional, defaults to None. Required for encoding
|
||||
and decoding if the attestation measurement is not provided.
|
||||
"""
|
||||
super(AttestationCredential, self).__init__(tag=Tags.CREDENTIAL_VALUE)
|
||||
|
||||
self._nonce = None
|
||||
self._attestation_type = None
|
||||
self._attestation_measurement = None
|
||||
self._attestation_assertion = None
|
||||
|
||||
self.nonce = nonce
|
||||
self.attestation_type = attestation_type
|
||||
self.attestation_measurement = attestation_measurement
|
||||
self.attestation_assertion = attestation_assertion
|
||||
|
||||
@property
|
||||
def nonce(self):
|
||||
return self._nonce
|
||||
|
||||
@nonce.setter
|
||||
def nonce(self, value):
|
||||
if value is None:
|
||||
self._nonce = None
|
||||
elif isinstance(value, Nonce):
|
||||
self._nonce = value
|
||||
else:
|
||||
raise TypeError("Nonce must be a Nonce struct.")
|
||||
|
||||
@property
|
||||
def attestation_type(self):
|
||||
if self._attestation_type:
|
||||
return self._attestation_type.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@attestation_type.setter
|
||||
def attestation_type(self, value):
|
||||
if value is None:
|
||||
self._attestation_type = None
|
||||
elif isinstance(value, enums.AttestationType):
|
||||
self._attestation_type = Enumeration(
|
||||
enums.AttestationType,
|
||||
value=value,
|
||||
tag=Tags.ATTESTATION_TYPE
|
||||
)
|
||||
else:
|
||||
raise TypeError(
|
||||
"Attestation type must be an AttestationType enumeration."
|
||||
)
|
||||
|
||||
@property
|
||||
def attestation_measurement(self):
|
||||
if self._attestation_measurement:
|
||||
return self._attestation_measurement.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@attestation_measurement.setter
|
||||
def attestation_measurement(self, value):
|
||||
if value is None:
|
||||
self._attestation_measurement = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._attestation_measurement = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.ATTESTATION_MEASUREMENT
|
||||
)
|
||||
else:
|
||||
raise TypeError("Attestation measurement must be bytes.")
|
||||
|
||||
@property
|
||||
def attestation_assertion(self):
|
||||
if self._attestation_assertion:
|
||||
return self._attestation_assertion.value
|
||||
else:
|
||||
return None
|
||||
|
||||
@attestation_assertion.setter
|
||||
def attestation_assertion(self, value):
|
||||
if value is None:
|
||||
self._attestation_assertion = None
|
||||
elif isinstance(value, six.binary_type):
|
||||
self._attestation_assertion = primitives.ByteString(
|
||||
value=value,
|
||||
tag=enums.Tags.ATTESTATION_ASSERTION
|
||||
)
|
||||
else:
|
||||
raise TypeError("Attestation assertion must be bytes.")
|
||||
|
||||
def read(self, input_stream):
|
||||
"""
|
||||
Read the data encoding the AttestationCredential struct and decode it
|
||||
into its constituent parts.
|
||||
|
||||
Args:
|
||||
input_stream (stream): A data stream containing encoded object
|
||||
data, supporting a read method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if either the nonce or attestation type are
|
||||
missing from the encoding. Also raised if neither the
|
||||
attestation measurement nor the attestation assertion are
|
||||
included in the encoding.
|
||||
|
||||
"""
|
||||
super(AttestationCredential, self).read(input_stream)
|
||||
local_stream = BytearrayStream(input_stream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.NONCE, local_stream):
|
||||
self._nonce = Nonce()
|
||||
self._nonce.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential encoding is missing the nonce."
|
||||
)
|
||||
|
||||
if self.is_tag_next(enums.Tags.ATTESTATION_TYPE, local_stream):
|
||||
self._attestation_type = primitives.Enumeration(
|
||||
enums.AttestationType,
|
||||
tag=enums.Tags.ATTESTATION_TYPE
|
||||
)
|
||||
self._attestation_type.read(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential encoding is missing the attestation "
|
||||
"type."
|
||||
)
|
||||
|
||||
self._attestation_measurement = None
|
||||
if self.is_tag_next(enums.Tags.ATTESTATION_MEASUREMENT, local_stream):
|
||||
self._attestation_measurement = primitives.ByteString(
|
||||
tag=enums.Tags.ATTESTATION_MEASUREMENT
|
||||
)
|
||||
self._attestation_measurement.read(local_stream)
|
||||
|
||||
self._attestation_assertion = None
|
||||
if self.is_tag_next(enums.Tags.ATTESTATION_ASSERTION, local_stream):
|
||||
self._attestation_assertion = primitives.ByteString(
|
||||
tag=enums.Tags.ATTESTATION_ASSERTION
|
||||
)
|
||||
self._attestation_assertion.read(local_stream)
|
||||
|
||||
if ((self._attestation_measurement is None) and
|
||||
(self._attestation_assertion is None)):
|
||||
raise ValueError(
|
||||
"Attestation credential encoding is missing either the "
|
||||
"attestation measurement or the attestation assertion."
|
||||
)
|
||||
|
||||
self.is_oversized(local_stream)
|
||||
|
||||
def write(self, output_stream):
|
||||
"""
|
||||
Write the data encoding the AttestationCredential struct to a stream.
|
||||
|
||||
Args:
|
||||
output_stream (stream): A data stream in which to encode object
|
||||
data, supporting a write method; usually a BytearrayStream
|
||||
object.
|
||||
|
||||
Raises:
|
||||
ValueError: Raised if either the nonce or attestation type are
|
||||
not defined. Also raised if neither the attestation measurement
|
||||
nor the attestation assertion are defined.
|
||||
"""
|
||||
local_stream = BytearrayStream()
|
||||
|
||||
if self._nonce:
|
||||
self._nonce.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential struct is missing the nonce."
|
||||
)
|
||||
|
||||
if self._attestation_type:
|
||||
self._attestation_type.write(local_stream)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Attestation credential struct is missing the attestation "
|
||||
"type."
|
||||
)
|
||||
|
||||
if self._attestation_measurement:
|
||||
self._attestation_measurement.write(local_stream)
|
||||
if self._attestation_assertion:
|
||||
self._attestation_assertion.write(local_stream)
|
||||
|
||||
if ((self._attestation_measurement is None) and
|
||||
(self._attestation_assertion is None)):
|
||||
raise ValueError(
|
||||
"Attestation credential struct is missing either the "
|
||||
"attestation measurement or the attestation assertion."
|
||||
)
|
||||
|
||||
self.length = local_stream.length()
|
||||
super(AttestationCredential, self).write(output_stream)
|
||||
output_stream.write(local_stream.buffer)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, AttestationCredential):
|
||||
if self.nonce != other.nonce:
|
||||
return False
|
||||
elif self.attestation_type != other.attestation_type:
|
||||
return False
|
||||
elif self.attestation_measurement != other.attestation_measurement:
|
||||
return False
|
||||
elif self.attestation_assertion != other.attestation_assertion:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __ne__(self, other):
|
||||
if isinstance(other, AttestationCredential):
|
||||
return not (self == other)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __repr__(self):
|
||||
args = ", ".join([
|
||||
"nonce={}".format(repr(self.nonce)),
|
||||
"attestation_type={}".format(self.attestation_type),
|
||||
"attestation_measurement={}".format(self.attestation_measurement),
|
||||
"attestation_assertion={}".format(self.attestation_assertion)
|
||||
])
|
||||
return "AttestationCredential({})".format(args)
|
||||
|
||||
def __str__(self):
|
||||
return "{" \
|
||||
"'nonce': " + str(self.nonce) + ", " \
|
||||
"'attestation_type': " + str(self.attestation_type) + ", " \
|
||||
"'attestation_measurement': " + \
|
||||
str(self.attestation_measurement) + ", " \
|
||||
"'attestation_assertion': " + \
|
||||
str(self.attestation_assertion) + "}"
|
||||
|
||||
|
||||
class Credential(primitives.Struct):
|
||||
"""
|
||||
A struct representing a Credential object.
|
||||
|
@ -734,8 +1168,9 @@ class Credential(primitives.Struct):
|
|||
self._credential_value = UsernamePasswordCredential()
|
||||
elif self.credential_type == enums.CredentialType.DEVICE:
|
||||
self._credential_value = DeviceCredential()
|
||||
elif self.credential_type == enums.CredentialType.ATTESTATION:
|
||||
self._credential_value = AttestationCredential()
|
||||
else:
|
||||
# TODO (peter-hamilton) Add case for Attestation.
|
||||
raise ValueError(
|
||||
"Credential encoding includes unrecognized credential "
|
||||
"type."
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue