mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-05-02 22:10:07 +02:00
This change fixes a bug with the Integer primitive that caused it to break when accepting long integer values. As long as the long integer value can be represented by a signed 32-bit integer, it is considered valid. Test cases have been added to check for this condition and the validation routine for Integer has been updated to raise appropriate errors.
760 lines
24 KiB
Python
760 lines
24 KiB
Python
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import six
|
|
import sys
|
|
|
|
from struct import pack, unpack
|
|
from enum import Enum
|
|
|
|
from kmip.core.enums import Types
|
|
from kmip.core.enums import Tags
|
|
|
|
from kmip.core.errors import ErrorStrings
|
|
|
|
from kmip.core import errors
|
|
from kmip.core import utils
|
|
|
|
|
|
class Base(object):
|
|
TAG_SIZE = 3
|
|
TYPE_SIZE = 1
|
|
LENGTH_SIZE = 4
|
|
|
|
def __init__(self, tag=Tags.DEFAULT, type=Types.DEFAULT):
|
|
self.tag = tag
|
|
self.type = type
|
|
self.length = None
|
|
|
|
# TODO (peter-hamilton) Convert this into a classmethod, class name can be
|
|
# obtained from cls parameter that replaces self
|
|
def is_oversized(self, stream):
|
|
extra = len(stream.peek())
|
|
if extra > 0:
|
|
raise errors.StreamNotEmptyError(Base.__name__, extra)
|
|
|
|
def read_tag(self, istream):
|
|
# Read in the bytes for the tag
|
|
tts = istream.read(self.TAG_SIZE)
|
|
tag = unpack('!I', b'\x00' + tts[0:self.TAG_SIZE])[0]
|
|
|
|
enum_tag = Tags(tag)
|
|
|
|
# Verify that the tag matches for the current object
|
|
if enum_tag is not self.tag:
|
|
raise errors.ReadValueError(Base.__name__, 'tag',
|
|
hex(self.tag.value), hex(tag))
|
|
|
|
def read_type(self, istream):
|
|
# Read in the bytes for the type
|
|
tts = istream.read(self.TYPE_SIZE)
|
|
num_bytes = len(tts)
|
|
if num_bytes != self.TYPE_SIZE:
|
|
min_bytes = 'a minimum of {0} bytes'.format(self.TYPE_SIZE)
|
|
raise errors.ReadValueError(Base.__name__, 'type', min_bytes,
|
|
'{0} bytes'.format(num_bytes))
|
|
typ = unpack('!B', tts)[0]
|
|
|
|
enum_typ = Types(typ)
|
|
|
|
if enum_typ is not self.type:
|
|
raise errors.ReadValueError(Base.__name__, 'type',
|
|
self.type.value, typ)
|
|
|
|
def read_length(self, istream):
|
|
# Read in the bytes for the length
|
|
lst = istream.read(self.LENGTH_SIZE)
|
|
num_bytes = len(lst)
|
|
if num_bytes != self.LENGTH_SIZE:
|
|
min_bytes = 'a minimum of {0} bytes'.format(self.LENGTH_SIZE)
|
|
raise errors.ReadValueError(Base.__name__, 'length', min_bytes,
|
|
'{0} bytes'.format(num_bytes))
|
|
self.length = unpack('!I', lst)[0]
|
|
|
|
def read_value(self, istream):
|
|
raise NotImplementedError()
|
|
|
|
def read(self, istream):
|
|
self.read_tag(istream)
|
|
self.read_type(istream)
|
|
self.read_length(istream)
|
|
|
|
def write_tag(self, ostream):
|
|
# Write the tag to the output stream
|
|
ostream.write(pack('!I', self.tag.value)[1:])
|
|
|
|
def write_type(self, ostream):
|
|
if type(self.type) is not Types:
|
|
msg = ErrorStrings.BAD_EXP_RECV
|
|
raise TypeError(msg.format(Base.__name__, 'type',
|
|
Types, type(self.type)))
|
|
ostream.write(pack('!B', self.type.value))
|
|
|
|
def write_length(self, ostream):
|
|
if type(self.length) is not int:
|
|
msg = ErrorStrings.BAD_EXP_RECV
|
|
raise TypeError(msg.format(Base.__name__, 'length',
|
|
int, type(self.length)))
|
|
num_bytes = utils.count_bytes(self.length)
|
|
if num_bytes > self.LENGTH_SIZE:
|
|
raise errors.WriteOverflowError(Base.__name__, 'length',
|
|
self.LENGTH_SIZE, num_bytes)
|
|
ostream.write(pack('!I', self.length))
|
|
|
|
def write_value(self, ostream):
|
|
raise NotImplementedError()
|
|
|
|
def write(self, ostream):
|
|
self.write_tag(ostream)
|
|
self.write_type(ostream)
|
|
self.write_length(ostream)
|
|
|
|
def validate(self):
|
|
raise NotImplementedError()
|
|
|
|
@staticmethod
|
|
def is_tag_next(tag, stream):
|
|
next_tag = stream.peek(Base.TAG_SIZE)
|
|
if len(next_tag) != Base.TAG_SIZE:
|
|
return False
|
|
next_tag = unpack('!I', b'\x00' + next_tag)[0]
|
|
if next_tag == tag.value:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def is_type_next(kmip_type, stream):
|
|
tag_type_size = Base.TAG_SIZE + Base.TYPE_SIZE
|
|
tt = stream.peek(tag_type_size)
|
|
|
|
if len(tt) != tag_type_size:
|
|
return False
|
|
|
|
typ = unpack('!B', tt[Base.TAG_SIZE:])[0]
|
|
|
|
if typ == kmip_type.value:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
class Struct(Base):
|
|
|
|
def __init__(self, tag=Tags.DEFAULT):
|
|
super(Struct, self).__init__(tag, type=Types.STRUCTURE)
|
|
|
|
# NOTE (peter-hamilton) If seen, should indicate repr needs to be defined
|
|
def __repr__(self):
|
|
return "Struct()"
|
|
|
|
|
|
class Integer(Base):
|
|
LENGTH = 4
|
|
|
|
# Set for signed 32-bit integers
|
|
MIN = -2147483648
|
|
MAX = 2147483647
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT, signed=True):
|
|
super(Integer, self).__init__(tag, type=Types.INTEGER)
|
|
|
|
self.value = value
|
|
if self.value is None:
|
|
self.value = 0
|
|
|
|
self.length = self.LENGTH
|
|
self.padding_length = self.LENGTH
|
|
if signed:
|
|
self.pack_string = '!i'
|
|
else:
|
|
self.pack_string = '!I'
|
|
|
|
self.validate()
|
|
|
|
def read_value(self, istream):
|
|
if self.length is not self.LENGTH:
|
|
raise errors.ReadValueError(Integer.__name__, 'length',
|
|
self.LENGTH, self.length)
|
|
|
|
self.value = unpack(self.pack_string, istream.read(self.length))[0]
|
|
pad = unpack(self.pack_string, istream.read(self.padding_length))[0]
|
|
|
|
if pad is not 0:
|
|
raise errors.ReadValueError(Integer.__name__, 'pad', 0,
|
|
pad)
|
|
self.validate()
|
|
|
|
def read(self, istream):
|
|
super(Integer, self).read(istream)
|
|
self.read_value(istream)
|
|
|
|
def write_value(self, ostream):
|
|
ostream.write(pack(self.pack_string, self.value))
|
|
ostream.write(pack(self.pack_string, 0))
|
|
|
|
def write(self, ostream):
|
|
super(Integer, self).write(ostream)
|
|
self.write_value(ostream)
|
|
|
|
def validate(self):
|
|
"""
|
|
Verify that the value of the Integer object is valid.
|
|
|
|
Raises:
|
|
TypeError: if the value is not of type int or long
|
|
ValueError: if the value cannot be represented by a signed 32-bit
|
|
integer
|
|
"""
|
|
if self.value is not None:
|
|
if type(self.value) not in six.integer_types:
|
|
raise TypeError('expected (one of): {0}, observed: {1}'.format(
|
|
six.integer_types, type(self.value)))
|
|
else:
|
|
if self.value > Integer.MAX:
|
|
raise ValueError('integer value greater than accepted max')
|
|
elif self.value < Integer.MIN:
|
|
raise ValueError('integer value less than accepted min')
|
|
|
|
def __repr__(self):
|
|
return "{0}(value={1})".format(type(self).__name__, repr(self.value))
|
|
|
|
def __str__(self):
|
|
return "{0}".format(repr(self.value))
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, Integer):
|
|
return self.value == other.value
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __ne__(self, other):
|
|
if isinstance(other, Integer):
|
|
return not self.__eq__(other)
|
|
else:
|
|
return NotImplemented
|
|
|
|
|
|
class LongInteger(Base):
|
|
LENGTH = 8
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
super(LongInteger, self).__init__(tag, type=Types.LONG_INTEGER)
|
|
self.value = value
|
|
self.length = self.LENGTH
|
|
|
|
self.validate()
|
|
|
|
def read_value(self, istream):
|
|
if self.length is not self.LENGTH:
|
|
raise errors.ReadValueError(LongInteger.__name__, 'length',
|
|
self.LENGTH, self.length)
|
|
|
|
self.value = unpack('!q', istream.read(self.length))[0]
|
|
self.validate()
|
|
|
|
def read(self, istream):
|
|
super(LongInteger, self).read(istream)
|
|
self.read_value(istream)
|
|
|
|
def write_value(self, ostream):
|
|
ostream.write(pack('!q', self.value))
|
|
|
|
def write(self, ostream):
|
|
super(LongInteger, self).write(ostream)
|
|
self.write_value(ostream)
|
|
|
|
def validate(self):
|
|
self.__validate()
|
|
|
|
def __validate(self):
|
|
if self.value is not None:
|
|
data_type = type(self.value)
|
|
if data_type not in six.integer_types:
|
|
raise errors.StateTypeError(
|
|
LongInteger.__name__, "{0}".format(six.integer_types),
|
|
data_type)
|
|
num_bytes = utils.count_bytes(self.value)
|
|
if num_bytes > self.length:
|
|
raise errors.StateOverflowError(
|
|
LongInteger.__name__, 'value', self.length, num_bytes)
|
|
|
|
def __repr__(self):
|
|
return '<Long Integer, %d>' % (self.value)
|
|
|
|
|
|
class BigInteger(Base):
|
|
BLOCK_SIZE = 8
|
|
SHIFT_SIZE = 64
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
super(BigInteger, self).__init__(tag, type=Types.BIG_INTEGER)
|
|
self.value = value
|
|
|
|
if self.value is not None:
|
|
self.real_length = utils.count_bytes(self.value)
|
|
self.padding_length = self.BLOCK_SIZE - (self.length %
|
|
self.BLOCK_SIZE)
|
|
if self.padding_length == self.BLOCK_SIZE:
|
|
self.padding_length = 0
|
|
else:
|
|
self.length = None
|
|
self.padding_length = None
|
|
|
|
self.validate()
|
|
|
|
def read_value(self, istream):
|
|
if (self.length < self.BLOCK_SIZE) or (self.length % self.BLOCK_SIZE):
|
|
raise errors.InvalidLengthError(BigInteger.__name__,
|
|
('multiple'
|
|
'of {0}'.format(self.BLOCK_SIZE)),
|
|
self.length)
|
|
self.value = 0
|
|
num_blocks = self.length / self.BLOCK_SIZE
|
|
|
|
# Read first block as signed data
|
|
self.value = unpack('!q', str(istream.read(self.BLOCK_SIZE)))[0]
|
|
|
|
# Shift current value and add on next unsigned block
|
|
for _ in range(num_blocks - 1):
|
|
self.value = self.value << self.SHIFT_SIZE
|
|
stream_data = istream.read(self.BLOCK_SIZE)
|
|
self.value += unpack('!Q', stream_data)[0]
|
|
|
|
self.validate()
|
|
|
|
def read(self, istream):
|
|
super(BigInteger, self).read(istream)
|
|
self.read_value(istream)
|
|
|
|
def write_value(self, ostream):
|
|
# 1. Determine the sign of the value (+/-); save it.
|
|
# 2. Extend hex of value with 0s until encoding is right size (8x).
|
|
# 3. Write out each block of the encoding as signed, 2s complement:
|
|
# pack('!q', sign * block)
|
|
|
|
# Determine sign for padding
|
|
pad_byte = 0x00
|
|
pad_nybl = 0x0
|
|
|
|
if self.value < 0:
|
|
pad_byte = 0xff
|
|
pad_nybl = 0xf
|
|
|
|
# Compose padding bytes
|
|
pad = ''
|
|
for _ in range(self.padding_length):
|
|
pad += hex(pad_byte)[2:]
|
|
|
|
str_rep = hex(self.value).rstrip("Ll")[2:]
|
|
if len(str_rep) % 2:
|
|
pad += hex(pad_nybl)[2]
|
|
|
|
# Compose value for block-based write
|
|
str_rep = pad + str_rep
|
|
num_blocks = len(str_rep) / self.BLOCK_SIZE
|
|
|
|
# Write first block as signed data
|
|
block = int(str_rep[0:self.BLOCK_SIZE], 16)
|
|
ostream.write(pack('!q', block))
|
|
|
|
# Write remaining blocks as unsigned data
|
|
for i in range(1, num_blocks):
|
|
block = str_rep[(self.BLOCK_SIZE * i):(self.BLOCK_SIZE * (i + 1))]
|
|
block = int(block, 16)
|
|
ostream.write(pack('!Q', block))
|
|
|
|
def write(self, ostream):
|
|
super(BigInteger, self).write(ostream)
|
|
self.write_value(ostream)
|
|
|
|
def validate(self):
|
|
self.__validate()
|
|
|
|
def __validate(self):
|
|
if self.value is not None:
|
|
data_type = type(self.value)
|
|
if data_type not in six.integer_types:
|
|
raise errors.StateTypeError(
|
|
BigInteger.__name__, "{0}".format(six.integer_types),
|
|
data_type)
|
|
num_bytes = utils.count_bytes(self.length)
|
|
if num_bytes > self.LENGTH_SIZE:
|
|
raise errors.StateOverflowError(
|
|
BigInteger.__name__, 'length', self.LENGTH_SIZE,
|
|
num_bytes)
|
|
|
|
|
|
class Enumeration(Integer):
|
|
ENUM_TYPE = None
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
self.enum = value
|
|
self.validate()
|
|
|
|
if self.enum is None:
|
|
super(Enumeration, self).__init__(None, tag, False)
|
|
else:
|
|
super(Enumeration, self).__init__(self.enum.value, tag, False)
|
|
self.type = Types.ENUMERATION
|
|
|
|
def read(self, istream):
|
|
super(Enumeration, self).read(istream)
|
|
self.enum = self.ENUM_TYPE(self.value)
|
|
self.validate()
|
|
|
|
def write(self, ostream):
|
|
super(Enumeration, self).write(ostream)
|
|
|
|
def validate(self):
|
|
self.__validate()
|
|
|
|
def __validate(self):
|
|
if self.enum is not None:
|
|
if not isinstance(self.enum, Enum):
|
|
raise TypeError("expected {0}, observed {1}".format(
|
|
type(self.enum), Enum))
|
|
|
|
def __repr__(self):
|
|
return "{0}(value={1})".format(type(self).__name__, self.enum)
|
|
|
|
def __str__(self):
|
|
return "{0}.{1}".format(type(self.enum).__name__, self.enum.name)
|
|
|
|
|
|
class Boolean(Base):
|
|
"""
|
|
An encodeable object representing a boolean value.
|
|
|
|
A Boolean is one of the KMIP primitive object types. It is encoded as an
|
|
unsigned, big-endian, 8-byte value, capable of taking the values True (1)
|
|
or False (0). For more information, see Section 9.1 of the KMIP 1.1
|
|
specification.
|
|
"""
|
|
LENGTH = 8
|
|
|
|
def __init__(self, value=True, tag=Tags.DEFAULT):
|
|
"""
|
|
Create a Boolean object.
|
|
|
|
Args:
|
|
value (bool): The value of the Boolean. Optional, defaults to True.
|
|
tag (Tags): An enumeration defining the tag of the Boolean object.
|
|
Optional, defaults to Tags.DEFAULT.
|
|
"""
|
|
super(Boolean, self).__init__(tag, type=Types.BOOLEAN)
|
|
self.logger = logging.getLogger(__name__)
|
|
self.value = value
|
|
self.length = self.LENGTH
|
|
|
|
self.validate()
|
|
|
|
def read_value(self, istream):
|
|
"""
|
|
Read the value of the Boolean object from the input stream.
|
|
|
|
Args:
|
|
istream (Stream): A buffer containing the encoded bytes of the
|
|
value of a Boolean object. Usually a BytearrayStream object.
|
|
Required.
|
|
|
|
Raises:
|
|
ValueError: if the read boolean value is not a 0 or 1.
|
|
"""
|
|
try:
|
|
value = unpack('!Q', istream.read(self.LENGTH))[0]
|
|
except:
|
|
self.logger.error("Error reading boolean value from buffer")
|
|
raise
|
|
|
|
if value == 1:
|
|
self.value = True
|
|
elif value == 0:
|
|
self.value = False
|
|
else:
|
|
raise ValueError("expected: 0 or 1, observed: {0}".format(value))
|
|
|
|
self.validate()
|
|
|
|
def read(self, istream):
|
|
"""
|
|
Read the encoding of the Boolean object from the input stream.
|
|
|
|
Args:
|
|
istream (Stream): A buffer containing the encoded bytes of a
|
|
Boolean object. Usually a BytearrayStream object. Required.
|
|
"""
|
|
super(Boolean, self).read(istream)
|
|
self.read_value(istream)
|
|
|
|
def write_value(self, ostream):
|
|
"""
|
|
Write the value of the Boolean object to the output stream.
|
|
|
|
Args:
|
|
ostream (Stream): A buffer to contain the encoded bytes of the
|
|
value of a Boolean object. Usually a BytearrayStream object.
|
|
Required.
|
|
"""
|
|
try:
|
|
ostream.write(pack('!Q', self.value))
|
|
except:
|
|
self.logger.error("Error writing boolean value to buffer")
|
|
raise
|
|
|
|
def write(self, ostream):
|
|
"""
|
|
Write the encoding of the Boolean object to the output stream.
|
|
|
|
Args:
|
|
ostream (Stream): A buffer to contain the encoded bytes of a
|
|
Boolean object. Usually a BytearrayStream object. Required.
|
|
"""
|
|
super(Boolean, self).write(ostream)
|
|
self.write_value(ostream)
|
|
|
|
def validate(self):
|
|
"""
|
|
Verify that the value of the Boolean object is valid.
|
|
|
|
Raises:
|
|
TypeError: if the value is not of type bool.
|
|
"""
|
|
if self.value:
|
|
if not isinstance(self.value, bool):
|
|
raise TypeError("expected: {0}, observed: {1}".format(
|
|
bool, type(self.value)))
|
|
|
|
def __repr__(self):
|
|
return "{0}(value={1})".format(type(self).__name__, repr(self.value))
|
|
|
|
def __str__(self):
|
|
return "{0}".format(repr(self.value))
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, Boolean):
|
|
return self.value == other.value
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __ne__(self, other):
|
|
if isinstance(other, Boolean):
|
|
return not self.__eq__(other)
|
|
else:
|
|
return NotImplemented
|
|
|
|
|
|
class TextString(Base):
|
|
PADDING_SIZE = 8
|
|
BYTE_FORMAT = '!c'
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
super(TextString, self).__init__(tag, type=Types.TEXT_STRING)
|
|
|
|
if value is None:
|
|
self.value = ''
|
|
else:
|
|
self.value = value
|
|
|
|
self.validate()
|
|
|
|
if self.value is not None:
|
|
self.length = len(self.value)
|
|
self.padding_length = self.PADDING_SIZE - (self.length %
|
|
self.PADDING_SIZE)
|
|
if self.padding_length == self.PADDING_SIZE:
|
|
self.padding_length = 0
|
|
else:
|
|
self.length = None
|
|
self.padding_length = None
|
|
|
|
def read_value(self, istream):
|
|
# Read string text
|
|
self.value = ''
|
|
for _ in range(self.length):
|
|
c = unpack(self.BYTE_FORMAT, istream.read(1))[0]
|
|
if sys.version >= '3':
|
|
c = c.decode()
|
|
self.value += c
|
|
|
|
# Read padding and check content
|
|
self.padding_length = self.PADDING_SIZE - (self.length %
|
|
self.PADDING_SIZE)
|
|
if self.padding_length < self.PADDING_SIZE:
|
|
for _ in range(self.padding_length):
|
|
pad = unpack('!B', istream.read(1))[0]
|
|
if pad is not 0:
|
|
raise errors.ReadValueError(TextString.__name__, 'pad', 0,
|
|
pad)
|
|
|
|
def read(self, istream):
|
|
super(TextString, self).read(istream)
|
|
self.read_value(istream)
|
|
self.validate()
|
|
|
|
def write_value(self, ostream):
|
|
# Write string to stream
|
|
for char in self.value:
|
|
if sys.version < '3':
|
|
c = char
|
|
else:
|
|
c = char.encode()
|
|
ostream.write(pack(self.BYTE_FORMAT, c))
|
|
|
|
# Write padding to stream
|
|
for _ in range(self.padding_length):
|
|
ostream.write(pack('!B', 0))
|
|
|
|
def write(self, ostream):
|
|
super(TextString, self).write(ostream)
|
|
self.write_value(ostream)
|
|
|
|
def validate(self):
|
|
self.__validate()
|
|
|
|
def __validate(self):
|
|
if self.value is not None:
|
|
data_type = type(self.value)
|
|
if data_type is not str:
|
|
msg = ErrorStrings.BAD_EXP_RECV
|
|
raise TypeError(msg.format('TextString', 'value', str,
|
|
data_type))
|
|
|
|
def __repr__(self):
|
|
return "{0}(value={1})".format(type(self).__name__, repr(self.value))
|
|
|
|
def __str__(self):
|
|
return "{0}".format(repr(self.value))
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, TextString):
|
|
return self.value == other.value
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __ne__(self, other):
|
|
if isinstance(other, TextString):
|
|
return not (self == other)
|
|
else:
|
|
return NotImplemented
|
|
|
|
|
|
class ByteString(Base):
|
|
PADDING_SIZE = 8
|
|
BYTE_FORMAT = '!B'
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
super(ByteString, self).__init__(tag, type=Types.BYTE_STRING)
|
|
|
|
if value is None:
|
|
self.value = bytes()
|
|
else:
|
|
self.value = bytes(value)
|
|
|
|
self.validate()
|
|
|
|
if self.value is not None:
|
|
self.length = len(self.value)
|
|
self.padding_length = self.PADDING_SIZE - (self.length %
|
|
self.PADDING_SIZE)
|
|
if self.padding_length == self.PADDING_SIZE:
|
|
self.padding_length = 0
|
|
else:
|
|
self.length = None
|
|
self.padding_length = None
|
|
|
|
def read_value(self, istream):
|
|
# Read bytes into bytearray
|
|
data = bytearray()
|
|
for _ in range(self.length):
|
|
data.append(istream.read(1)[0])
|
|
self.value = bytes(data)
|
|
|
|
# Read padding and check content
|
|
self.padding_length = self.PADDING_SIZE - (self.length %
|
|
self.PADDING_SIZE)
|
|
if self.padding_length == self.PADDING_SIZE:
|
|
self.padding_length = 0
|
|
|
|
if self.padding_length < self.PADDING_SIZE:
|
|
for _ in range(self.padding_length):
|
|
pad = unpack('!B', istream.read(1))[0]
|
|
if pad is not 0:
|
|
raise errors.ReadValueError(TextString.__name__, 'pad', 0,
|
|
pad)
|
|
|
|
def read(self, istream):
|
|
super(ByteString, self).read(istream)
|
|
self.read_value(istream)
|
|
|
|
def write_value(self, ostream):
|
|
# Write bytes to stream
|
|
data = bytearray(self.value)
|
|
for byte in data:
|
|
ostream.write(pack(self.BYTE_FORMAT, byte))
|
|
|
|
# Write padding to stream
|
|
for _ in range(self.padding_length):
|
|
ostream.write(pack('!B', 0))
|
|
|
|
def write(self, ostream):
|
|
super(ByteString, self).write(ostream)
|
|
self.write_value(ostream)
|
|
|
|
def validate(self):
|
|
self.__validate()
|
|
|
|
def __validate(self):
|
|
# TODO (peter-hamilton) Test is pointless, value is always bytes. Fix.
|
|
if self.value is not None:
|
|
data_type = type(self.value)
|
|
if data_type is not bytes:
|
|
msg = ErrorStrings.BAD_EXP_RECV
|
|
raise TypeError(msg.format('ByteString', 'value', bytes,
|
|
data_type))
|
|
|
|
def __repr__(self):
|
|
return "{0}(value={1})".format(type(self).__name__, repr(self.value))
|
|
|
|
def __str__(self):
|
|
return "{0}".format(str(self.value))
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, ByteString):
|
|
return self.value == other.value
|
|
else:
|
|
return NotImplemented
|
|
|
|
def __ne__(self, other):
|
|
if isinstance(other, ByteString):
|
|
return not (self == other)
|
|
else:
|
|
return NotImplemented
|
|
|
|
|
|
class DateTime(LongInteger):
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
super(DateTime, self).__init__(value, tag)
|
|
self.type = Types.DATE_TIME
|
|
|
|
|
|
class Interval(Integer):
|
|
|
|
def __init__(self, value=None, tag=Tags.DEFAULT):
|
|
super(Interval, self).__init__(value, tag)
|
|
self.type = Types.INTERVAL
|