Add JSON parser producing marked values

Ref #278
This commit is contained in:
ZyX 2013-03-08 20:01:45 +04:00
parent 5b11feac73
commit 287a88f473
14 changed files with 1843 additions and 0 deletions

View File

View File

@ -0,0 +1,67 @@
from .error import *
from .tokens import *
from .events import *
from .nodes import *
from .loader import *
__version__ = '3.10'
def scan(stream, Loader=Loader):
"""
Scan a YAML stream and produce scanning tokens.
"""
loader = Loader(stream)
try:
while loader.check_token():
yield loader.get_token()
finally:
loader.dispose()
def parse(stream, Loader=Loader):
"""
Parse a YAML stream and produce parsing events.
"""
loader = Loader(stream)
try:
while loader.check_event():
yield loader.get_event()
finally:
loader.dispose()
def compose(stream, Loader=Loader):
"""
Parse the first YAML document in a stream
and produce the corresponding representation tree.
"""
loader = Loader(stream)
try:
return loader.get_single_node()
finally:
loader.dispose()
def compose_all(stream, Loader=Loader):
"""
Parse all YAML documents in a stream
and produce corresponding representation trees.
"""
loader = Loader(stream)
try:
while loader.check_node():
yield loader.get_node()
finally:
loader.dispose()
def load(stream, Loader=Loader):
"""
Parse the first YAML document in a stream
and produce the corresponding Python object.
"""
loader = Loader(stream)
try:
return loader.get_single_data()
finally:
loader.dispose()

View File

@ -0,0 +1,119 @@
__all__ = ['Composer', 'ComposerError']
from .error import MarkedYAMLError
from .events import *
from .nodes import *
class ComposerError(MarkedYAMLError):
pass
class Composer:
def __init__(self):
pass
def check_node(self):
# Drop the STREAM-START event.
if self.check_event(StreamStartEvent):
self.get_event()
# If there are more documents available?
return not self.check_event(StreamEndEvent)
def get_node(self):
# Get the root node of the next document.
if not self.check_event(StreamEndEvent):
return self.compose_document()
def get_single_node(self):
# Drop the STREAM-START event.
self.get_event()
# Compose a document if the stream is not empty.
document = None
if not self.check_event(StreamEndEvent):
document = self.compose_document()
# Ensure that the stream contains no more documents.
if not self.check_event(StreamEndEvent):
event = self.get_event()
raise ComposerError("expected a single document in the stream",
document.start_mark, "but found another document",
event.start_mark)
# Drop the STREAM-END event.
self.get_event()
return document
def compose_document(self):
# Drop the DOCUMENT-START event.
self.get_event()
# Compose the root node.
node = self.compose_node(None, None)
# Drop the DOCUMENT-END event.
self.get_event()
return node
def compose_node(self, parent, index):
event = self.peek_event()
self.descend_resolver(parent, index)
if self.check_event(ScalarEvent):
node = self.compose_scalar_node()
elif self.check_event(SequenceStartEvent):
node = self.compose_sequence_node()
elif self.check_event(MappingStartEvent):
node = self.compose_mapping_node()
self.ascend_resolver()
return node
def compose_scalar_node(self):
event = self.get_event()
tag = event.tag
if tag is None or tag == '!':
tag = self.resolve(ScalarNode, event.value, event.implicit, event.start_mark)
node = ScalarNode(tag, event.value,
event.start_mark, event.end_mark, style=event.style)
return node
def compose_sequence_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(SequenceNode, None, start_event.implicit)
node = SequenceNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
index = 0
while not self.check_event(SequenceEndEvent):
node.value.append(self.compose_node(node, index))
index += 1
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_mapping_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(MappingNode, None, start_event.implicit)
node = MappingNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
while not self.check_event(MappingEndEvent):
#key_event = self.peek_event()
item_key = self.compose_node(node, None)
#if item_key in node.value:
# raise ComposerError("while composing a mapping", start_event.start_mark,
# "found duplicate key", key_event.start_mark)
item_value = self.compose_node(node, item_key)
#node.value[item_key] = item_value
node.value.append((item_key, item_value))
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node

View File

