Some fixes for flake8

This commit is contained in:
ZyX 2013-03-10 23:12:07 +04:00
parent 3fd1215091
commit 5da848fa4a
14 changed files with 1466 additions and 1539 deletions

View File

@ -108,7 +108,7 @@ class Spec(object):
else:
proceed, echo, fhadproblem = item_func(item, data, context, echoerr)
if echo and fhadproblem:
echoerr(context=self.cmsg.format(key=context_key(context)+'/list item '+unicode(i)),
echoerr(context=self.cmsg.format(key=context_key(context) + '/list item ' + unicode(i)),
context_mark=value.mark,
problem=msg_func(item),
problem_mark=item.mark)
@ -141,7 +141,7 @@ class Spec(object):
def check_tuple(self, value, context_mark, data, context, echoerr, start, end):
hadproblem = False
for (i, item, spec) in zip(itertools.count(), value, self.specs[start:end]):
proceed, ihadproblem = spec.match(item, value.mark, data, context + (('tuple item '+unicode(i), item),), echoerr)
proceed, ihadproblem = spec.match(item, value.mark, data, context + (('tuple item ' + unicode(i), item),), echoerr)
if ihadproblem:
hadproblem = True
if not proceed:
@ -154,9 +154,9 @@ class Spec(object):
cmp_funcs = {
'le': lambda x, y: x <= y,
'lt': lambda x, y: x < y,
'lt': lambda x, y: x < y,
'ge': lambda x, y: x >= y,
'gt': lambda x, y: x > y,
'gt': lambda x, y: x > y,
'eq': lambda x, y: x == y,
}
@ -424,8 +424,8 @@ main_spec = (Spec(
.context_message('Error while loading extensions configuration (key {key})'),
).context_message('Error while loading main configuration'))
term_color_spec=Spec().unsigned().cmp('le', 255).copy
true_color_spec=Spec().re('^[0-9a-fA-F]{6}$',
term_color_spec = Spec().unsigned().cmp('le', 255).copy
true_color_spec = Spec().re('^[0-9a-fA-F]{6}$',
lambda value: '"{0}" is not a six-digit hexadecimal unsigned integer written as a string'.format(value)).copy
colors_spec = (Spec(
colors=Spec().unknown_spec(
@ -568,7 +568,7 @@ def check_full_segment_data(segment, data, context, echoerr):
names = [segment['name']]
if segment.get('type', 'function') == 'function':
module = segment.get('module', context[0][1].get('default_module', 'powerline.segments.'+ext))
module = segment.get('module', context[0][1].get('default_module', 'powerline.segments.' + ext))
names.insert(0, unicode(module) + '.' + unicode(names[0]))
segment_copy = segment.copy()
@ -592,7 +592,7 @@ def check_full_segment_data(segment, data, context, echoerr):
def check_segment_name(name, data, context, echoerr):
ext = data['ext']
if context[-2][1].get('type', 'function') == 'function':
module = context[-2][1].get('module', context[0][1].get('default_module', 'powerline.segments.'+ext))
module = context[-2][1].get('module', context[0][1].get('default_module', 'powerline.segments.' + ext))
with WithPath(data['import_paths']):
try:
func = getattr(__import__(unicode(module), fromlist=[unicode(name)]), unicode(name))
@ -616,9 +616,9 @@ def check_segment_name(name, data, context, echoerr):
D_H_G_USED_STR = 'Divider highlight group used: '
for line in func.__doc__.split('\n'):
if H_G_USED_STR in line:
hl_groups.append(line[line.index(H_G_USED_STR)+len(H_G_USED_STR):])
hl_groups.append(line[line.index(H_G_USED_STR) + len(H_G_USED_STR):])
elif D_H_G_USED_STR in line:
divider_hl_group = line[line.index(D_H_G_USED_STR)+len(D_H_G_USED_STR)+2:-3]
divider_hl_group = line[line.index(D_H_G_USED_STR) + len(D_H_G_USED_STR) + 2:-3]
hadproblem = False
@ -756,7 +756,7 @@ def check_segment_data_key(key, data, context, echoerr):
if 'name' in segment:
if key == segment['name']:
found = True
module = segment.get('module', theme.get('default_module', 'powerline.segments.'+ext))
module = segment.get('module', theme.get('default_module', 'powerline.segments.' + ext))
if key == unicode(module) + '.' + unicode(segment['name']):
found = True
if found:
@ -861,6 +861,7 @@ def check(path=None):
))
lhadproblem = [False]
def load_config(stream):
r, hadproblem = load(stream)
if hadproblem:

View File

@ -1,23 +1,17 @@
from .error import *
from .tokens import *
from .events import *
from .nodes import *
from .loader import *
__version__ = '3.10'
def load(stream, Loader=Loader):
"""
Parse the first YAML document in a stream
and produce the corresponding Python object.
"""
loader = Loader(stream)
try:
r = loader.get_single_data()
return r, loader.haserrors
finally:
loader.dispose()
from .loader import Loader
def load(stream, Loader=Loader):
"""
Parse the first YAML document in a stream
and produce the corresponding Python object.
"""
loader = Loader(stream)
try:
r = loader.get_single_data()
return r, loader.haserrors
finally:
loader.dispose()

View File

@ -1,119 +1,117 @@
__all__ = ['Composer', 'ComposerError']
from .error import MarkedYAMLError
from .events import *
from .nodes import *
from .events import * # NOQA
from .nodes import * # NOQA
class ComposerError(MarkedYAMLError):
pass
pass
class Composer:
def __init__(self):
pass
def __init__(self):
pass
def check_node(self):
# Drop the STREAM-START event.
if self.check_event(StreamStartEvent):
self.get_event()
def check_node(self):
# Drop the STREAM-START event.
if self.check_event(StreamStartEvent):
self.get_event()
# If there are more documents available?
return not self.check_event(StreamEndEvent)
# If there are more documents available?
return not self.check_event(StreamEndEvent)
def get_node(self):
# Get the root node of the next document.
if not self.check_event(StreamEndEvent):
return self.compose_document()
def get_node(self):
# Get the root node of the next document.
if not self.check_event(StreamEndEvent):
return self.compose_document()
def get_single_node(self):
# Drop the STREAM-START event.
self.get_event()
def get_single_node(self):
# Drop the STREAM-START event.
self.get_event()
# Compose a document if the stream is not empty.
document = None
if not self.check_event(StreamEndEvent):
document = self.compose_document()
# Compose a document if the stream is not empty.
document = None
if not self.check_event(StreamEndEvent):
document = self.compose_document()
# Ensure that the stream contains no more documents.
if not self.check_event(StreamEndEvent):
event = self.get_event()
raise ComposerError("expected a single document in the stream",
document.start_mark, "but found another document",
event.start_mark)
# Ensure that the stream contains no more documents.
if not self.check_event(StreamEndEvent):
event = self.get_event()
raise ComposerError("expected a single document in the stream",
document.start_mark, "but found another document",
event.start_mark)
# Drop the STREAM-END event.
self.get_event()
# Drop the STREAM-END event.
self.get_event()
return document
return document
def compose_document(self):
# Drop the DOCUMENT-START event.
self.get_event()
def compose_document(self):
# Drop the DOCUMENT-START event.
self.get_event()
# Compose the root node.
node = self.compose_node(None, None)
# Compose the root node.
node = self.compose_node(None, None)
# Drop the DOCUMENT-END event.
self.get_event()
# Drop the DOCUMENT-END event.
self.get_event()
return node
return node
def compose_node(self, parent, index):
self.descend_resolver(parent, index)
if self.check_event(ScalarEvent):
node = self.compose_scalar_node()
elif self.check_event(SequenceStartEvent):
node = self.compose_sequence_node()
elif self.check_event(MappingStartEvent):
node = self.compose_mapping_node()
self.ascend_resolver()
return node
def compose_node(self, parent, index):
event = self.peek_event()
self.descend_resolver(parent, index)
if self.check_event(ScalarEvent):
node = self.compose_scalar_node()
elif self.check_event(SequenceStartEvent):
node = self.compose_sequence_node()
elif self.check_event(MappingStartEvent):
node = self.compose_mapping_node()
self.ascend_resolver()
return node
def compose_scalar_node(self):
event = self.get_event()
tag = event.tag
if tag is None or tag == '!':
tag = self.resolve(ScalarNode, event.value, event.implicit, event.start_mark)
node = ScalarNode(tag, event.value,
event.start_mark, event.end_mark, style=event.style)
return node
def compose_scalar_node(self):
event = self.get_event()
tag = event.tag
if tag is None or tag == '!':
tag = self.resolve(ScalarNode, event.value, event.implicit, event.start_mark)
node = ScalarNode(tag, event.value,
event.start_mark, event.end_mark, style=event.style)
return node
def compose_sequence_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(SequenceNode, None, start_event.implicit)
node = SequenceNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
index = 0
while not self.check_event(SequenceEndEvent):
node.value.append(self.compose_node(node, index))
index += 1
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_mapping_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(MappingNode, None, start_event.implicit)
node = MappingNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
while not self.check_event(MappingEndEvent):
#key_event = self.peek_event()
item_key = self.compose_node(node, None)
#if item_key in node.value:
# raise ComposerError("while composing a mapping", start_event.start_mark,
# "found duplicate key", key_event.start_mark)
item_value = self.compose_node(node, item_key)
#node.value[item_key] = item_value
node.value.append((item_key, item_value))
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_sequence_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(SequenceNode, None, start_event.implicit)
node = SequenceNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
index = 0
while not self.check_event(SequenceEndEvent):
node.value.append(self.compose_node(node, index))
index += 1
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_mapping_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(MappingNode, None, start_event.implicit)
node = MappingNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
while not self.check_event(MappingEndEvent):
#key_event = self.peek_event()
item_key = self.compose_node(node, None)
#if item_key in node.value:
# raise ComposerError("while composing a mapping", start_event.start_mark,
# "found duplicate key", key_event.start_mark)
item_value = self.compose_node(node, item_key)
#node.value[item_key] = item_value
node.value.append((item_key, item_value))
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node

View File

@ -1,324 +1,274 @@
__all__ = ['BaseConstructor', 'Constructor', 'ConstructorError']
from .error import *
from .nodes import *
from .markedvalue import *
from .error import MarkedYAMLError
from .nodes import * # NOQA
from .markedvalue import gen_marked_value
import collections, datetime, base64, binascii, re, sys, types
import collections
import types
from functools import wraps
try:
from __builtin__ import unicode
from __builtin__ import unicode
except ImportError:
unicode = str
unicode = str # NOQA
def marked(func):
@wraps(func)
def f(self, node, *args, **kwargs):
return gen_marked_value(func(self, node, *args, **kwargs), node.start_mark)
return f
@wraps(func)
def f(self, node, *args, **kwargs):
return gen_marked_value(func(self, node, *args, **kwargs), node.start_mark)
return f
class ConstructorError(MarkedYAMLError):
pass
pass
class BaseConstructor:
yaml_constructors = {}
yaml_constructors = {}
yaml_multi_constructors = {}
def __init__(self):
self.constructed_objects = {}
self.state_generators = []
self.deep_construct = False
def __init__(self):
self.constructed_objects = {}
self.state_generators = []
self.deep_construct = False
def check_data(self):
# If there are more documents available?
return self.check_node()
def check_data(self):
# If there are more documents available?
return self.check_node()
def get_data(self):
# Construct and return the next document.
if self.check_node():
return self.construct_document(self.get_node())
def get_data(self):
# Construct and return the next document.
if self.check_node():
return self.construct_document(self.get_node())
def get_single_data(self):
# Ensure that the stream contains a single document and construct it.
node = self.get_single_node()
if node is not None:
return self.construct_document(node)
return None
def get_single_data(self):
# Ensure that the stream contains a single document and construct it.
node = self.get_single_node()
if node is not None:
return self.construct_document(node)
return None
def construct_document(self, node):
data = self.construct_object(node)
while self.state_generators:
state_generators = self.state_generators
self.state_generators = []
for generator in state_generators:
for dummy in generator:
pass
self.constructed_objects = {}
self.deep_construct = False
return data
def construct_document(self, node):
data = self.construct_object(node)
while self.state_generators:
state_generators = self.state_generators
self.state_generators = []
for generator in state_generators:
for dummy in generator:
pass
self.constructed_objects = {}
self.deep_construct = False
return data
def construct_object(self, node, deep=False):
if node in self.constructed_objects:
return self.constructed_objects[node]
if deep:
old_deep = self.deep_construct
self.deep_construct = True
constructor = None
tag_suffix = None
if node.tag in self.yaml_constructors:
constructor = self.yaml_constructors[node.tag]
else:
raise ConstructorError(None, None, 'no constructor for tag %s' % node.tag)
if tag_suffix is None:
data = constructor(self, node)
else:
data = constructor(self, tag_suffix, node)
if isinstance(data, types.GeneratorType):
generator = data
data = next(generator)
if self.deep_construct:
for dummy in generator:
pass
else:
self.state_generators.append(generator)
self.constructed_objects[node] = data
if deep:
self.deep_construct = old_deep
return data
def construct_object(self, node, deep=False):
if node in self.constructed_objects:
return self.constructed_objects[node]
if deep:
old_deep = self.deep_construct
self.deep_construct = True
constructor = None
tag_suffix = None
if node.tag in self.yaml_constructors:
constructor = self.yaml_constructors[node.tag]
else:
for tag_prefix in self.yaml_multi_constructors:
if node.tag.startswith(tag_prefix):
tag_suffix = node.tag[len(tag_prefix):]
constructor = self.yaml_multi_constructors[tag_prefix]
break
else:
if None in self.yaml_multi_constructors:
tag_suffix = node.tag
constructor = self.yaml_multi_constructors[None]
elif None in self.yaml_constructors:
constructor = self.yaml_constructors[None]
elif isinstance(node, ScalarNode):
constructor = self.__class__.construct_scalar
elif isinstance(node, SequenceNode):
constructor = self.__class__.construct_sequence
elif isinstance(node, MappingNode):
constructor = self.__class__.construct_mapping
if tag_suffix is None:
data = constructor(self, node)
else:
data = constructor(self, tag_suffix, node)
if isinstance(data, types.GeneratorType):
generator = data
data = next(generator)
if self.deep_construct:
for dummy in generator:
pass
else:
self.state_generators.append(generator)
self.constructed_objects[node] = data
if deep:
self.deep_construct = old_deep
return data
@marked
def construct_scalar(self, node):
if not isinstance(node, ScalarNode):
raise ConstructorError(None, None,
"expected a scalar node, but found %s" % node.id,
node.start_mark)
return node.value
@marked
def construct_scalar(self, node):
if not isinstance(node, ScalarNode):
raise ConstructorError(None, None,
"expected a scalar node, but found %s" % node.id,
node.start_mark)
return node.value
def construct_sequence(self, node, deep=False):
if not isinstance(node, SequenceNode):
raise ConstructorError(None, None,
"expected a sequence node, but found %s" % node.id,
node.start_mark)
return [self.construct_object(child, deep=deep)
for child in node.value]
def construct_sequence(self, node, deep=False):
if not isinstance(node, SequenceNode):
raise ConstructorError(None, None,
"expected a sequence node, but found %s" % node.id,
node.start_mark)
return [self.construct_object(child, deep=deep)
for child in node.value]
@marked
def construct_mapping(self, node, deep=False):
if not isinstance(node, MappingNode):
raise ConstructorError(None, None,
"expected a mapping node, but found %s" % node.id,
node.start_mark)
mapping = {}
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
if not isinstance(key, collections.Hashable):
self.echoerr('While constructing a mapping', node.start_mark,
'found unhashable key', key_node.start_mark)
continue
elif type(key.value) != unicode:
self.echoerr('Error while constructing a mapping', node.start_mark,
'found key that is not a string', key_node.start_mark)
continue
elif key in mapping:
self.echoerr('Error while constructing a mapping', node.start_mark,
'found duplicate key', key_node.start_mark)
continue
value = self.construct_object(value_node, deep=deep)
mapping[key] = value
return mapping
@marked
def construct_mapping(self, node, deep=False):
if not isinstance(node, MappingNode):
raise ConstructorError(None, None,
"expected a mapping node, but found %s" % node.id,
node.start_mark)
mapping = {}
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
if not isinstance(key, collections.Hashable):
self.echoerr('While constructing a mapping', node.start_mark,
'found unhashable key', key_node.start_mark)
continue
elif type(key.value) != unicode:
self.echoerr('Error while constructing a mapping', node.start_mark,
'found key that is not a string', key_node.start_mark)
continue
elif key in mapping:
self.echoerr('Error while constructing a mapping', node.start_mark,
'found duplicate key', key_node.start_mark)
continue
value = self.construct_object(value_node, deep=deep)
mapping[key] = value
return mapping
@classmethod
def add_constructor(cls, tag, constructor):
if not 'yaml_constructors' in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
@classmethod
def add_constructor(cls, tag, constructor):
if not 'yaml_constructors' in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
class Constructor(BaseConstructor):
def construct_scalar(self, node):
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == 'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
def construct_scalar(self, node):
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == 'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
def flatten_mapping(self, node):
merge = []
index = 0
while index < len(node.value):
key_node, value_node = node.value[index]
if key_node.tag == 'tag:yaml.org,2002:merge':
del node.value[index]
if isinstance(value_node, MappingNode):
self.flatten_mapping(value_node)
merge.extend(value_node.value)
elif isinstance(value_node, SequenceNode):
submerge = []
for subnode in value_node.value:
if not isinstance(subnode, MappingNode):
raise ConstructorError("while constructing a mapping",
node.start_mark,
"expected a mapping for merging, but found %s"
% subnode.id, subnode.start_mark)
self.flatten_mapping(subnode)
submerge.append(subnode.value)
submerge.reverse()
for value in submerge:
merge.extend(value)
else:
raise ConstructorError("while constructing a mapping", node.start_mark,
"expected a mapping or list of mappings for merging, but found %s"
% value_node.id, value_node.start_mark)
elif key_node.tag == 'tag:yaml.org,2002:value':
key_node.tag = 'tag:yaml.org,2002:str'
index += 1
else:
index += 1
if merge:
node.value = merge + node.value
def flatten_mapping(self, node):
merge = []
index = 0
while index < len(node.value):
key_node, value_node = node.value[index]
if key_node.tag == 'tag:yaml.org,2002:merge':
del node.value[index]
if isinstance(value_node, MappingNode):
self.flatten_mapping(value_node)
merge.extend(value_node.value)
elif isinstance(value_node, SequenceNode):
submerge = []
for subnode in value_node.value:
if not isinstance(subnode, MappingNode):
raise ConstructorError("while constructing a mapping",
node.start_mark,
"expected a mapping for merging, but found %s"
% subnode.id, subnode.start_mark)
self.flatten_mapping(subnode)
submerge.append(subnode.value)
submerge.reverse()
for value in submerge:
merge.extend(value)
else:
raise ConstructorError("while constructing a mapping", node.start_mark,
"expected a mapping or list of mappings for merging, but found %s"
% value_node.id, value_node.start_mark)
elif key_node.tag == 'tag:yaml.org,2002:value':
key_node.tag = 'tag:yaml.org,2002:str'
index += 1
else:
index += 1
if merge:
node.value = merge + node.value
def construct_mapping(self, node, deep=False):
if isinstance(node, MappingNode):
self.flatten_mapping(node)
return BaseConstructor.construct_mapping(self, node, deep=deep)
def construct_mapping(self, node, deep=False):
if isinstance(node, MappingNode):
self.flatten_mapping(node)
return BaseConstructor.construct_mapping(self, node, deep=deep)
@marked
def construct_yaml_null(self, node):
self.construct_scalar(node)
return None
@marked
def construct_yaml_null(self, node):
self.construct_scalar(node)
return None
@marked
def construct_yaml_bool(self, node):
value = self.construct_scalar(node).value
return bool(value)
bool_values = {
'yes': True,
'no': False,
'true': True,
'false': False,
'on': True,
'off': False,
}
@marked
def construct_yaml_int(self, node):
value = self.construct_scalar(node).value
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
if value == '0':
return 0
else:
return sign * int(value)
@marked
def construct_yaml_bool(self, node):
value = self.construct_scalar(node)
return self.bool_values[value.lower()]
@marked
def construct_yaml_float(self, node):
value = self.construct_scalar(node).value
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
else:
return sign * float(value)
@marked
def construct_yaml_int(self, node):
value = self.construct_scalar(node)
value = value.replace('_', '')
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
if value == '0':
return 0
elif value.startswith('0b'):
return sign*int(value[2:], 2)
elif value.startswith('0x'):
return sign*int(value[2:], 16)
elif value[0] == '0':
return sign*int(value, 8)
elif ':' in value:
digits = [int(part) for part in value.split(':')]
digits.reverse()
base = 1
value = 0
for digit in digits:
value += digit*base
base *= 60
return sign*value
else:
return sign*int(value)
def construct_yaml_str(self, node):
return self.construct_scalar(node)
@marked
def construct_yaml_float(self, node):
value = self.construct_scalar(node)
value = value.replace('_', '').lower()
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
elif ':' in value:
digits = [float(part) for part in value.split(':')]
digits.reverse()
base = 1
value = 0.0
for digit in digits:
value += digit*base
base *= 60
return sign*value
else:
return sign*float(value)
def construct_yaml_seq(self, node):
data = gen_marked_value([], node.start_mark)
yield data
data.extend(self.construct_sequence(node))
def construct_yaml_str(self, node):
return self.construct_scalar(node)
def construct_yaml_map(self, node):
data = gen_marked_value({}, node.start_mark)
yield data
value = self.construct_mapping(node)
data.update(value)
def construct_yaml_seq(self, node):
data = gen_marked_value([], node.start_mark)
yield data
data.extend(self.construct_sequence(node))
def construct_undefined(self, node):
raise ConstructorError(None, None,
"could not determine a constructor for the tag %r" % node.tag,
node.start_mark)
def construct_yaml_map(self, node):
data = gen_marked_value({}, node.start_mark)
yield data
value = self.construct_mapping(node)
data.update(value)
def construct_undefined(self, node):
raise ConstructorError(None, None,
"could not determine a constructor for the tag %r" % node.tag,
node.start_mark)
Constructor.add_constructor(
'tag:yaml.org,2002:null',
Constructor.construct_yaml_null)
'tag:yaml.org,2002:null',
Constructor.construct_yaml_null)
Constructor.add_constructor(
'tag:yaml.org,2002:bool',
Constructor.construct_yaml_bool)
'tag:yaml.org,2002:bool',
Constructor.construct_yaml_bool)
Constructor.add_constructor(
'tag:yaml.org,2002:int',
Constructor.construct_yaml_int)
'tag:yaml.org,2002:int',
Constructor.construct_yaml_int)
Constructor.add_constructor(
'tag:yaml.org,2002:float',
Constructor.construct_yaml_float)
'tag:yaml.org,2002:float',
Constructor.construct_yaml_float)
Constructor.add_constructor(
'tag:yaml.org,2002:str',
Constructor.construct_yaml_str)
'tag:yaml.org,2002:str',
Constructor.construct_yaml_str)
Constructor.add_constructor(
'tag:yaml.org,2002:seq',
Constructor.construct_yaml_seq)
'tag:yaml.org,2002:seq',
Constructor.construct_yaml_seq)
Constructor.add_constructor(
'tag:yaml.org,2002:map',
Constructor.construct_yaml_map)
'tag:yaml.org,2002:map',
Constructor.construct_yaml_map)
Constructor.add_constructor(None,
Constructor.construct_undefined)
Constructor.construct_undefined)

