Make Reader also produce MarkedError, remove JSONError

This commit is contained in:
ZyX 2013-03-11 08:40:16 +04:00
parent 49ad855eb5
commit 9dc69d91ad
2 changed files with 39 additions and 43 deletions

View File

@ -1,17 +1,24 @@
__all__ = ['Mark', 'JSONError', 'MarkedError', 'echoerr']
__all__ = ['Mark', 'MarkedError', 'echoerr', 'NON_PRINTABLE']
import sys
import re
NON_PRINTABLE = re.compile('[^\t\n\x20-\x7E' + unichr(0x85) + (unichr(0xA0) + '-' + unichr(0xD7FF)) + (unichr(0xE000) + '-' + unichr(0xFFFD)) + ']')
def repl(s):
return '<x%04x>' % ord(s.group())
def strtrans(s):
return s.replace('\t', '>---')
return NON_PRINTABLE.sub(repl, s.replace('\t', '>---'))
class Mark:
def __init__(self, name, index, line, column, buffer, pointer):
def __init__(self, name, line, column, buffer, pointer):
self.name = name
self.index = index
self.line = line
self.column = column
self.buffer = buffer
@ -53,10 +60,6 @@ class Mark:
return where.encode('utf-8')
class JSONError(Exception):
pass
def echoerr(*args, **kwargs):
sys.stderr.write('\n')
sys.stderr.write(format_error(*args, **kwargs) + '\n')
@ -81,8 +84,8 @@ def format_error(context=None, context_mark=None, problem=None, problem_mark=Non
return '\n'.join(lines)
class MarkedError(JSONError):
class MarkedError(Exception):
def __init__(self, context=None, context_mark=None,
problem=None, problem_mark=None, note=None):
JSONError.__init__(self, format_error(context, context_mark, problem,
Exception.__init__(self, format_error(context, context_mark, problem,
problem_mark, note))

View File

@ -3,10 +3,9 @@
__all__ = ['Reader', 'ReaderError']
from .error import JSONError, Mark
from .error import MarkedError, Mark, NON_PRINTABLE
import codecs
import re
try:
from __builtin__ import unicode, unichr
@ -15,25 +14,8 @@ except ImportError:
unichr = chr # NOQA
class ReaderError(JSONError):
def __init__(self, name, position, character, encoding, reason):
self.name = name
self.character = character
self.position = position
self.encoding = encoding
self.reason = reason
def __str__(self):
if isinstance(self.character, bytes):
return "'%s' codec can't decode byte #x%02x: %s\n" \
" in \"%s\", position %d" \
% (self.encoding, ord(self.character), self.reason,
self.name, self.position)
else:
return "unacceptable character #x%04x: %s\n" \
" in \"%s\", position %d" \
% (self.character, self.reason,
self.name, self.position)
class ReaderError(MarkedError):
pass
class Reader(object):
@ -83,9 +65,7 @@ class Reader(object):
self.update(length)
return self.buffer[self.pointer:self.pointer + length]
def forward(self, length=1):
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
def update_pointer(self, length):
while length:
ch = self.buffer[self.pointer]
self.pointer += 1
@ -94,20 +74,26 @@ class Reader(object):
if ch == '\n':
self.line += 1
self.column = 0
else:
self.column += 1
length -= 1
def get_mark(self):
return Mark(self.name, self.index, self.line, self.column,
self.full_buffer, self.full_pointer)
def forward(self, length=1):
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
self.update_pointer(length)
NON_PRINTABLE = re.compile('[^\t\n\x20-\x7E' + unichr(0x85) + (unichr(0xA0) + '-' + unichr(0xD7FF)) + (unichr(0xE000) + '-' + unichr(0xFFFD)) + ']')
def get_mark(self):
return Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer)
def check_printable(self, data):
match = self.NON_PRINTABLE.search(data)
match = NON_PRINTABLE.search(data)
if match:
character = match.group()
position = self.index + (len(self.buffer) - self.pointer) + match.start()
raise ReaderError(self.name, position, ord(character), 'unicode', "special characters are not allowed")
self.update_pointer(match.start())
raise ReaderError('while reading from stream', None,
'found special characters which are not allowed',
Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer))
def update(self, length):
if self.raw_buffer is None:
@ -123,11 +109,18 @@ class Reader(object):
except UnicodeDecodeError as exc:
character = self.raw_buffer[exc.start]
position = self.stream_pointer - len(self.raw_buffer) + exc.start
raise ReaderError(self.name, position, character, exc.encoding, exc.reason)
self.check_printable(data)
data, converted = self.raw_decode(self.raw_buffer[:exc.start], 'strict', self.eof)
self.buffer += data
self.full_buffer += data + '<' + str(ord(character)) + '>'
self.raw_buffer = self.raw_buffer[converted:]
self.update_pointer(exc.start - 1)
raise ReaderError('while reading from stream', None,
'found character #x%04x that cannot be decoded by UTF-8 codec' % ord(character),
Mark(self.name, self.line, self.column, self.full_buffer, position))
self.buffer += data
self.full_buffer += data
self.raw_buffer = self.raw_buffer[converted:]
self.check_printable(data)
if self.eof:
self.buffer += '\0'
self.raw_buffer = None