@ -0,0 +1,308 @@
__all__ = ['BaseConstructor', 'Constructor', 'ConstructorError']
from .error import *
from .nodes import *
from .markedvalue import *
import collections, datetime, base64, binascii, re, sys, types
from functools import wraps
def marked(func):
@wraps(func)
def f(self, node, *args, **kwargs):
return gen_marked_value(func(self, node, *args, **kwargs), node.start_mark)
return f
class ConstructorError(MarkedYAMLError):
pass
class BaseConstructor:
yaml_constructors = {}
yaml_multi_constructors = {}
def __init__(self):
self.constructed_objects = {}
self.state_generators = []
self.deep_construct = False
def check_data(self):
# If there are more documents available?
return self.check_node()
def get_data(self):
# Construct and return the next document.
if self.check_node():
return self.construct_document(self.get_node())
def get_single_data(self):
# Ensure that the stream contains a single document and construct it.
node = self.get_single_node()
if node is not None:
return self.construct_document(node)
return None
def construct_document(self, node):
data = self.construct_object(node)
while self.state_generators:
state_generators = self.state_generators
self.state_generators = []
for generator in state_generators:
for dummy in generator:
pass
self.constructed_objects = {}
self.deep_construct = False
return data
def construct_object(self, node, deep=False):
if node in self.constructed_objects:
return self.constructed_objects[node]
if deep:
old_deep = self.deep_construct
self.deep_construct = True
constructor = None
tag_suffix = None
if node.tag in self.yaml_constructors:
constructor = self.yaml_constructors[node.tag]
else:
for tag_prefix in self.yaml_multi_constructors:
if node.tag.startswith(tag_prefix):
tag_suffix = node.tag[len(tag_prefix):]
constructor = self.yaml_multi_constructors[tag_prefix]
break
else:
if None in self.yaml_multi_constructors:
tag_suffix = node.tag
constructor = self.yaml_multi_constructors[None]
elif None in self.yaml_constructors:
constructor = self.yaml_constructors[None]
elif isinstance(node, ScalarNode):
constructor = self.__class__.construct_scalar
elif isinstance(node, SequenceNode):
constructor = self.__class__.construct_sequence
elif isinstance(node, MappingNode):
constructor = self.__class__.construct_mapping
if tag_suffix is None:
data = constructor(self, node)
else:
data = constructor(self, tag_suffix, node)
if isinstance(data, types.GeneratorType):
generator = data
data = next(generator)
if self.deep_construct:
for dummy in generator:
pass
else:
self.state_generators.append(generator)
self.constructed_objects[node] = data
if deep:
self.deep_construct = old_deep
return data
@marked
def construct_scalar(self, node):
if not isinstance(node, ScalarNode):
raise ConstructorError(None, None,
"expected a scalar node, but found %s" % node.id,
node.start_mark)
return node.value
def construct_sequence(self, node, deep=False):
if not isinstance(node, SequenceNode):
raise ConstructorError(None, None,
"expected a sequence node, but found %s" % node.id,
node.start_mark)
return [self.construct_object(child, deep=deep)
for child in node.value]
def construct_mapping(self, node, deep=False):
if not isinstance(node, MappingNode):
raise ConstructorError(None, None,
"expected a mapping node, but found %s" % node.id,
node.start_mark)
mapping = {}
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
if not isinstance(key, collections.Hashable):
raise ConstructorError("while constructing a mapping", node.start_mark,
"found unhashable key", key_node.start_mark)
value = self.construct_object(value_node, deep=deep)
mapping[key] = value
return mapping
@classmethod
def add_constructor(cls, tag, constructor):
if not 'yaml_constructors' in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
class Constructor(BaseConstructor):
def construct_scalar(self, node):
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == 'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
def flatten_mapping(self, node):
merge = []
index = 0
while index < len(node.value):
key_node, value_node = node.value[index]
if key_node.tag == 'tag:yaml.org,2002:merge':
del node.value[index]
if isinstance(value_node, MappingNode):
self.flatten_mapping(value_node)
merge.extend(value_node.value)
elif isinstance(value_node, SequenceNode):
submerge = []
for subnode in value_node.value:
if not isinstance(subnode, MappingNode):
raise ConstructorError("while constructing a mapping",
node.start_mark,
"expected a mapping for merging, but found %s"
% subnode.id, subnode.start_mark)
self.flatten_mapping(subnode)
submerge.append(subnode.value)
submerge.reverse()
for value in submerge:
merge.extend(value)
else:
raise ConstructorError("while constructing a mapping", node.start_mark,
"expected a mapping or list of mappings for merging, but found %s"
% value_node.id, value_node.start_mark)
elif key_node.tag == 'tag:yaml.org,2002:value':
key_node.tag = 'tag:yaml.org,2002:str'
index += 1
else:
index += 1
if merge:
node.value = merge + node.value
def construct_mapping(self, node, deep=False):
if isinstance(node, MappingNode):
self.flatten_mapping(node)
return BaseConstructor.construct_mapping(self, node, deep=deep)
@marked
def construct_yaml_null(self, node):
self.construct_scalar(node)
return None
bool_values = {
'yes': True,
'no': False,
'true': True,
'false': False,
'on': True,
'off': False,
}
@marked
def construct_yaml_bool(self, node):
value = self.construct_scalar(node)
return self.bool_values[value.lower()]
@marked
def construct_yaml_int(self, node):
value = self.construct_scalar(node)
value = value.replace('_', '')
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
if value == '0':
return 0
elif value.startswith('0b'):
return sign*int(value[2:], 2)
elif value.startswith('0x'):
return sign*int(value[2:], 16)
elif value[0] == '0':
return sign*int(value, 8)
elif ':' in value:
digits = [int(part) for part in value.split(':')]
digits.reverse()
base = 1
value = 0
for digit in digits:
value += digit*base
base *= 60
return sign*value
else:
return sign*int(value)
@marked
def construct_yaml_float(self, node):
value = self.construct_scalar(node)
value = value.replace('_', '').lower()
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
elif ':' in value:
digits = [float(part) for part in value.split(':')]
digits.reverse()
base = 1
value = 0.0
for digit in digits:
value += digit*base
base *= 60
return sign*value
else:
return sign*float(value)
def construct_yaml_str(self, node):
return self.construct_scalar(node)
def construct_yaml_seq(self, node):
data = gen_marked_value([], node.start_mark)
yield data
data.extend(self.construct_sequence(node))
def construct_yaml_map(self, node):
data = gen_marked_value({}, node.start_mark)
yield data
value = self.construct_mapping(node)
data.update(value)
def construct_undefined(self, node):
raise ConstructorError(None, None,
"could not determine a constructor for the tag %r" % node.tag,
node.start_mark)
Constructor.add_constructor(
'tag:yaml.org,2002:null',
Constructor.construct_yaml_null)
Constructor.add_constructor(
'tag:yaml.org,2002:bool',
Constructor.construct_yaml_bool)
Constructor.add_constructor(
'tag:yaml.org,2002:int',
Constructor.construct_yaml_int)
Constructor.add_constructor(
'tag:yaml.org,2002:float',
Constructor.construct_yaml_float)
Constructor.add_constructor(
'tag:yaml.org,2002:str',
Constructor.construct_yaml_str)
Constructor.add_constructor(
'tag:yaml.org,2002:seq',
Constructor.construct_yaml_seq)
Constructor.add_constructor(
'tag:yaml.org,2002:map',
Constructor.construct_yaml_map)
Constructor.add_constructor(None,
Constructor.construct_undefined)