View File

@ -1,4 +1,3 @@
__all__ = ['Mark', 'YAMLError', 'MarkedYAMLError', 'echoerr']
@ -6,82 +5,84 @@ import sys
def strtrans(s):
return s.replace('\t', '>---')
return s.replace('\t', '>---')
class Mark:
def __init__(self, name, index, line, column, buffer, pointer):
self.name = name
self.index = index
self.line = line
self.column = column
self.buffer = buffer
self.pointer = pointer
def __init__(self, name, index, line, column, buffer, pointer):
self.name = name
self.index = index
self.line = line
self.column = column
self.buffer = buffer
self.pointer = pointer
def get_snippet(self, indent=4, max_length=75):
if self.buffer is None:
return None
head = ''
start = self.pointer
while start > 0 and self.buffer[start - 1] not in '\0\n':
start -= 1
if self.pointer - start > max_length / 2 - 1:
head = ' ... '
start += 5
break
tail = ''
end = self.pointer
while end < len(self.buffer) and self.buffer[end] not in '\0\n':
end += 1
if end - self.pointer > max_length / 2 - 1:
tail = ' ... '
end -= 5
break
snippet = [self.buffer[start:self.pointer], self.buffer[self.pointer], self.buffer[self.pointer + 1:end]]
snippet = [strtrans(s) for s in snippet]
return ' ' * indent + head + ''.join(snippet) + tail + '\n' \
+ ' ' * (indent + len(head) + len(snippet[0])) + '^'
def get_snippet(self, indent=4, max_length=75):
if self.buffer is None:
return None
head = ''
start = self.pointer
while start > 0 and self.buffer[start-1] not in '\0\n':
start -= 1
if self.pointer-start > max_length/2-1:
head = ' ... '
start += 5
break
tail = ''
end = self.pointer
while end < len(self.buffer) and self.buffer[end] not in '\0\n':
end += 1
if end-self.pointer > max_length/2-1:
tail = ' ... '
end -= 5
break
snippet = [self.buffer[start:self.pointer], self.buffer[self.pointer], self.buffer[self.pointer+1:end]]
snippet = [strtrans(s) for s in snippet]
return ' ' * indent + head + ''.join(snippet) + tail + '\n' \
+ ' ' * (indent + len(head) + len(snippet[0])) + '^'
def __str__(self):
snippet = self.get_snippet()
where = " in \"%s\", line %d, column %d" \
% (self.name, self.line + 1, self.column + 1)
if snippet is not None:
where += ":\n" + snippet
if type(where) is str:
return where
else:
return where.encode('utf-8')
def __str__(self):
snippet = self.get_snippet()
where = " in \"%s\", line %d, column %d" \
% (self.name, self.line+1, self.column+1)
if snippet is not None:
where += ":\n" + snippet
if type(where) is str:
return where
else:
return where.encode('utf-8')
class YAMLError(Exception):
pass
pass
def echoerr(*args, **kwargs):
sys.stderr.write('\n')
sys.stderr.write(format_error(*args, **kwargs) + '\n')
sys.stderr.write('\n')
sys.stderr.write(format_error(*args, **kwargs) + '\n')
def format_error(context=None, context_mark=None, problem=None, problem_mark=None, note=None):
lines = []
if context is not None:
lines.append(context)
if context_mark is not None \
and (problem is None or problem_mark is None
or context_mark.name != problem_mark.name
or context_mark.line != problem_mark.line
or context_mark.column != problem_mark.column):
lines.append(str(context_mark))
if problem is not None:
lines.append(problem)
if problem_mark is not None:
lines.append(str(problem_mark))
if note is not None:
lines.append(note)
return '\n'.join(lines)
lines = []
if context is not None:
lines.append(context)
if context_mark is not None \
and (problem is None or problem_mark is None
or context_mark.name != problem_mark.name
or context_mark.line != problem_mark.line
or context_mark.column != problem_mark.column):
lines.append(str(context_mark))
if problem is not None:
lines.append(problem)
if problem_mark is not None:
lines.append(str(problem_mark))
if note is not None:
lines.append(note)
return '\n'.join(lines)
class MarkedYAMLError(YAMLError):
def __init__(self, context=None, context_mark=None,
problem=None, problem_mark=None, note=None):
YAMLError.__init__(self, format_error(context, context_mark, problem,
problem_mark, note))
def __init__(self, context=None, context_mark=None,
problem=None, problem_mark=None, note=None):
YAMLError.__init__(self, format_error(context, context_mark, problem,
problem_mark, note))

View File

@ -1,83 +1,97 @@
# Abstract classes.
class Event(object):
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in ['implicit', 'value']
if hasattr(self, key)]
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in ['implicit', 'value']
if hasattr(self, key)]
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
class NodeEvent(Event):
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
class CollectionStartEvent(NodeEvent):
def __init__(self, implicit, start_mark=None, end_mark=None,
flow_style=None):
self.tag = None
self.implicit = implicit
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
def __init__(self, implicit, start_mark=None, end_mark=None,
flow_style=None):
self.tag = None
self.implicit = implicit
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
class CollectionEndEvent(Event):
pass
pass
# Implementations.
class StreamStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None, encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
def __init__(self, start_mark=None, end_mark=None, encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
class StreamEndEvent(Event):
pass
pass
class DocumentStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
explicit=None, version=None, tags=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
self.version = version
self.tags = tags
def __init__(self, start_mark=None, end_mark=None,
explicit=None, version=None, tags=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
self.version = version
self.tags = tags
class DocumentEndEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
explicit=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
def __init__(self, start_mark=None, end_mark=None,
explicit=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
class AliasEvent(NodeEvent):
pass
pass
class ScalarEvent(NodeEvent):
def __init__(self, implicit, value,
start_mark=None, end_mark=None, style=None):
self.tag = None
self.implicit = implicit
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
def __init__(self, implicit, value,
start_mark=None, end_mark=None, style=None):
self.tag = None
self.implicit = implicit
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
class SequenceStartEvent(CollectionStartEvent):
pass
pass
class SequenceEndEvent(CollectionEndEvent):
pass
pass
class MappingStartEvent(CollectionStartEvent):
pass
pass
class MappingEndEvent(CollectionEndEvent):
pass
pass

View File

@ -1,26 +1,24 @@
__all__ = ['Loader']
from .reader import *
from .scanner import *
from .parser import *
from .composer import *
from .constructor import *
from .resolver import *
from .reader import Reader
from .scanner import Scanner
from .parser import Parser
from .composer import Composer
from .constructor import Constructor
from .resolver import Resolver
from .error import echoerr
class Loader(Reader, Scanner, Parser, Composer, Constructor, Resolver):
def __init__(self, stream):
Reader.__init__(self, stream)
Scanner.__init__(self)
Parser.__init__(self)
Composer.__init__(self)
Constructor.__init__(self)
Resolver.__init__(self)
self.haserrors = False
def __init__(self, stream):
Reader.__init__(self, stream)
Scanner.__init__(self)
Parser.__init__(self)
Composer.__init__(self)
Constructor.__init__(self)
Resolver.__init__(self)
self.haserrors = False
def echoerr(self, *args, **kwargs):
echoerr(*args, **kwargs)
self.haserrors = True
def echoerr(self, *args, **kwargs):
echoerr(*args, **kwargs)
self.haserrors = True

View File

@ -1,26 +1,29 @@
__all__ = ['gen_marked_value', 'MarkedValue']
class MarkedValue:
def __init__(self, value, mark):
self.mark = mark
self.value = value
def __init__(self, value, mark):
self.mark = mark
self.value = value
classcache = {}
def gen_marked_value(value, mark):
if value.__class__ in classcache:
Marked = classcache[value.__class__]
else:
class Marked(MarkedValue):
for func in value.__class__.__dict__:
if func not in set(('__init__', '__new__', '__getattribute__')):
if func in set(('__eq__',)):
# HACK to make marked dictionaries always work
exec (('def {0}(self, *args):\n'
' return self.value.{0}(*[arg.value if isinstance(arg, MarkedValue) else arg for arg in args])').format(func))
else:
exec (('def {0}(self, *args, **kwargs):\n'
' return self.value.{0}(*args, **kwargs)\n').format(func))
classcache[value.__class__] = Marked
return Marked(value, mark)
def gen_marked_value(value, mark):
if value.__class__ in classcache:
Marked = classcache[value.__class__]
else:
class Marked(MarkedValue):
for func in value.__class__.__dict__:
if func not in set(('__init__', '__new__', '__getattribute__')):
if func in set(('__eq__',)):
# HACK to make marked dictionaries always work
exec (('def {0}(self, *args):\n'
' return self.value.{0}(*[arg.value if isinstance(arg, MarkedValue) else arg for arg in args])').format(func))
else:
exec (('def {0}(self, *args, **kwargs):\n'
' return self.value.{0}(*args, **kwargs)\n').format(func))
classcache[value.__class__] = Marked
return Marked(value, mark)

View File

@ -1,49 +1,53 @@
class Node(object):
def __init__(self, tag, value, start_mark, end_mark):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
value = self.value
#if isinstance(value, list):
# if len(value) == 0:
# value = '<empty>'
# elif len(value) == 1:
# value = '<1 item>'
# else:
# value = '<%d items>' % len(value)
#else:
# if len(value) > 75:
# value = repr(value[:70]+u' ... ')
# else:
# value = repr(value)
value = repr(value)
return '%s(tag=%r, value=%s)' % (self.__class__.__name__, self.tag, value)
def __init__(self, tag, value, start_mark, end_mark):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
value = self.value
#if isinstance(value, list):
# if len(value) == 0:
# value = '<empty>'
# elif len(value) == 1:
# value = '<1 item>'
# else:
# value = '<%d items>' % len(value)
#else:
# if len(value) > 75:
# value = repr(value[:70]+u' ... ')
# else:
# value = repr(value)
value = repr(value)
return '%s(tag=%r, value=%s)' % (self.__class__.__name__, self.tag, value)
class ScalarNode(Node):
id = 'scalar'
def __init__(self, tag, value,
start_mark=None, end_mark=None, style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
id = 'scalar'
def __init__(self, tag, value,
start_mark=None, end_mark=None, style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
class CollectionNode(Node):
def __init__(self, tag, value,
start_mark=None, end_mark=None, flow_style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
def __init__(self, tag, value,
start_mark=None, end_mark=None, flow_style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
class SequenceNode(CollectionNode):
id = 'sequence'
id = 'sequence'
class MappingNode(CollectionNode):
id = 'mapping'
id = 'mapping'

View File

@ -1,255 +1,240 @@
__all__ = ['Parser', 'ParserError']
from .error import MarkedYAMLError
from .tokens import *
from .events import *
from .scanner import *
from .tokens import * # NOQA
from .events import * # NOQA
class ParserError(MarkedYAMLError):
pass
pass
class Parser:
# Since writing a recursive-descendant parser is a straightforward task, we
# do not give many comments here.
def __init__(self):
self.current_event = None
self.yaml_version = None
self.states = []
self.marks = []
self.state = self.parse_stream_start
DEFAULT_TAGS = {
'!': '!',
'!!': 'tag:yaml.org,2002:',
}
def dispose(self):
# Reset the state attributes (to clear self-references)
self.states = []
self.state = None
def __init__(self):
self.current_event = None
self.yaml_version = None
self.tag_handles = {}
self.states = []
self.marks = []
self.state = self.parse_stream_start
def check_event(self, *choices):
# Check the type of the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
if self.current_event is not None:
if not choices:
return True
for choice in choices:
if isinstance(self.current_event, choice):
return True
return False
def dispose(self):
# Reset the state attributes (to clear self-references)
self.states = []
self.state = None
def peek_event(self):
# Get the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
return self.current_event
def check_event(self, *choices):
# Check the type of the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
if self.current_event is not None:
if not choices:
return True
for choice in choices:
if isinstance(self.current_event, choice):
return True
return False
def get_event(self):
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
self.current_event = self.state()
value = self.current_event
self.current_event = None
return value
def peek_event(self):
# Get the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
return self.current_event
# stream ::= STREAM-START implicit_document? explicit_document* STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
def get_event(self):
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
self.current_event = self.state()
value = self.current_event
self.current_event = None
return value
def parse_stream_start(self):
# Parse the stream start.
token = self.get_token()
event = StreamStartEvent(token.start_mark, token.end_mark,
encoding=token.encoding)
# stream ::= STREAM-START implicit_document? explicit_document* STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
# Prepare the next state.
self.state = self.parse_implicit_document_start
def parse_stream_start(self):
return event
# Parse the stream start.
token = self.get_token()
event = StreamStartEvent(token.start_mark, token.end_mark,
encoding=token.encoding)
def parse_implicit_document_start(self):
# Parse an implicit document.
if not self.check_token(StreamEndToken):
token = self.peek_token()
start_mark = end_mark = token.start_mark
event = DocumentStartEvent(start_mark, end_mark, explicit=False)
# Prepare the next state.
self.state = self.parse_implicit_document_start
# Prepare the next state.
self.states.append(self.parse_document_end)
self.state = self.parse_node
return event
return event
def parse_implicit_document_start(self):
else:
return self.parse_document_start()
# Parse an implicit document.
if not self.check_token(StreamEndToken):
self.tag_handles = self.DEFAULT_TAGS
token = self.peek_token()
start_mark = end_mark = token.start_mark
event = DocumentStartEvent(start_mark, end_mark, explicit=False)
def parse_document_start(self):
# Parse an explicit document.
if not self.check_token(StreamEndToken):
token = self.peek_token()
self.echoerr(None, None,
"expected '<stream end>', but found %r" % token.id,
token.start_mark)
return StreamEndEvent(token.start_mark, token.end_mark)
else:
# Parse the end of the stream.
token = self.get_token()
event = StreamEndEvent(token.start_mark, token.end_mark)
assert not self.states
assert not self.marks
self.state = None
return event
# Prepare the next state.
self.states.append(self.parse_document_end)
self.state = self.parse_node
def parse_document_end(self):
# Parse the document end.
token = self.peek_token()
start_mark = end_mark = token.start_mark
explicit = False
event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
return event
# Prepare the next state.
self.state = self.parse_document_start
else:
return self.parse_document_start()
return event
def parse_document_start(self):
def parse_document_content(self):
return self.parse_node()
# Parse an explicit document.
if not self.check_token(StreamEndToken):
token = self.peek_token()
start_mark = token.start_mark
self.echoerr(None, None,
"expected '<stream end>', but found %r"
% self.peek_token().id,
self.peek_token().start_mark)
return StreamEndEvent(token.start_mark, token.end_mark)
else:
# Parse the end of the stream.
token = self.get_token()
event = StreamEndEvent(token.start_mark, token.end_mark)
assert not self.states
assert not self.marks
self.state = None
return event
def parse_node(self, indentless_sequence=False):
start_mark = end_mark = None
if start_mark is None:
start_mark = end_mark = self.peek_token().start_mark
event = None
implicit = True
if self.check_token(ScalarToken):
token = self.get_token()
end_mark = token.end_mark
if token.plain:
implicit = (True, False)
else:
implicit = (False, True)
event = ScalarEvent(implicit, token.value,
start_mark, end_mark, style=token.style)
self.state = self.states.pop()
elif self.check_token(FlowSequenceStartToken):
end_mark = self.peek_token().end_mark
event = SequenceStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_sequence_first_entry
elif self.check_token(FlowMappingStartToken):
end_mark = self.peek_token().end_mark
event = MappingStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_mapping_first_key
else:
token = self.peek_token()
raise ParserError("while parsing a flow node", start_mark,
"expected the node content, but found %r" % token.id,
token.start_mark)
return event
def parse_document_end(self):
def parse_flow_sequence_first_entry(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
# Parse the document end.
token = self.peek_token()
start_mark = end_mark = token.start_mark
explicit = False
event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
def parse_flow_sequence_entry(self, first=False):
if not self.check_token(FlowSequenceEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowSequenceEndToken):
token = self.peek_token()
self.echoerr("While parsing a flow sequence", self.marks[-1],
"expected sequence value, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow sequence", self.marks[-1],
"expected ',' or ']', but got %r" % token.id, token.start_mark)
# Prepare the next state.
self.state = self.parse_document_start
if not self.check_token(FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry)
return self.parse_node()
token = self.get_token()
event = SequenceEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
return event
def parse_flow_sequence_entry_mapping_end(self):
self.state = self.parse_flow_sequence_entry
token = self.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
def parse_document_content(self):
return self.parse_node()
def parse_flow_mapping_first_key(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
def parse_node(self, indentless_sequence=False):
start_mark = end_mark = tag_mark = None
if start_mark is None:
start_mark = end_mark = self.peek_token().start_mark
event = None
implicit = True
if self.check_token(ScalarToken):
token = self.get_token()
end_mark = token.end_mark
if token.plain:
implicit = (True, False)
else:
implicit = (False, True)
event = ScalarEvent(implicit, token.value,
start_mark, end_mark, style=token.style)
self.state = self.states.pop()
elif self.check_token(FlowSequenceStartToken):
end_mark = self.peek_token().end_mark
event = SequenceStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_sequence_first_entry
elif self.check_token(FlowMappingStartToken):
end_mark = self.peek_token().end_mark
event = MappingStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_mapping_first_key
else:
token = self.peek_token()
raise ParserError("while parsing a flow node", start_mark,
"expected the node content, but found %r" % token.id,
token.start_mark)
return event
def parse_flow_mapping_key(self, first=False):
if not self.check_token(FlowMappingEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowMappingEndToken):
token = self.peek_token()
self.echoerr("While parsing a flow mapping", self.marks[-1],
"expected mapping key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ',' or '}', but got %r" % token.id, token.start_mark)
if self.check_token(KeyToken):
token = self.get_token()
if not self.check_token(ValueToken,
FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_value)
return self.parse_node()
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected value, but got %r" % token.id, token.start_mark)
elif not self.check_token(FlowMappingEndToken):
token = self.peek_token()
expect_key = self.check_token(ValueToken, FlowEntryToken)
if not expect_key:
self.get_token()
expect_key = self.check_token(ValueToken)
def parse_flow_sequence_first_entry(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
if expect_key:
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected string key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ':', but got %r" % token.id, token.start_mark)
token = self.get_token()
event = MappingEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_sequence_entry(self, first=False):
if not self.check_token(FlowSequenceEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowSequenceEndToken):
token = self.peek_token()
self.echoerr("While parsing a flow sequence", self.marks[-1],
"expected sequence value, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow sequence", self.marks[-1],
"expected ',' or ']', but got %r" % token.id, token.start_mark)
def parse_flow_mapping_value(self):
if self.check_token(ValueToken):
token = self.get_token()
if not self.check_token(FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_key)
return self.parse_node()
if not self.check_token(FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry)
return self.parse_node()
token = self.get_token()
event = SequenceEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_sequence_entry_mapping_end(self):
self.state = self.parse_flow_sequence_entry
token = self.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
def parse_flow_mapping_first_key(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
def parse_flow_mapping_key(self, first=False):
if not self.check_token(FlowMappingEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowMappingEndToken):
token = self.peek_token()
self.echoerr("While parsing a flow mapping", self.marks[-1],
"expected mapping key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ',' or '}', but got %r" % token.id, token.start_mark)
if self.check_token(KeyToken):
token = self.get_token()
if not self.check_token(ValueToken,
FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_value)
return self.parse_node()
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected value, but got %r" % token.id, token.start_mark)
elif not self.check_token(FlowMappingEndToken):
token = self.peek_token()
expect_key = self.check_token(ValueToken, FlowEntryToken)
if not expect_key:
self.get_token()
expect_key = self.check_token(ValueToken)
if expect_key:
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected string key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ':', but got %r" % token.id, token.start_mark)
token = self.get_token()
event = MappingEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_mapping_value(self):
if self.check_token(ValueToken):
token = self.get_token()
if not self.check_token(FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_key)
return self.parse_node()
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected mapping value, but got %r" % token.id, token.start_mark)
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected mapping value, but got %r" % token.id, token.start_mark)

View File

@ -1,160 +1,144 @@
# This module contains abstractions for the input stream. You don't have to
# looks further, there are no pretty code.
#
# We define two classes here.
#
# Mark(source, line, column)
# It's just a record and its only use is producing nice error messages.
# Parser does not use it for any other purposes.
#
# Reader(source, data)
# Reader determines the encoding of `data` and converts it to unicode.
# Reader provides the following methods and attributes:
# reader.peek(length=1) - return the next `length` characters
# reader.forward(length=1) - move the current position to `length` characters.
# reader.index - the number of the current character.
# reader.line, stream.column - the line and the column of the current character.
__all__ = ['Reader', 'ReaderError']
from .error import YAMLError, Mark
import codecs, re
import codecs
import re
try:
from __builtin__ import unicode, unichr
from __builtin__ import unicode, unichr
except ImportError:
unicode = str
unichr = chr
unicode = str # NOQA
unichr = chr # NOQA
class ReaderError(YAMLError):
def __init__(self, name, position, character, encoding, reason):
self.name = name
self.character = character
self.position = position
self.encoding = encoding
self.reason = reason
def __init__(self, name, position, character, encoding, reason):
self.name = name
self.character = character
self.position = position
self.encoding = encoding
self.reason = reason
def __str__(self):
if isinstance(self.character, bytes):
return "'%s' codec can't decode byte #x%02x: %s\n" \
" in \"%s\", position %d" \
% (self.encoding, ord(self.character), self.reason,
self.name, self.position)
else:
return "unacceptable character #x%04x: %s\n" \
" in \"%s\", position %d" \
% (self.character, self.reason,
self.name, self.position)
def __str__(self):
if isinstance(self.character, bytes):
return "'%s' codec can't decode byte #x%02x: %s\n" \
" in \"%s\", position %d" \
% (self.encoding, ord(self.character), self.reason,
self.name, self.position)
else:
return "unacceptable character #x%04x: %s\n" \
" in \"%s\", position %d" \
% (self.character, self.reason,
self.name, self.position)
class Reader(object):
# Reader:
# - determines the data encoding and converts it to a unicode string,
# - checks if characters are in allowed range,
# - adds '\0' to the end.
# Reader:
# - determines the data encoding and converts it to a unicode string,
# - checks if characters are in allowed range,
# - adds '\0' to the end.
# Reader accepts
# - a file-like object with its `read` method returning `str`,
# Reader accepts
# - a file-like object with its `read` method returning `str`,
# Yeah, it's ugly and slow.
# Yeah, it's ugly and slow.
def __init__(self, stream):
self.name = None
self.stream = None
self.stream_pointer = 0
self.eof = True
self.buffer = ''
self.pointer = 0
self.full_buffer = unicode('')
self.full_pointer = 0
self.raw_buffer = None
self.raw_decode = codecs.utf_8_decode
self.encoding = 'utf-8'
self.index = 0
self.line = 0
self.column = 0
def __init__(self, stream):
self.name = None
self.stream = None
self.stream_pointer = 0
self.eof = True
self.buffer = ''
self.pointer = 0
self.full_buffer = unicode('')
self.full_pointer = 0
self.raw_buffer = None
self.raw_decode = codecs.utf_8_decode
self.encoding = 'utf-8'
self.index = 0
self.line = 0
self.column = 0
self.stream = stream
self.name = getattr(stream, 'name', "<file>")
self.eof = False
self.raw_buffer = None
self.stream = stream
self.name = getattr(stream, 'name', "<file>")
self.eof = False
self.raw_buffer = None
while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
self.update(1)
while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
self.update(1)
def peek(self, index=0):
try:
return self.buffer[self.pointer + index]
except IndexError:
self.update(index + 1)
return self.buffer[self.pointer + index]
def peek(self, index=0):
try:
return self.buffer[self.pointer+index]
except IndexError:
self.update(index+1)
return self.buffer[self.pointer+index]
def prefix(self, length=1):
if self.pointer + length >= len(self.buffer):
self.update(length)
return self.buffer[self.pointer:self.pointer + length]
def prefix(self, length=1):
if self.pointer+length >= len(self.buffer):
self.update(length)
return self.buffer[self.pointer:self.pointer+length]
def forward(self, length=1):
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
while length:
ch = self.buffer[self.pointer]
self.pointer += 1
self.full_pointer += 1
self.index += 1
if ch == '\n':
self.line += 1
self.column = 0
length -= 1
def forward(self, length=1):
if self.pointer+length+1 >= len(self.buffer):
self.update(length+1)
while length:
ch = self.buffer[self.pointer]
self.pointer += 1
self.full_pointer += 1
self.index += 1
if ch == '\n':
self.line += 1
self.column = 0
length -= 1
def get_mark(self):
return Mark(self.name, self.index, self.line, self.column,
self.full_buffer, self.full_pointer)
def get_mark(self):
return Mark(self.name, self.index, self.line, self.column,
self.full_buffer, self.full_pointer)
NON_PRINTABLE = re.compile('[^\t\n\x20-\x7E' + unichr(0x85) + (unichr(0xA0) + '-' + unichr(0xD7FF)) + (unichr(0xE000) + '-' + unichr(0xFFFD)) + ']')
NON_PRINTABLE = re.compile('[^\t\n\x20-\x7E' + unichr(0x85) + (unichr(0xA0)+'-'+unichr(0xD7FF)) + (unichr(0xE000)+'-'+unichr(0xFFFD)) + ']')
def check_printable(self, data):
match = self.NON_PRINTABLE.search(data)
if match:
character = match.group()
position = self.index+(len(self.buffer)-self.pointer)+match.start()
raise ReaderError(self.name, position, ord(character),
'unicode', "special characters are not allowed")
def check_printable(self, data):
match = self.NON_PRINTABLE.search(data)
if match:
character = match.group()
position = self.index + (len(self.buffer) - self.pointer) + match.start()
raise ReaderError(self.name, position, ord(character), 'unicode', "special characters are not allowed")
def update(self, length):
if self.raw_buffer is None:
return
self.buffer = self.buffer[self.pointer:]
self.pointer = 0
while len(self.buffer) < length:
if not self.eof:
self.update_raw()
try:
data, converted = self.raw_decode(self.raw_buffer,
'strict', self.eof)
except UnicodeDecodeError as exc:
character = self.raw_buffer[exc.start]
position = self.stream_pointer-len(self.raw_buffer)+exc.start
raise ReaderError(self.name, position, character,
exc.encoding, exc.reason)
self.check_printable(data)
self.buffer += data
self.full_buffer += data
self.raw_buffer = self.raw_buffer[converted:]
if self.eof:
self.buffer += '\0'
self.raw_buffer = None
break
def update_raw(self, size=4096):
data = self.stream.read(size)
if self.raw_buffer is None:
self.raw_buffer = data
else:
self.raw_buffer += data
self.stream_pointer += len(data)
if not data:
self.eof = True
def update(self, length):
if self.raw_buffer is None:
return
self.buffer = self.buffer[self.pointer:]
self.pointer = 0
while len(self.buffer) < length:
if not self.eof:
self.update_raw()
try:
data, converted = self.raw_decode(self.raw_buffer,
'strict', self.eof)
except UnicodeDecodeError as exc:
character = self.raw_buffer[exc.start]
position = self.stream_pointer - len(self.raw_buffer) + exc.start
raise ReaderError(self.name, position, character, exc.encoding, exc.reason)
self.check_printable(data)
self.buffer += data
self.full_buffer += data
self.raw_buffer = self.raw_buffer[converted:]
if self.eof:
self.buffer += '\0'
self.raw_buffer = None
break
def update_raw(self, size=4096):
data = self.stream.read(size)
if self.raw_buffer is None:
self.raw_buffer = data
else:
self.raw_buffer += data
self.stream_pointer += len(data)
if not data:
self.eof = True

View File

@ -1,135 +1,131 @@
__all__ = ['BaseResolver', 'Resolver']
from .error import *
from .nodes import *
from .error import MarkedYAMLError
from .nodes import * # NOQA
import re
class ResolverError(MarkedYAMLError):
pass
pass
class BaseResolver:
DEFAULT_SCALAR_TAG = 'tag:yaml.org,2002:str'
DEFAULT_SEQUENCE_TAG = 'tag:yaml.org,2002:seq'
DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'
DEFAULT_SCALAR_TAG = 'tag:yaml.org,2002:str'
DEFAULT_SEQUENCE_TAG = 'tag:yaml.org,2002:seq'
DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'
yaml_implicit_resolvers = {}
yaml_path_resolvers = {}
yaml_implicit_resolvers = {}
yaml_path_resolvers = {}
def __init__(self):
self.resolver_exact_paths = []
self.resolver_prefix_paths = []
def __init__(self):
self.resolver_exact_paths = []
self.resolver_prefix_paths = []
@classmethod
def add_implicit_resolver(cls, tag, regexp, first):
if not 'yaml_implicit_resolvers' in cls.__dict__:
cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()
if first is None:
first = [None]
for ch in first:
cls.yaml_implicit_resolvers.setdefault(ch, []).append((tag, regexp))
@classmethod
def add_implicit_resolver(cls, tag, regexp, first):
if not 'yaml_implicit_resolvers' in cls.__dict__:
cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()
if first is None:
first = [None]
for ch in first:
cls.yaml_implicit_resolvers.setdefault(ch, []).append((tag, regexp))
def descend_resolver(self, current_node, current_index):
if not self.yaml_path_resolvers:
return
exact_paths = {}
prefix_paths = []
if current_node:
depth = len(self.resolver_prefix_paths)
for path, kind in self.resolver_prefix_paths[-1]:
if self.check_resolver_prefix(depth, path, kind,
current_node, current_index):
if len(path) > depth:
prefix_paths.append((path, kind))
else:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
for path, kind in self.yaml_path_resolvers:
if not path:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
prefix_paths.append((path, kind))
self.resolver_exact_paths.append(exact_paths)
self.resolver_prefix_paths.append(prefix_paths)
def descend_resolver(self, current_node, current_index):
if not self.yaml_path_resolvers:
return
exact_paths = {}
prefix_paths = []
if current_node:
depth = len(self.resolver_prefix_paths)
for path, kind in self.resolver_prefix_paths[-1]:
if self.check_resolver_prefix(depth, path, kind,
current_node, current_index):
if len(path) > depth:
prefix_paths.append((path, kind))
else:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
for path, kind in self.yaml_path_resolvers:
if not path:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
prefix_paths.append((path, kind))
self.resolver_exact_paths.append(exact_paths)
self.resolver_prefix_paths.append(prefix_paths)
def ascend_resolver(self):
if not self.yaml_path_resolvers:
return
self.resolver_exact_paths.pop()
self.resolver_prefix_paths.pop()
def ascend_resolver(self):
if not self.yaml_path_resolvers:
return
self.resolver_exact_paths.pop()
self.resolver_prefix_paths.pop()
def check_resolver_prefix(self, depth, path, kind,
current_node, current_index):
node_check, index_check = path[depth - 1]
if isinstance(node_check, str):
if current_node.tag != node_check:
return
elif node_check is not None:
if not isinstance(current_node, node_check):
return
if index_check is True and current_index is not None:
return
if (index_check is False or index_check is None) \
and current_index is None:
return
if isinstance(index_check, str):
if not (isinstance(current_index, ScalarNode)
and index_check == current_index.value):
return
elif isinstance(index_check, int) and not isinstance(index_check, bool):
if index_check != current_index:
return
return True
def check_resolver_prefix(self, depth, path, kind,
current_node, current_index):
node_check, index_check = path[depth-1]
if isinstance(node_check, str):
if current_node.tag != node_check:
return
elif node_check is not None:
if not isinstance(current_node, node_check):
return
if index_check is True and current_index is not None:
return
if (index_check is False or index_check is None) \
and current_index is None:
return
if isinstance(index_check, str):
if not (isinstance(current_index, ScalarNode)
and index_check == current_index.value):
return
elif isinstance(index_check, int) and not isinstance(index_check, bool):
if index_check != current_index:
return
return True
def resolve(self, kind, value, implicit, mark=None):
if kind is ScalarNode and implicit[0]:
if value == '':
resolvers = self.yaml_implicit_resolvers.get('', [])
else:
resolvers = self.yaml_implicit_resolvers.get(value[0], [])
resolvers += self.yaml_implicit_resolvers.get(None, [])
for tag, regexp in resolvers:
if regexp.match(value):
return tag
else:
self.echoerr('While resolving plain scalar', None,
'expected floating-point value, integer, null or boolean, but got %r' % value,
mark)
return self.DEFAULT_SCALAR_TAG
if kind is ScalarNode:
return self.DEFAULT_SCALAR_TAG
elif kind is SequenceNode:
return self.DEFAULT_SEQUENCE_TAG
elif kind is MappingNode:
return self.DEFAULT_MAPPING_TAG
def resolve(self, kind, value, implicit, mark=None):
if kind is ScalarNode and implicit[0]:
if value == '':
resolvers = self.yaml_implicit_resolvers.get('', [])
else:
resolvers = self.yaml_implicit_resolvers.get(value[0], [])
resolvers += self.yaml_implicit_resolvers.get(None, [])
for tag, regexp in resolvers:
if regexp.match(value):
return tag
else:
raise ResolverError('while resolving plain scalar', None,
"expected floating-point value, integer, null or boolean, but got %r" % value,
mark)
implicit = implicit[1]
if self.yaml_path_resolvers:
exact_paths = self.resolver_exact_paths[-1]
if kind in exact_paths:
return exact_paths[kind]
if None in exact_paths:
return exact_paths[None]
if kind is ScalarNode:
return self.DEFAULT_SCALAR_TAG
elif kind is SequenceNode:
return self.DEFAULT_SEQUENCE_TAG
elif kind is MappingNode:
return self.DEFAULT_MAPPING_TAG
class Resolver(BaseResolver):
pass
pass
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:bool',
re.compile(r'''^(?:true|false)$''', re.X),
list('yYnNtTfFoO'))
'tag:yaml.org,2002:bool',
re.compile(r'''^(?:true|false)$''', re.X),
list('yYnNtTfFoO'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:float',
re.compile(r'^-?(?:0|[1-9]\d*)(?=[.eE])(?:\.\d+)?(?:[eE][-+]?\d+)?$', re.X),
list('-0123456789'))
'tag:yaml.org,2002:float',
re.compile(r'^-?(?:0|[1-9]\d*)(?=[.eE])(?:\.\d+)?(?:[eE][-+]?\d+)?$', re.X),
list('-0123456789'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:int',
re.compile(r'^(?:0|-?[1-9]\d*)$', re.X),
list('-0123456789'))
'tag:yaml.org,2002:int',
re.compile(r'^(?:0|-?[1-9]\d*)$', re.X),
list('-0123456789'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:null',
re.compile(r'^null$', re.X),
['n'])
'tag:yaml.org,2002:null',
re.compile(r'^null$', re.X),
['n'])

View File

@ -1,4 +1,3 @@
# Scanner produces tokens of the following types:
# STREAM-START
# STREAM-END
@ -11,467 +10,459 @@
# FLOW-ENTRY
# KEY
# VALUE
# ALIAS(value)
# ANCHOR(value)
# TAG(value)
# SCALAR(value, plain, style)
#
# Read comments in the Scanner code for more details.
#
__all__ = ['Scanner', 'ScannerError']
from .error import MarkedYAMLError
from .tokens import *
from .tokens import * # NOQA
class ScannerError(MarkedYAMLError):
pass
pass
class SimpleKey:
# See below simple keys treatment.
# See below simple keys treatment.
def __init__(self, token_number, index, line, column, mark):
self.token_number = token_number
self.index = index
self.line = line
self.column = column
self.mark = mark
def __init__(self, token_number, index, line, column, mark):
self.token_number = token_number
self.index = index
self.line = line
self.column = column
self.mark = mark
class Scanner:
def __init__(self):
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
# Reader do the dirty work of checking for BOM and converting the
# input data to Unicode. It also adds NUL to the end.
#
# Reader supports the following methods
# self.peek(i=0) # peek the next i-th character
# self.prefix(l=1) # peek the next l characters
# self.forward(l=1) # read the next l characters and move the pointer.
# Had we reached the end of the stream?
self.done = False
# The number of unclosed '{' and '['. `flow_level == 0` means block
# context.
self.flow_level = 0
# List of processed tokens that are not yet emitted.
self.tokens = []
# Add the STREAM-START token.
self.fetch_stream_start()
# Number of tokens that were emitted through the `get_token` method.
self.tokens_taken = 0
# Variables related to simple keys treatment.
# A simple key is a key that is not denoted by the '?' indicator.
# We emit the KEY token before all keys, so when we find a potential
# simple key, we try to locate the corresponding ':' indicator.
# Simple keys should be limited to a single line.
# Can a simple key start at the current position? A simple key may
# start:
# - after '{', '[', ',' (in the flow context),
self.allow_simple_key = False
# Keep track of possible simple keys. This is a dictionary. The key
# is `flow_level`; there can be no more that one possible simple key
# for each level. The value is a SimpleKey record:
# (token_number, index, line, column, mark)
# A simple key may start with SCALAR(flow), '[', or '{' tokens.
self.possible_simple_keys = {}
# Public methods.
def check_token(self, *choices):
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
# Return the next token, but do not delete if from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
return self.tokens[0]
def get_token(self):
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
self.tokens_taken += 1
return self.tokens.pop(0)
# Private methods.
def need_more_tokens(self):
if self.done:
return False
if not self.tokens:
return True
# The current token may be a potential simple key, so we
# need to look further.
self.stale_possible_simple_keys()
if self.next_possible_simple_key() == self.tokens_taken:
return True
def fetch_more_tokens(self):
# Eat whitespaces and comments until we reach the next token.
self.scan_to_next_token()
# Remove obsolete possible simple keys.
self.stale_possible_simple_keys()
# Peek the next character.
ch = self.peek()
# Is it the end of stream?
if ch == '\0':
return self.fetch_stream_end()
# Note: the order of the following checks is NOT significant.
# Is it the flow sequence start indicator?
if ch == '[':
return self.fetch_flow_sequence_start()
# Is it the flow mapping start indicator?
if ch == '{':
return self.fetch_flow_mapping_start()
# Is it the flow sequence end indicator?
if ch == ']':
return self.fetch_flow_sequence_end()
# Is it the flow mapping end indicator?
if ch == '}':
return self.fetch_flow_mapping_end()
# Is it the flow entry indicator?
if ch == ',':
return self.fetch_flow_entry()
# Is it the value indicator?
if ch == ':' and self.flow_level:
return self.fetch_value()
# Is it a double quoted scalar?
if ch == '\"':
return self.fetch_double()
# It must be a plain scalar then.
if self.check_plain():
return self.fetch_plain()
# No? It's an error. Let's produce a nice error message.
raise ScannerError("while scanning for the next token", None,
"found character %r that cannot start any token" % ch,
self.get_mark())
# Simple keys treatment.
def next_possible_simple_key(self):
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
# if not self.possible_simple_keys:
# return None
# return self.possible_simple_keys[
# min(self.possible_simple_keys.keys())].token_number
min_token_number = None
for level in self.possible_simple_keys:
key = self.possible_simple_keys[level]
if min_token_number is None or key.token_number < min_token_number:
min_token_number = key.token_number
return min_token_number
def stale_possible_simple_keys(self):
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
# - should be limited to a single line,
# Disabling this procedure will allow simple keys of any length and
# height (may cause problems if indentation is broken though).
for level in list(self.possible_simple_keys):
key = self.possible_simple_keys[level]
if key.line != self.line:
del self.possible_simple_keys[level]
def save_possible_simple_key(self):
# The next token may start a simple key. We check if it's possible
# and save its position. This function is called for
# SCALAR(flow), '[', and '{'.
# The next token might be a simple key. Let's save it's number and
# position.
if self.allow_simple_key:
self.remove_possible_simple_key()
token_number = self.tokens_taken + len(self.tokens)
key = SimpleKey(token_number,
self.index, self.line, self.column, self.get_mark())
self.possible_simple_keys[self.flow_level] = key
def remove_possible_simple_key(self):
# Remove the saved possible key position at the current flow level.
if self.flow_level in self.possible_simple_keys:
del self.possible_simple_keys[self.flow_level]
# Fetchers.
def fetch_stream_start(self):
# We always add STREAM-START as the first token and STREAM-END as the
# last token.
# Read the token.
mark = self.get_mark()
# Add STREAM-START.
self.tokens.append(StreamStartToken(mark, mark,
encoding=self.encoding))
def fetch_stream_end(self):
# Reset simple keys.
self.remove_possible_simple_key()
self.allow_simple_key = False
self.possible_simple_keys = {}
# Read the token.
mark = self.get_mark()
# Add STREAM-END.
self.tokens.append(StreamEndToken(mark, mark))
# The steam is finished.
self.done = True
def __init__(self):
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
# Reader do the dirty work of checking for BOM and converting the
# input data to Unicode. It also adds NUL to the end.
#
# Reader supports the following methods
# self.peek(i=0) # peek the next i-th character
# self.prefix(l=1) # peek the next l characters
# self.forward(l=1) # read the next l characters and move the pointer.
# Had we reached the end of the stream?
self.done = False
# The number of unclosed '{' and '['. `flow_level == 0` means block
# context.
self.flow_level = 0
# List of processed tokens that are not yet emitted.
self.tokens = []
# Add the STREAM-START token.
self.fetch_stream_start()
# Number of tokens that were emitted through the `get_token` method.
self.tokens_taken = 0
# Variables related to simple keys treatment.
# A simple key is a key that is not denoted by the '?' indicator.
# We emit the KEY token before all keys, so when we find a potential
# simple key, we try to locate the corresponding ':' indicator.
# Simple keys should be limited to a single line.
# Can a simple key start at the current position? A simple key may
# start:
# - after '{', '[', ',' (in the flow context),
self.allow_simple_key = False
# Keep track of possible simple keys. This is a dictionary. The key
# is `flow_level`; there can be no more that one possible simple key
# for each level. The value is a SimpleKey record:
# (token_number, index, line, column, mark)
# A simple key may start with SCALAR(flow), '[', or '{' tokens.
self.possible_simple_keys = {}
# Public methods.
def check_token(self, *choices):
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
# Return the next token, but do not delete if from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
return self.tokens[0]
def get_token(self):
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
self.tokens_taken += 1
return self.tokens.pop(0)
# Private methods.
def need_more_tokens(self):
if self.done:
return False
if not self.tokens:
return True
# The current token may be a potential simple key, so we
# need to look further.
self.stale_possible_simple_keys()
if self.next_possible_simple_key() == self.tokens_taken:
return True
def fetch_more_tokens(self):
# Eat whitespaces and comments until we reach the next token.
self.scan_to_next_token()
# Remove obsolete possible simple keys.
self.stale_possible_simple_keys()
# Peek the next character.
ch = self.peek()
# Is it the end of stream?
if ch == '\0':
return self.fetch_stream_end()
# Note: the order of the following checks is NOT significant.
# Is it the flow sequence start indicator?
if ch == '[':
return self.fetch_flow_sequence_start()
# Is it the flow mapping start indicator?
if ch == '{':
return self.fetch_flow_mapping_start()
# Is it the flow sequence end indicator?
if ch == ']':
return self.fetch_flow_sequence_end()
# Is it the flow mapping end indicator?
if ch == '}':
return self.fetch_flow_mapping_end()
# Is it the flow entry indicator?
if ch == ',':
return self.fetch_flow_entry()
# Is it the value indicator?
if ch == ':' and self.flow_level:
return self.fetch_value()
# Is it a double quoted scalar?
if ch == '\"':
return self.fetch_double()
# It must be a plain scalar then.
if self.check_plain():
return self.fetch_plain()
# No? It's an error. Let's produce a nice error message.
raise ScannerError("while scanning for the next token", None,
"found character %r that cannot start any token" % ch,
self.get_mark())
# Simple keys treatment.
def next_possible_simple_key(self):
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
# if not self.possible_simple_keys:
# return None
# return self.possible_simple_keys[
# min(self.possible_simple_keys.keys())].token_number
min_token_number = None
for level in self.possible_simple_keys:
key = self.possible_simple_keys[level]
if min_token_number is None or key.token_number < min_token_number:
min_token_number = key.token_number
return min_token_number
def stale_possible_simple_keys(self):
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
# - should be limited to a single line,
# Disabling this procedure will allow simple keys of any length and
# height (may cause problems if indentation is broken though).
for level in list(self.possible_simple_keys):
key = self.possible_simple_keys[level]
if key.line != self.line:
del self.possible_simple_keys[level]
def save_possible_simple_key(self):
# The next token may start a simple key. We check if it's possible
# and save its position. This function is called for
# SCALAR(flow), '[', and '{'.
# The next token might be a simple key. Let's save it's number and
# position.
if self.allow_simple_key:
self.remove_possible_simple_key()
token_number = self.tokens_taken+len(self.tokens)
key = SimpleKey(token_number,
self.index, self.line, self.column, self.get_mark())
self.possible_simple_keys[self.flow_level] = key
def remove_possible_simple_key(self):
# Remove the saved possible key position at the current flow level.
if self.flow_level in self.possible_simple_keys:
key = self.possible_simple_keys[self.flow_level]
del self.possible_simple_keys[self.flow_level]
# Fetchers.
def fetch_stream_start(self):
# We always add STREAM-START as the first token and STREAM-END as the
# last token.
# Read the token.
mark = self.get_mark()
# Add STREAM-START.
self.tokens.append(StreamStartToken(mark, mark,
encoding=self.encoding))
def fetch_stream_end(self):
# Reset simple keys.
self.remove_possible_simple_key()
self.allow_simple_key = False
self.possible_simple_keys = {}
# Read the token.
mark = self.get_mark()
# Add STREAM-END.
self.tokens.append(StreamEndToken(mark, mark))
# The steam is finished.
self.done = True
def fetch_flow_sequence_start(self):
self.fetch_flow_collection_start(FlowSequenceStartToken)
def fetch_flow_mapping_start(self):
self.fetch_flow_collection_start(FlowMappingStartToken)
def fetch_flow_collection_start(self, TokenClass):
# '[' and '{' may start a simple key.
self.save_possible_simple_key()
# Increase the flow level.
self.flow_level += 1
# Simple keys are allowed after '[' and '{'.
self.allow_simple_key = True
# Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_sequence_end(self):
self.fetch_flow_collection_end(FlowSequenceEndToken)
def fetch_flow_mapping_end(self):
self.fetch_flow_collection_end(FlowMappingEndToken)
def fetch_flow_collection_end(self, TokenClass):
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Decrease the flow level.
self.flow_level -= 1
# No simple keys after ']' or '}'.
self.allow_simple_key = False
# Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_value(self):
# Do we determine a simple key?
if self.flow_level in self.possible_simple_keys:
# Add KEY.
key = self.possible_simple_keys[self.flow_level]
del self.possible_simple_keys[self.flow_level]
self.tokens.insert(key.token_number-self.tokens_taken,
KeyToken(key.mark, key.mark))
# There cannot be two simple keys one after another.
self.allow_simple_key = False
# Add VALUE.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(ValueToken(start_mark, end_mark))
def fetch_flow_entry(self):
# Simple keys are allowed after ','.
self.allow_simple_key = True
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Add FLOW-ENTRY.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(FlowEntryToken(start_mark, end_mark))
def fetch_double(self):
# A flow scalar could be a simple key.
self.save_possible_simple_key()
# No simple keys after flow scalars.
self.allow_simple_key = False
# Scan and add SCALAR.
self.tokens.append(self.scan_flow_scalar())
def fetch_plain(self):
self.save_possible_simple_key()
# No simple keys after plain scalars.
self.allow_simple_key = False
# Scan and add SCALAR. May change `allow_simple_key`.
self.tokens.append(self.scan_plain())
# Checkers.
def check_plain(self):
return self.peek() in '0123456789-ntf'
# Scanners.
def scan_to_next_token(self):
while self.peek() in ' \t\n':
self.forward()
def scan_flow_scalar(self):
# See the specification for details.
# Note that we loose indentation rules for quoted scalars. Quoted
# scalars don't need to adhere indentation because " and ' clearly
# mark the beginning and the end of them. Therefore we are less
# restrictive then the specification requires. We only need to check
# that document separators are not included in scalars.
chunks = []
start_mark = self.get_mark()
quote = self.peek()
self.forward()
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
while self.peek() != quote:
chunks.extend(self.scan_flow_scalar_spaces(start_mark))
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
self.forward()
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), False, start_mark, end_mark, '"')
ESCAPE_REPLACEMENTS = {
'b': '\x08',
't': '\x09',
'n': '\x0A',
'f': '\x0C',
'r': '\x0D',
'\"': '\"',
'\\': '\\',
}
ESCAPE_CODES = {
'u': 4,
}
def scan_flow_scalar_non_spaces(self, start_mark):
# See the specification for details.
chunks = []
while True:
length = 0
while self.peek(length) not in '\"\\\0 \t\n':
length += 1
if length:
chunks.append(self.prefix(length))
self.forward(length)
ch = self.peek()
if ch == '\\':
self.forward()
ch = self.peek()
if ch in self.ESCAPE_REPLACEMENTS:
chunks.append(self.ESCAPE_REPLACEMENTS[ch])
self.forward()
elif ch in self.ESCAPE_CODES:
length = self.ESCAPE_CODES[ch]
self.forward()
for k in range(length):
if self.peek(k) not in '0123456789ABCDEFabcdef':
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"expected escape sequence of %d hexdecimal numbers, but found %r" %
(length, self.peek(k)), self.get_mark())
code = int(self.prefix(length), 16)
chunks.append(chr(code))
self.forward(length)
else:
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"found unknown escape character %r" % ch, self.get_mark())
else:
return chunks
def scan_flow_scalar_spaces(self, start_mark):
# See the specification for details.
chunks = []
length = 0
while self.peek(length) in ' \t':
length += 1
whitespaces = self.prefix(length)
self.forward(length)
ch = self.peek()
if ch == '\0':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected end of stream", self.get_mark())
elif ch == '\n':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected line end", self.get_mark())
else:
chunks.append(whitespaces)
return chunks
def scan_plain(self):
chunks = []
start_mark = self.get_mark()
spaces = []
while True:
length = 0
while True:
ch = self.peek(length)
if self.peek(length) not in 'eE.0123456789nul-tr+fas':
break
length += 1
if length == 0:
break
self.allow_simple_key = False
chunks.extend(spaces)
chunks.append(self.prefix(length))
self.forward(length)
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), True, start_mark, end_mark)
def fetch_flow_sequence_start(self):
self.fetch_flow_collection_start(FlowSequenceStartToken)
def fetch_flow_mapping_start(self):
self.fetch_flow_collection_start(FlowMappingStartToken)
def fetch_flow_collection_start(self, TokenClass):
# '[' and '{' may start a simple key.
self.save_possible_simple_key()
# Increase the flow level.
self.flow_level += 1
# Simple keys are allowed after '[' and '{'.
self.allow_simple_key = True
# Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_sequence_end(self):
self.fetch_flow_collection_end(FlowSequenceEndToken)
def fetch_flow_mapping_end(self):
self.fetch_flow_collection_end(FlowMappingEndToken)
def fetch_flow_collection_end(self, TokenClass):
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Decrease the flow level.
self.flow_level -= 1
# No simple keys after ']' or '}'.
self.allow_simple_key = False
# Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_value(self):
# Do we determine a simple key?
if self.flow_level in self.possible_simple_keys:
# Add KEY.
key = self.possible_simple_keys[self.flow_level]
del self.possible_simple_keys[self.flow_level]
self.tokens.insert(key.token_number - self.tokens_taken,
KeyToken(key.mark, key.mark))
# There cannot be two simple keys one after another.
self.allow_simple_key = False
# Add VALUE.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(ValueToken(start_mark, end_mark))
def fetch_flow_entry(self):
# Simple keys are allowed after ','.
self.allow_simple_key = True
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Add FLOW-ENTRY.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(FlowEntryToken(start_mark, end_mark))
def fetch_double(self):
# A flow scalar could be a simple key.
self.save_possible_simple_key()
# No simple keys after flow scalars.
self.allow_simple_key = False
# Scan and add SCALAR.
self.tokens.append(self.scan_flow_scalar())
def fetch_plain(self):
self.save_possible_simple_key()
# No simple keys after plain scalars.
self.allow_simple_key = False
# Scan and add SCALAR. May change `allow_simple_key`.
self.tokens.append(self.scan_plain())
# Checkers.
def check_plain(self):
return self.peek() in '0123456789-ntf'
# Scanners.
def scan_to_next_token(self):
while self.peek() in ' \t\n':
self.forward()
def scan_flow_scalar(self):
# See the specification for details.
# Note that we loose indentation rules for quoted scalars. Quoted
# scalars don't need to adhere indentation because " and ' clearly
# mark the beginning and the end of them. Therefore we are less
# restrictive then the specification requires. We only need to check
# that document separators are not included in scalars.
chunks = []
start_mark = self.get_mark()
quote = self.peek()
self.forward()
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
while self.peek() != quote:
chunks.extend(self.scan_flow_scalar_spaces(start_mark))
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
self.forward()
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), False, start_mark, end_mark, '"')
ESCAPE_REPLACEMENTS = {
'b': '\x08',
't': '\x09',
'n': '\x0A',
'f': '\x0C',
'r': '\x0D',
'\"': '\"',
'\\': '\\',
}
ESCAPE_CODES = {
'u': 4,
}
def scan_flow_scalar_non_spaces(self, start_mark):
# See the specification for details.
chunks = []
while True:
length = 0
while self.peek(length) not in '\"\\\0 \t\n':
length += 1
if length:
chunks.append(self.prefix(length))
self.forward(length)
ch = self.peek()
if ch == '\\':
self.forward()
ch = self.peek()
if ch in self.ESCAPE_REPLACEMENTS:
chunks.append(self.ESCAPE_REPLACEMENTS[ch])
self.forward()
elif ch in self.ESCAPE_CODES:
length = self.ESCAPE_CODES[ch]
self.forward()
for k in range(length):
if self.peek(k) not in '0123456789ABCDEFabcdef':
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"expected escape sequence of %d hexdecimal numbers, but found %r" %
(length, self.peek(k)), self.get_mark())
code = int(self.prefix(length), 16)
chunks.append(chr(code))
self.forward(length)
else:
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"found unknown escape character %r" % ch, self.get_mark())
else:
return chunks
def scan_flow_scalar_spaces(self, start_mark):
# See the specification for details.
chunks = []
length = 0
while self.peek(length) in ' \t':
length += 1
whitespaces = self.prefix(length)
self.forward(length)
ch = self.peek()
if ch == '\0':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected end of stream", self.get_mark())
elif ch == '\n':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected line end", self.get_mark())
else:
chunks.append(whitespaces)
return chunks
def scan_plain(self):
chunks = []
start_mark = self.get_mark()
spaces = []
while True:
length = 0
while True:
if self.peek(length) not in 'eE.0123456789nul-tr+fas':
break
length += 1
if length == 0:
break
self.allow_simple_key = False
chunks.extend(spaces)
chunks.append(self.prefix(length))
self.forward(length)
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), True, start_mark, end_mark)