View File

@ -0,0 +1,86 @@
__all__ = ['Mark', 'YAMLError', 'MarkedYAMLError', 'echoerr']
import sys
def strtrans(s):
return s.replace('\t', '>---')
class Mark:
def __init__(self, name, index, line, column, buffer, pointer):
self.name = name
self.index = index
self.line = line
self.column = column
self.buffer = buffer
self.pointer = pointer
def get_snippet(self, indent=4, max_length=75):
if self.buffer is None:
return None
head = ''
start = self.pointer
while start > 0 and self.buffer[start-1] not in '\0\n':
start -= 1
if self.pointer-start > max_length/2-1:
head = ' ... '
start += 5
break
tail = ''
end = self.pointer
while end < len(self.buffer) and self.buffer[end] not in '\0\n':
end += 1
if end-self.pointer > max_length/2-1:
tail = ' ... '
end -= 5
break
snippet = [self.buffer[start:self.pointer], self.buffer[self.pointer], self.buffer[self.pointer+1:end]]
snippet = [strtrans(s) for s in snippet]
return ' ' * indent + head + ''.join(snippet) + tail + '\n' \
+ ' ' * (indent + len(head) + len(snippet[0])) + '^'
def __str__(self):
snippet = self.get_snippet()
where = " in \"%s\", line %d, column %d" \
% (self.name, self.line+1, self.column+1)
if snippet is not None:
where += ":\n" + snippet
try:
return where.encode('utf-8')
except AttributeError:
return where
class YAMLError(Exception):
pass
def echoerr(*args, **kwargs):
sys.stderr.write(format_error(*args, **kwargs) + '\n')
def format_error(context=None, context_mark=None, problem=None, problem_mark=None, note=None):
lines = []
if context is not None:
lines.append(context)
if context_mark is not None \
and (problem is None or problem_mark is None
or context_mark.name != problem_mark.name
or context_mark.line != problem_mark.line
or context_mark.column != problem_mark.column):
lines.append(str(context_mark))
if problem is not None:
lines.append(problem)
if problem_mark is not None:
lines.append(str(problem_mark))
if note is not None:
lines.append(note)
return '\n'.join(lines)
class MarkedYAMLError(YAMLError):
def __init__(self, context=None, context_mark=None,
problem=None, problem_mark=None, note=None):
YAMLError.__init__(format_error(context, context_mark, problem,
problem_mark, note))

View File

@ -0,0 +1,83 @@
# Abstract classes.
class Event(object):
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in ['implicit', 'value']
if hasattr(self, key)]
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
class NodeEvent(Event):
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
class CollectionStartEvent(NodeEvent):
def __init__(self, implicit, start_mark=None, end_mark=None,
flow_style=None):
self.tag = None
self.implicit = implicit
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
class CollectionEndEvent(Event):
pass
# Implementations.
class StreamStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None, encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
class StreamEndEvent(Event):
pass
class DocumentStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
explicit=None, version=None, tags=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
self.version = version
self.tags = tags
class DocumentEndEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
explicit=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
class AliasEvent(NodeEvent):
pass
class ScalarEvent(NodeEvent):
def __init__(self, implicit, value,
start_mark=None, end_mark=None, style=None):
self.tag = None
self.implicit = implicit
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
class SequenceStartEvent(CollectionStartEvent):
pass
class SequenceEndEvent(CollectionEndEvent):
pass
class MappingStartEvent(CollectionStartEvent):
pass
class MappingEndEvent(CollectionEndEvent):
pass

View File

@ -0,0 +1,20 @@
__all__ = ['Loader']
from .reader import *
from .scanner import *
from .parser import *
from .composer import *
from .constructor import *
from .resolver import *
class Loader(Reader, Scanner, Parser, Composer, Constructor, Resolver):
def __init__(self, stream):
Reader.__init__(self, stream)
Scanner.__init__(self)
Parser.__init__(self)
Composer.__init__(self)
Constructor.__init__(self)
Resolver.__init__(self)

View File

@ -0,0 +1,28 @@
__all__ = ['gen_marked_value', 'MarkedValue']
class MarkedValue:
def __init__(self, value, mark):
self.mark = mark
self.value = value
classcache = {}
def gen_marked_value(value, mark):
if value.__class__ in classcache:
Marked = classcache[value.__class__]
else:
class Marked(MarkedValue):
for func in value.__class__.__dict__:
if func not in set(('__init__', '__new__', '__getattribute__')):
if func in set(('__eq__',)):
# HACK to make marked dictionaries always work
exec '''def {0}(self, *args):
return self.value.{0}(*[arg.value if isinstance(arg, MarkedValue) else arg for arg in args])
'''.format(func)
else:
exec '''def {0}(self, *args, **kwargs):
return self.value.{0}(*args, **kwargs)
'''.format(func)
classcache[value.__class__] = Marked
return Marked(value, mark)

View File

@ -0,0 +1,49 @@
class Node(object):
def __init__(self, tag, value, start_mark, end_mark):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
value = self.value
#if isinstance(value, list):
# if len(value) == 0:
# value = '<empty>'
# elif len(value) == 1:
# value = '<1 item>'
# else:
# value = '<%d items>' % len(value)
#else:
# if len(value) > 75:
# value = repr(value[:70]+u' ... ')
# else:
# value = repr(value)
value = repr(value)
return '%s(tag=%r, value=%s)' % (self.__class__.__name__, self.tag, value)
class ScalarNode(Node):
id = 'scalar'
def __init__(self, tag, value,
start_mark=None, end_mark=None, style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
class CollectionNode(Node):
def __init__(self, tag, value,
start_mark=None, end_mark=None, flow_style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
class SequenceNode(CollectionNode):
id = 'sequence'
class MappingNode(CollectionNode):
id = 'mapping'

View File

@ -0,0 +1,254 @@
__all__ = ['Parser', 'ParserError']
from .error import MarkedYAMLError, echoerr
from .tokens import *
from .events import *
from .scanner import *
class ParserError(MarkedYAMLError):
pass
class Parser:
# Since writing a recursive-descendant parser is a straightforward task, we
# do not give many comments here.
DEFAULT_TAGS = {
'!': '!',
'!!': 'tag:yaml.org,2002:',
}
def __init__(self):
self.current_event = None
self.yaml_version = None
self.tag_handles = {}
self.states = []
self.marks = []
self.state = self.parse_stream_start
def dispose(self):
# Reset the state attributes (to clear self-references)
self.states = []
self.state = None
def check_event(self, *choices):
# Check the type of the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
if self.current_event is not None:
if not choices:
return True
for choice in choices:
if isinstance(self.current_event, choice):
return True
return False
def peek_event(self):
# Get the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
return self.current_event
def get_event(self):
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
self.current_event = self.state()
value = self.current_event
self.current_event = None
return value
# stream ::= STREAM-START implicit_document? explicit_document* STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
def parse_stream_start(self):
# Parse the stream start.
token = self.get_token()
event = StreamStartEvent(token.start_mark, token.end_mark,
encoding=token.encoding)
# Prepare the next state.
self.state = self.parse_implicit_document_start
return event
def parse_implicit_document_start(self):
# Parse an implicit document.
if not self.check_token(StreamEndToken):
self.tag_handles = self.DEFAULT_TAGS
token = self.peek_token()
start_mark = end_mark = token.start_mark
event = DocumentStartEvent(start_mark, end_mark, explicit=False)
# Prepare the next state.
self.states.append(self.parse_document_end)
self.state = self.parse_node
return event
else:
return self.parse_document_start()
def parse_document_start(self):
# Parse an explicit document.
if not self.check_token(StreamEndToken):
token = self.peek_token()
start_mark = token.start_mark
raise ParserError(None, None,
"expected '<stream end>', but found %r"
% self.peek_token().id,
self.peek_token().start_mark)
else:
# Parse the end of the stream.
token = self.get_token()
event = StreamEndEvent(token.start_mark, token.end_mark)
assert not self.states
assert not self.marks
self.state = None
return event
def parse_document_end(self):
# Parse the document end.
token = self.peek_token()
start_mark = end_mark = token.start_mark
explicit = False
event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
# Prepare the next state.
self.state = self.parse_document_start
return event
def parse_document_content(self):
return self.parse_node()
def parse_node(self, indentless_sequence=False):
start_mark = end_mark = tag_mark = None
if start_mark is None:
start_mark = end_mark = self.peek_token().start_mark
event = None
implicit = True
if self.check_token(ScalarToken):
token = self.get_token()
end_mark = token.end_mark
if token.plain:
implicit = (True, False)
else:
implicit = (False, True)
event = ScalarEvent(implicit, token.value,
start_mark, end_mark, style=token.style)
self.state = self.states.pop()
elif self.check_token(FlowSequenceStartToken):
end_mark = self.peek_token().end_mark
event = SequenceStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_sequence_first_entry
elif self.check_token(FlowMappingStartToken):
end_mark = self.peek_token().end_mark
event = MappingStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_mapping_first_key
else:
token = self.peek_token()
raise ParserError("while parsing a flow node", start_mark,
"expected the node content, but found %r" % token.id,
token.start_mark)
return event
def parse_flow_sequence_first_entry(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
def parse_flow_sequence_entry(self, first=False):
if not self.check_token(FlowSequenceEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowSequenceEndToken):
token = self.peek_token()
echoerr("While parsing a flow sequence", self.marks[-1],
"expected sequence value, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow sequence", self.marks[-1],
"expected ',' or ']', but got %r" % token.id, token.start_mark)
if not self.check_token(FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry)
return self.parse_node()
token = self.get_token()
event = SequenceEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_sequence_entry_mapping_end(self):
self.state = self.parse_flow_sequence_entry
token = self.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
def parse_flow_mapping_first_key(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
def parse_flow_mapping_key(self, first=False):
if not self.check_token(FlowMappingEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowMappingEndToken):
token = self.peek_token()
echoerr("While parsing a flow mapping", self.marks[-1],
"expected mapping key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ',' or '}', but got %r" % token.id, token.start_mark)
if self.check_token(KeyToken):
token = self.get_token()
if not self.check_token(ValueToken,
FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_value)
return self.parse_node()
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected value, but got %r" % token.id, token.start_mark)
elif not self.check_token(FlowMappingEndToken):
token = self.peek_token()
expect_key = self.check_token(ValueToken, FlowEntryToken)
if not expect_key:
self.get_token()
expect_key = self.check_token(ValueToken)
if expect_key:
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected string key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ':', but got %r" % token.id, token.start_mark)
token = self.get_token()
event = MappingEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_mapping_value(self):
if self.check_token(ValueToken):
token = self.get_token()
if not self.check_token(FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_key)
return self.parse_node()
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected mapping value, but got %r" % token.id, token.start_mark)

View File

@ -0,0 +1,160 @@
# This module contains abstractions for the input stream. You don't have to
# looks further, there are no pretty code.
#
# We define two classes here.
#
# Mark(source, line, column)
# It's just a record and its only use is producing nice error messages.
# Parser does not use it for any other purposes.
#
# Reader(source, data)
# Reader determines the encoding of `data` and converts it to unicode.
# Reader provides the following methods and attributes:
# reader.peek(length=1) - return the next `length` characters
# reader.forward(length=1) - move the current position to `length` characters.
# reader.index - the number of the current character.
# reader.line, stream.column - the line and the column of the current character.
__all__ = ['Reader', 'ReaderError']
from .error import YAMLError, Mark
import codecs, re
try:
from __builtin__ import unicode, unichr
except ImportError:
unicode = str
unichr = chr
class ReaderError(YAMLError):
def __init__(self, name, position, character, encoding, reason):
self.name = name
self.character = character
self.position = position
self.encoding = encoding
self.reason = reason
def __str__(self):
if isinstance(self.character, bytes):
return "'%s' codec can't decode byte #x%02x: %s\n" \
" in \"%s\", position %d" \
% (self.encoding, ord(self.character), self.reason,
self.name, self.position)
else:
return "unacceptable character #x%04x: %s\n" \
" in \"%s\", position %d" \
% (self.character, self.reason,
self.name, self.position)
class Reader(object):
# Reader:
# - determines the data encoding and converts it to a unicode string,
# - checks if characters are in allowed range,
# - adds '\0' to the end.
# Reader accepts
# - a file-like object with its `read` method returning `str`,
# Yeah, it's ugly and slow.
def __init__(self, stream):
self.name = None
self.stream = None
self.stream_pointer = 0
self.eof = True
self.buffer = ''
self.pointer = 0
self.full_buffer = unicode('')
self.full_pointer = 0
self.raw_buffer = None
self.raw_decode = codecs.utf_8_decode
self.encoding = 'utf-8'
self.index = 0
self.line = 0
self.column = 0
self.stream = stream
self.name = getattr(stream, 'name', "<file>")
self.eof = False
self.raw_buffer = None
while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
self.update(1)
def peek(self, index=0):
try:
return self.buffer[self.pointer+index]
except IndexError:
self.update(index+1)
return self.buffer[self.pointer+index]
def prefix(self, length=1):
if self.pointer+length >= len(self.buffer):
self.update(length)
return self.buffer[self.pointer:self.pointer+length]
def forward(self, length=1):
if self.pointer+length+1 >= len(self.buffer):
self.update(length+1)
while length:
ch = self.buffer[self.pointer]
self.pointer += 1
self.full_pointer += 1
self.index += 1
if ch == '\n':
self.line += 1
self.column = 0
length -= 1
def get_mark(self):
return Mark(self.name, self.index, self.line, self.column,
self.full_buffer, self.full_pointer)
NON_PRINTABLE = re.compile('[^\t\n\x20-\x7E' + unichr(0x85) + (unichr(0xA0)+'-'+unichr(0xD7FF)) + (unichr(0xE000)+'-'+unichr(0xFFFD)) + ']')
def check_printable(self, data):
match = self.NON_PRINTABLE.search(data)
if match:
character = match.group()
position = self.index+(len(self.buffer)-self.pointer)+match.start()
raise ReaderError(self.name, position, ord(character),
'unicode', "special characters are not allowed")
def update(self, length):
if self.raw_buffer is None:
return
self.buffer = self.buffer[self.pointer:]
self.pointer = 0
while len(self.buffer) < length:
if not self.eof:
self.update_raw()
try:
data, converted = self.raw_decode(self.raw_buffer,
'strict', self.eof)
except UnicodeDecodeError as exc:
character = self.raw_buffer[exc.start]
position = self.stream_pointer-len(self.raw_buffer)+exc.start
raise ReaderError(self.name, position, character,
exc.encoding, exc.reason)
self.check_printable(data)
self.buffer += data
self.full_buffer += data
self.raw_buffer = self.raw_buffer[converted:]
if self.eof:
self.buffer += '\0'
self.raw_buffer = None
break
def update_raw(self, size=4096):
data = self.stream.read(size)
if self.raw_buffer is None:
self.raw_buffer = data
else:
self.raw_buffer += data
self.stream_pointer += len(data)
if not data:
self.eof = True

View File

@ -0,0 +1,135 @@
__all__ = ['BaseResolver', 'Resolver']
from .error import *
from .nodes import *
import re
class ResolverError(MarkedYAMLError):
pass
class BaseResolver:
DEFAULT_SCALAR_TAG = 'tag:yaml.org,2002:str'
DEFAULT_SEQUENCE_TAG = 'tag:yaml.org,2002:seq'
DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'
yaml_implicit_resolvers = {}
yaml_path_resolvers = {}
def __init__(self):
self.resolver_exact_paths = []
self.resolver_prefix_paths = []
@classmethod
def add_implicit_resolver(cls, tag, regexp, first):
if not 'yaml_implicit_resolvers' in cls.__dict__:
cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()
if first is None:
first = [None]
for ch in first:
cls.yaml_implicit_resolvers.setdefault(ch, []).append((tag, regexp))
def descend_resolver(self, current_node, current_index):
if not self.yaml_path_resolvers:
return
exact_paths = {}
prefix_paths = []
if current_node:
depth = len(self.resolver_prefix_paths)
for path, kind in self.resolver_prefix_paths[-1]:
if self.check_resolver_prefix(depth, path, kind,
current_node, current_index):
if len(path) > depth:
prefix_paths.append((path, kind))
else:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
for path, kind in self.yaml_path_resolvers:
if not path:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
prefix_paths.append((path, kind))
self.resolver_exact_paths.append(exact_paths)
self.resolver_prefix_paths.append(prefix_paths)
def ascend_resolver(self):
if not self.yaml_path_resolvers:
return
self.resolver_exact_paths.pop()
self.resolver_prefix_paths.pop()
def check_resolver_prefix(self, depth, path, kind,
current_node, current_index):
node_check, index_check = path[depth-1]
if isinstance(node_check, str):
if current_node.tag != node_check:
return
elif node_check is not None:
if not isinstance(current_node, node_check):
return
if index_check is True and current_index is not None:
return
if (index_check is False or index_check is None) \
and current_index is None:
return
if isinstance(index_check, str):
if not (isinstance(current_index, ScalarNode)
and index_check == current_index.value):
return
elif isinstance(index_check, int) and not isinstance(index_check, bool):
if index_check != current_index:
return
return True
def resolve(self, kind, value, implicit, mark=None):
if kind is ScalarNode and implicit[0]:
if value == '':
resolvers = self.yaml_implicit_resolvers.get('', [])
else:
resolvers = self.yaml_implicit_resolvers.get(value[0], [])
resolvers += self.yaml_implicit_resolvers.get(None, [])
for tag, regexp in resolvers:
if regexp.match(value):
return tag
else:
raise ResolverError('while resolving plain scalar', None,
"expected floating-point value, integer, null or boolean, but got %r" % value,
mark)
implicit = implicit[1]
if self.yaml_path_resolvers:
exact_paths = self.resolver_exact_paths[-1]
if kind in exact_paths:
return exact_paths[kind]
if None in exact_paths:
return exact_paths[None]
if kind is ScalarNode:
return self.DEFAULT_SCALAR_TAG
elif kind is SequenceNode:
return self.DEFAULT_SEQUENCE_TAG
elif kind is MappingNode:
return self.DEFAULT_MAPPING_TAG
class Resolver(BaseResolver):
pass
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:bool',
re.compile(r'''^(?:true|false)$''', re.X),
list('yYnNtTfFoO'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:float',
re.compile(r'^-?(?:0|[1-9]\d*)(?=[.eE])(?:\.\d+)?(?:[eE][-+]?\d+)?$', re.X),
list('-0123456789'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:int',
re.compile(r'^(?:0|-?[1-9]\d*)$', re.X),
list('-0123456789'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:null',
re.compile(r'^null$', re.X),
['n'])

View File

@ -0,0 +1,477 @@
# Scanner produces tokens of the following types:
# STREAM-START
# STREAM-END
# DOCUMENT-START
# DOCUMENT-END
# FLOW-SEQUENCE-START
# FLOW-MAPPING-START
# FLOW-SEQUENCE-END
# FLOW-MAPPING-END
# FLOW-ENTRY
# KEY
# VALUE
# ALIAS(value)
# ANCHOR(value)
# TAG(value)
# SCALAR(value, plain, style)
#
# Read comments in the Scanner code for more details.
#
__all__ = ['Scanner', 'ScannerError']
from .error import MarkedYAMLError
from .tokens import *
class ScannerError(MarkedYAMLError):
pass
class SimpleKey:
# See below simple keys treatment.
def __init__(self, token_number, index, line, column, mark):
self.token_number = token_number
self.index = index
self.line = line
self.column = column
self.mark = mark
class Scanner:
def __init__(self):
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
# Reader do the dirty work of checking for BOM and converting the
# input data to Unicode. It also adds NUL to the end.
#
# Reader supports the following methods
# self.peek(i=0) # peek the next i-th character
# self.prefix(l=1) # peek the next l characters
# self.forward(l=1) # read the next l characters and move the pointer.
# Had we reached the end of the stream?
self.done = False
# The number of unclosed '{' and '['. `flow_level == 0` means block
# context.
self.flow_level = 0
# List of processed tokens that are not yet emitted.
self.tokens = []
# Add the STREAM-START token.
self.fetch_stream_start()
# Number of tokens that were emitted through the `get_token` method.
self.tokens_taken = 0
# Variables related to simple keys treatment.
# A simple key is a key that is not denoted by the '?' indicator.
# We emit the KEY token before all keys, so when we find a potential
# simple key, we try to locate the corresponding ':' indicator.
# Simple keys should be limited to a single line.
# Can a simple key start at the current position? A simple key may
# start:
# - after '{', '[', ',' (in the flow context),
self.allow_simple_key = False
# Keep track of possible simple keys. This is a dictionary. The key
# is `flow_level`; there can be no more that one possible simple key
# for each level. The value is a SimpleKey record:
# (token_number, index, line, column, mark)
# A simple key may start with SCALAR(flow), '[', or '{' tokens.
self.possible_simple_keys = {}
# Public methods.
def check_token(self, *choices):
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
# Return the next token, but do not delete if from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
return self.tokens[0]
def get_token(self):
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
self.tokens_taken += 1
return self.tokens.pop(0)
# Private methods.
def need_more_tokens(self):
if self.done:
return False
if not self.tokens:
return True
# The current token may be a potential simple key, so we
# need to look further.
self.stale_possible_simple_keys()
if self.next_possible_simple_key() == self.tokens_taken:
return True
def fetch_more_tokens(self):
# Eat whitespaces and comments until we reach the next token.
self.scan_to_next_token()
# Remove obsolete possible simple keys.
self.stale_possible_simple_keys()
# Peek the next character.
ch = self.peek()
# Is it the end of stream?
if ch == '\0':
return self.fetch_stream_end()
# Note: the order of the following checks is NOT significant.
# Is it the flow sequence start indicator?
if ch == '[':
return self.fetch_flow_sequence_start()
# Is it the flow mapping start indicator?
if ch == '{':
return self.fetch_flow_mapping_start()
# Is it the flow sequence end indicator?
if ch == ']':
return self.fetch_flow_sequence_end()
# Is it the flow mapping end indicator?
if ch == '}':
return self.fetch_flow_mapping_end()
# Is it the flow entry indicator?
if ch == ',':
return self.fetch_flow_entry()
# Is it the value indicator?
if ch == ':' and self.flow_level:
return self.fetch_value()
# Is it a double quoted scalar?
if ch == '\"':
return self.fetch_double()
# It must be a plain scalar then.
if self.check_plain():
return self.fetch_plain()
# No? It's an error. Let's produce a nice error message.
raise ScannerError("while scanning for the next token", None,
"found character %r that cannot start any token" % ch,
self.get_mark())
# Simple keys treatment.
def next_possible_simple_key(self):
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
# if not self.possible_simple_keys:
# return None
# return self.possible_simple_keys[
# min(self.possible_simple_keys.keys())].token_number
min_token_number = None
for level in self.possible_simple_keys:
key = self.possible_simple_keys[level]
if min_token_number is None or key.token_number < min_token_number:
min_token_number = key.token_number
return min_token_number
def stale_possible_simple_keys(self):
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
# - should be limited to a single line,
# Disabling this procedure will allow simple keys of any length and
# height (may cause problems if indentation is broken though).
for level in list(self.possible_simple_keys):
key = self.possible_simple_keys[level]
if key.line != self.line:
del self.possible_simple_keys[level]
def save_possible_simple_key(self):
# The next token may start a simple key. We check if it's possible
# and save its position. This function is called for
# SCALAR(flow), '[', and '{'.
# The next token might be a simple key. Let's save it's number and
# position.
if self.allow_simple_key:
self.remove_possible_simple_key()
token_number = self.tokens_taken+len(self.tokens)
key = SimpleKey(token_number,
self.index, self.line, self.column, self.get_mark())
self.possible_simple_keys[self.flow_level] = key
def remove_possible_simple_key(self):
# Remove the saved possible key position at the current flow level.
if self.flow_level in self.possible_simple_keys:
key = self.possible_simple_keys[self.flow_level]
del self.possible_simple_keys[self.flow_level]
# Fetchers.
def fetch_stream_start(self):
# We always add STREAM-START as the first token and STREAM-END as the
# last token.
# Read the token.
mark = self.get_mark()
# Add STREAM-START.
self.tokens.append(StreamStartToken(mark, mark,
encoding=self.encoding))
def fetch_stream_end(self):
# Reset simple keys.
self.remove_possible_simple_key()
self.allow_simple_key = False
self.possible_simple_keys = {}
# Read the token.
mark = self.get_mark()
# Add STREAM-END.
self.tokens.append(StreamEndToken(mark, mark))
# The steam is finished.
self.done = True
def fetch_flow_sequence_start(self):
self.fetch_flow_collection_start(FlowSequenceStartToken)
def fetch_flow_mapping_start(self):
self.fetch_flow_collection_start(FlowMappingStartToken)
def fetch_flow_collection_start(self, TokenClass):
# '[' and '{' may start a simple key.
self.save_possible_simple_key()
# Increase the flow level.
self.flow_level += 1
# Simple keys are allowed after '[' and '{'.
self.allow_simple_key = True
# Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_sequence_end(self):
self.fetch_flow_collection_end(FlowSequenceEndToken)
def fetch_flow_mapping_end(self):
self.fetch_flow_collection_end(FlowMappingEndToken)
def fetch_flow_collection_end(self, TokenClass):
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Decrease the flow level.
self.flow_level -= 1
# No simple keys after ']' or '}'.
self.allow_simple_key = False
# Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_value(self):
# Do we determine a simple key?
if self.flow_level in self.possible_simple_keys:
# Add KEY.
key = self.possible_simple_keys[self.flow_level]
del self.possible_simple_keys[self.flow_level]
self.tokens.insert(key.token_number-self.tokens_taken,
KeyToken(key.mark, key.mark))
# There cannot be two simple keys one after another.
self.allow_simple_key = False
# Add VALUE.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(ValueToken(start_mark, end_mark))
def fetch_flow_entry(self):
# Simple keys are allowed after ','.
self.allow_simple_key = True
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Add FLOW-ENTRY.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(FlowEntryToken(start_mark, end_mark))
def fetch_double(self):
# A flow scalar could be a simple key.
self.save_possible_simple_key()
# No simple keys after flow scalars.
self.allow_simple_key = False
# Scan and add SCALAR.
self.tokens.append(self.scan_flow_scalar())
def fetch_plain(self):
# No simple keys after plain scalars. But note that `scan_plain` will
# change this flag if the scan is finished at the beginning of the
# line.
self.allow_simple_key = False
# Scan and add SCALAR. May change `allow_simple_key`.
self.tokens.append(self.scan_plain())
# Checkers.
def check_plain(self):
return self.peek() in '0123456789-ntf'
# Scanners.
def scan_to_next_token(self):
while self.peek() in ' \t\n':
self.forward()
def scan_flow_scalar(self):
# See the specification for details.
# Note that we loose indentation rules for quoted scalars. Quoted
# scalars don't need to adhere indentation because " and ' clearly
# mark the beginning and the end of them. Therefore we are less
# restrictive then the specification requires. We only need to check
# that document separators are not included in scalars.
chunks = []
start_mark = self.get_mark()
quote = self.peek()
self.forward()
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
while self.peek() != quote:
chunks.extend(self.scan_flow_scalar_spaces(start_mark))
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
self.forward()
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), False, start_mark, end_mark, '"')
ESCAPE_REPLACEMENTS = {
'b': '\x08',
't': '\x09',
'n': '\x0A',
'f': '\x0C',
'r': '\x0D',
'\"': '\"',
'\\': '\\',
}
ESCAPE_CODES = {
'u': 4,
}
def scan_flow_scalar_non_spaces(self, start_mark):
# See the specification for details.
chunks = []
while True:
length = 0
while self.peek(length) not in '\"\\\0 \t\n':
length += 1
if length:
chunks.append(self.prefix(length))
self.forward(length)
ch = self.peek()
if ch == '\\':
self.forward()
ch = self.peek()
if ch in self.ESCAPE_REPLACEMENTS:
chunks.append(self.ESCAPE_REPLACEMENTS[ch])
self.forward()
elif ch in self.ESCAPE_CODES:
length = self.ESCAPE_CODES[ch]
self.forward()
for k in range(length):
if self.peek(k) not in '0123456789ABCDEFabcdef':
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"expected escape sequence of %d hexdecimal numbers, but found %r" %
(length, self.peek(k)), self.get_mark())
code = int(self.prefix(length), 16)
chunks.append(chr(code))
self.forward(length)
else:
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"found unknown escape character %r" % ch, self.get_mark())
else:
return chunks
def scan_flow_scalar_spaces(self, start_mark):
# See the specification for details.
chunks = []
length = 0
while self.peek(length) in ' \t':
length += 1
whitespaces = self.prefix(length)
self.forward(length)
ch = self.peek()
if ch == '\0':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected end of stream", self.get_mark())
elif ch == '\n':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected line end", self.get_mark())
else:
chunks.append(whitespaces)
return chunks
def scan_plain(self):
chunks = []
start_mark = self.get_mark()
spaces = []
while True:
length = 0
while True:
ch = self.peek(length)
if self.peek(length) not in 'eE.0123456789nul-tr+fas':
break
length += 1
if length == 0:
break
self.allow_simple_key = False
chunks.extend(spaces)
chunks.append(self.prefix(length))
self.forward(length)
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), True, start_mark, end_mark)

View File

@ -0,0 +1,57 @@
class Token(object):
def __init__(self, start_mark, end_mark):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in self.__dict__
if not key.endswith('_mark')]
attributes.sort()
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
#class BOMToken(Token):
# id = '<byte order mark>'
class StreamStartToken(Token):
id = '<stream start>'
def __init__(self, start_mark=None, end_mark=None,
encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
class StreamEndToken(Token):
id = '<stream end>'
class FlowSequenceStartToken(Token):
id = '['
class FlowMappingStartToken(Token):
id = '{'
class FlowSequenceEndToken(Token):
id = ']'
class FlowMappingEndToken(Token):
id = '}'
class KeyToken(Token):
id = '?'
class ValueToken(Token):
id = ':'
class FlowEntryToken(Token):
id = ','
class ScalarToken(Token):
id = '<scalar>'
def __init__(self, value, plain, start_mark, end_mark, style=None):
self.value = value
self.plain = plain
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style