View File

@ -1,57 +1,65 @@
class Token(object):
def __init__(self, start_mark, end_mark):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in self.__dict__
if not key.endswith('_mark')]
attributes.sort()
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
def __init__(self, start_mark, end_mark):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in self.__dict__
if not key.endswith('_mark')]
attributes.sort()
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
#class BOMToken(Token):
# id = '<byte order mark>'
class StreamStartToken(Token):
id = '<stream start>'
def __init__(self, start_mark=None, end_mark=None,
encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
id = '<stream start>'
def __init__(self, start_mark=None, end_mark=None,
encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
class StreamEndToken(Token):
id = '<stream end>'
id = '<stream end>'
class FlowSequenceStartToken(Token):
id = '['
id = '['
class FlowMappingStartToken(Token):
id = '{'
id = '{'
class FlowSequenceEndToken(Token):
id = ']'
id = ']'
class FlowMappingEndToken(Token):
id = '}'
id = '}'
class KeyToken(Token):
id = '?'
id = '?'
class ValueToken(Token):
id = ':'
id = ':'
class FlowEntryToken(Token):
id = ','
id = ','
class ScalarToken(Token):
id = '<scalar>'
def __init__(self, value, plain, start_mark, end_mark, style=None):
self.value = value
self.plain = plain
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
id = '<scalar>'
def __init__(self, value, plain, start_mark, end_mark, style=None):
self.value = value
self.plain = plain
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style