Merge remote-tracking branch 'zyx-i/fix-278-powerline-lint' into develop

This commit is contained in:
Kim Silkebækken 2013-03-11 16:24:35 +01:00
commit 89204d2a7b
28 changed files with 2822 additions and 42 deletions

View File

@@ -113,9 +113,9 @@ Common configuration
 Common configuration is a subdictionary that is a value of ``common`` key in
 :file:`powerline/config.json` file.
-.. _config-common-term_24bit_colors:
+.. _config-common-term_truecolor:
-``term_24bit_colors``
+``term_truecolor``
 Defines whether to output cterm indices (8-bit) or RGB colors (24-bit)
 to the terminal emulator. See the :ref:`term-feature-support-matrix` for
 information on whether your terminal emulator supports 24-bit colors.
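The rename is purely cosmetic; the option is still a boolean and still defaults to false. A minimal sketch of how the renamed key is read, mirroring the option lookup changed later in this commit:

# Hedged sketch: how the renamed option reaches the renderer.
common_config = {'term_truecolor': True}  # as parsed from powerline/config.json
options = {'term_truecolor': common_config.get('term_truecolor', False)}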

View File

@@ -31,8 +31,8 @@ custom symbols for developers. This requires that you either have a symbol
 font or a patched font on your system. Your terminal emulator must also
 support either patched fonts or fontconfig for Powerline to work properly.
-You can also enable :ref:`24-bit color support
-<config-common-term_24bit_colors>` if your terminal emulator supports it.
+You can also enable :ref:`24-bit color support <config-common-term_truecolor>`
+if your terminal emulator supports it.
 .. table:: Application/terminal emulator feature support matrix
 :name: term-feature-support-matrix

powerline/__init__.py
View File

@@ -9,13 +9,17 @@ from powerline.colorscheme import Colorscheme
 from powerline.lib import underscore_to_camelcase
-def load_json_config(search_paths, config_file):
+def open_file(path):
+return open(path, 'r')
+
+
+def load_json_config(search_paths, config_file, load=json.load, open=open_file):
 config_file += '.json'
 for path in search_paths:
 config_file_path = os.path.join(path, config_file)
 if os.path.isfile(config_file_path):
-with open(config_file_path, 'r') as config_file_fp:
-return json.load(config_file_fp)
+with open(config_file_path) as config_file_fp:
+return load(config_file_fp)
 raise IOError('Config file not found in search path: {0}'.format(config_file))
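Turning ``load`` and ``open`` into keyword parameters is what lets the new lint module reuse this function with a mark-preserving parser: ``check()`` below calls it with ``load=load_config, open=open_file``. A hedged sketch of the same injection with stand-in callables:

# Hedged sketch: inject a custom parser/opener into load_json_config.
import json

def my_load(stream):
    # stand-in for powerline.lint.markedjson.load; anything
    # json.load-compatible works
    return json.load(stream)

def my_open(path):
    return open(path, 'rb')

config = load_json_config(Powerline.get_config_paths(), 'config',
                          load=my_load, open=my_open)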
@@ -56,7 +60,7 @@ class Powerline(object):
 'common_config': common_config,
 'segment_info': self.get_segment_info(),
 }
-local_themes = self.get_local_themes(ext_config.get('local_themes', {}))
+local_themes = self.get_local_themes(ext_config.get('local_themes'))
 # Load and initialize extension renderer
 renderer_module_name = renderer_module or ext
@@ -67,10 +71,11 @@ class Powerline(object):
 except ImportError as e:
 sys.stderr.write('Error while importing renderer module: {0}\n'.format(e))
 sys.exit(1)
-options = {'term_truecolor': common_config.get('term_24bit_colors', False)}
+options = {'term_truecolor': common_config.get('term_truecolor', False)}
 self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, **options)
-def get_config_paths(self):
+@staticmethod
+def get_config_paths():
 '''Get configuration paths.
 :return: list of paths
@@ -125,14 +130,15 @@ class Powerline(object):
 required.
 :param dict local_themes:
-Usually accepts ``{matcher_name : theme_name}``.
+Usually accepts ``{matcher_name : theme_name}``. May also receive
+None in case there is no local_themes configuration.
 :return:
 anything accepted by ``self.renderer.get_theme`` and processable by
 ``self.renderer.add_local_theme``. Renderer module is determined by
 ``__init__`` arguments, refer to its documentation.
 '''
-return {}
+return None
 @staticmethod
 def get_segment_info():

powerline/config.json
View File

@@ -1,6 +1,6 @@
 {
 "common": {
-"term_24bit_colors": false,
+"term_truecolor": false,
 "dividers": {
 "left": {
 "hard": " ",
@@ -11,8 +11,7 @@
 "soft": " "
 }
 },
-"spaces": 1,
-"colors": "default"
+"spaces": 1
 },
 "ext": {
 "ipython": {

951
powerline/lint/__init__.py Normal file
View File

@@ -0,0 +1,951 @@
from powerline.lint.markedjson import load
from powerline import load_json_config, Powerline
from powerline.lint.markedjson.error import echoerr, MarkedError
from powerline.segments.vim import vim_modes
import itertools
import sys
import os
import re
from collections import defaultdict
from copy import copy
try:
from __builtin__ import unicode
except ImportError:
unicode = str
def open_file(path):
return open(path, 'rb')
def find_config(search_paths, config_file):
config_file += '.json'
for path in search_paths:
if os.path.isfile(os.path.join(path, config_file)):
return path
return None
EMPTYTUPLE = tuple()
def context_key(context):
return '/'.join((unicode(c[0]) for c in context))
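``context`` is a tuple of ``(key, value)`` pairs recording the checker's position in the configuration tree; ``context_key`` flattens it into a slash-separated path for error messages. A hedged illustration:

# context as Spec.match builds it up: ((key, value), (key, value), ...)
ctx = (('common', {}), ('dividers', {}), ('left', {}))
context_key(ctx)  # -> 'common/dividers/left'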
class Spec(object):
def __init__(self, **keys):
new_keys = {}
self.specs = list(keys.values())
for k, v in keys.items():
new_keys[k] = len(self.specs)
self.specs.append(v)
self.keys = new_keys
self.checks = []
self.cmsg = ''
self.isoptional = False
self.uspecs = []
self.ufailmsg = lambda key: 'found unknown key: {0}'.format(key)
if keys:
self.type(dict)
def copy(self):
return self.__class__().update(self.__dict__)
def update(self, d):
self.__dict__.update(d)
self.checks = copy(self.checks)
self.uspecs = copy(self.uspecs)
self.specs = [spec.copy() for spec in self.specs]
return self
def unknown_spec(self, keyfunc, spec):
if isinstance(keyfunc, Spec):
self.specs.append(keyfunc)
keyfunc = len(self.specs) - 1
self.specs.append(spec)
self.uspecs.append((keyfunc, len(self.specs) - 1))
return self
def unknown_msg(self, msgfunc):
self.ufailmsg = msgfunc
return self
def context_message(self, msg):
self.cmsg = msg
for spec in self.specs:
if not spec.cmsg:
spec.context_message(msg)
return self
def check_type(self, value, context_mark, data, context, echoerr, t):
if type(value.value) is not t:
echoerr(context=self.cmsg.format(key=context_key(context)),
context_mark=context_mark,
problem='must be a {0} instance'.format(t.__name__),
problem_mark=value.mark)
return False, True
return True, False
def check_func(self, value, context_mark, data, context, echoerr, func, msg_func):
proceed, echo, hadproblem = func(value, data, context, echoerr)
if echo and hadproblem:
echoerr(context=self.cmsg.format(key=context_key(context)),
context_mark=context_mark,
problem=msg_func(value),
problem_mark=value.mark)
return proceed, hadproblem
def check_list(self, value, context_mark, data, context, echoerr, item_func, msg_func):
i = 0
hadproblem = False
for item in value:
if isinstance(item_func, int):
spec = self.specs[item_func]
proceed, fhadproblem = spec.match(item, value.mark, data, context + (('list item ' + unicode(i), item),), echoerr)
else:
proceed, echo, fhadproblem = item_func(item, data, context, echoerr)
if echo and fhadproblem:
echoerr(context=self.cmsg.format(key=context_key(context) + '/list item ' + unicode(i)),
context_mark=value.mark,
problem=msg_func(item),
problem_mark=item.mark)
if fhadproblem:
hadproblem = True
if not proceed:
return proceed, hadproblem
i += 1
return True, hadproblem
def check_either(self, value, context_mark, data, context, echoerr, start, end):
errs = []
def new_echoerr(*args, **kwargs):
errs.append((args, kwargs))
hadproblem = False
for spec in self.specs[start:end]:
proceed, hadproblem = spec.match(value, value.mark, data, context, new_echoerr)
if not proceed:
break
if not hadproblem:
return True, False
for args, kwargs in errs:
echoerr(*args, **kwargs)
return False, hadproblem
def check_tuple(self, value, context_mark, data, context, echoerr, start, end):
hadproblem = False
for (i, item, spec) in zip(itertools.count(), value, self.specs[start:end]):
proceed, ihadproblem = spec.match(item, value.mark, data, context + (('tuple item ' + unicode(i), item),), echoerr)
if ihadproblem:
hadproblem = True
if not proceed:
return False, hadproblem
return True, hadproblem
def type(self, t):
self.checks.append(('check_type', t))
return self
cmp_funcs = {
'le': lambda x, y: x <= y,
'lt': lambda x, y: x < y,
'ge': lambda x, y: x >= y,
'gt': lambda x, y: x > y,
'eq': lambda x, y: x == y,
}
cmp_msgs = {
'le': 'less than or equal to',
'lt': 'less than',
'ge': 'greater than or equal to',
'gt': 'greater than',
'eq': 'equal to',
}
def len(self, comparison, cint, msg_func=None):
cmp_func = self.cmp_funcs[comparison]
msg_func = msg_func or (lambda value: 'length of {0!r} is not {1} {2}'.format(value, self.cmp_msgs[comparison], cint))
self.checks.append(('check_func',
(lambda value, *args: (True, True, not cmp_func(len(value), cint))),
msg_func))
return self
def cmp(self, comparison, cint, msg_func=None):
if type(cint) is str:
self.type(unicode)
else:
self.type(type(cint))
cmp_func = self.cmp_funcs[comparison]
msg_func = msg_func or (lambda value: '{0} is not {1} {2}'.format(value, self.cmp_msgs[comparison], cint))
self.checks.append(('check_func',
(lambda value, *args: (True, True, not cmp_func(value, cint))),
msg_func))
return self
def unsigned(self, msg_func=None):
self.type(int)
self.checks.append(('check_func',
(lambda value, *args: (True, True, value < 0)),
lambda value: '{0} must be greater than or equal to zero'.format(value)))
return self
def list(self, item_func, msg_func=None):
self.type(list)
if isinstance(item_func, Spec):
self.specs.append(item_func)
item_func = len(self.specs) - 1
self.checks.append(('check_list', item_func, msg_func or (lambda item: 'failed check')))
return self
def tuple(self, *specs):
self.type(list)
max_len = len(specs)
min_len = max_len
for spec in reversed(specs):
if spec.isoptional:
min_len -= 1
else:
break
if max_len == min_len:
self.len('eq', len(specs))
else:
self.len('ge', min_len)
self.len('le', max_len)
start = len(self.specs)
for i, spec in zip(itertools.count(), specs):
self.specs.append(spec)
self.checks.append(('check_tuple', start, len(self.specs)))
return self
def func(self, func, msg_func=None):
self.checks.append(('check_func', func, msg_func or (lambda value: 'failed check')))
return self
def re(self, regex, msg_func=None):
self.type(unicode)
compiled = re.compile(regex)
msg_func = msg_func or (lambda value: 'String "{0}" does not match "{1}"'.format(value, regex))
self.checks.append(('check_func',
(lambda value, *args: (True, True, not compiled.match(value.value))),
msg_func))
return self
def ident(self, msg_func=None):
msg_func = msg_func or (lambda value: 'String "{0}" is not an alphanumeric/underscore identifier'.format(value))
return self.re('^\w+$', msg_func)
def oneof(self, collection, msg_func=None):
msg_func = msg_func or (lambda value: '"{0}" must be one of {1!r}'.format(value, list(collection)))
self.checks.append(('check_func',
lambda value, *args: (True, True, value not in collection),
msg_func))
return self
def either(self, *specs):
start = len(self.specs)
self.specs.extend(specs)
self.checks.append(('check_either', start, len(self.specs)))
return self
def optional(self):
self.isoptional = True
return self
def match_checks(self, *args):
hadproblem = False
for check in self.checks:
proceed, chadproblem = getattr(self, check[0])(*(args + check[1:]))
if chadproblem:
hadproblem = True
if not proceed:
return False, hadproblem
return True, hadproblem
def match(self, value, context_mark=None, data=None, context=EMPTYTUPLE, echoerr=echoerr):
proceed, hadproblem = self.match_checks(value, context_mark, data, context, echoerr)
if proceed:
if self.keys or self.uspecs:
for key, vali in self.keys.items():
valspec = self.specs[vali]
if key in value:
proceed, mhadproblem = valspec.match(value[key], value.mark, data, context + ((key, value[key]),), echoerr)
if mhadproblem:
hadproblem = True
if not proceed:
return False, hadproblem
else:
if not valspec.isoptional:
hadproblem = True
echoerr(context=self.cmsg.format(key=context_key(context)),
context_mark=None,
problem='required key is missing: {0}'.format(key),
problem_mark=value.mark)
for key in value.keys():
if key not in self.keys:
for keyfunc, vali in self.uspecs:
valspec = self.specs[vali]
if isinstance(keyfunc, int):
spec = self.specs[keyfunc]
proceed, khadproblem = spec.match(key, context_mark, data, context, echoerr)
else:
proceed, khadproblem = keyfunc(key, data, context, echoerr)
if khadproblem:
hadproblem = True
if proceed:
valspec.match(value[key], value.mark, data, context + ((key, value[key]),), echoerr)
break
else:
hadproblem = True
if self.ufailmsg:
echoerr(context=self.cmsg.format(key=context_key(context)),
context_mark=None,
problem=self.ufailmsg(key),
problem_mark=key.mark)
return True, hadproblem
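``Spec`` is a tiny declarative DSL: keyword arguments declare expected dictionary keys, chained calls append checks, ``.optional()`` marks a key as skippable, and storing the unbound ``.copy`` lets one definition be stamped out many times, exactly as the ``*_spec`` factories below do. A hedged illustration of composing one:

# Hedged illustration of the Spec DSL used throughout this file.
side_spec = Spec(
    hard=Spec().type(unicode).len('le', 3),
    soft=Spec().type(unicode).len('le', 3).optional(),
).context_message('Error while checking divider side (key {key})').copy

left_spec = side_spec()   # each call yields an independent deep copy
right_spec = side_spec()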
class WithPath(object):
def __init__(self, import_paths):
self.import_paths = import_paths
def __enter__(self):
self.oldpath = sys.path
sys.path = self.import_paths + sys.path
def __exit__(self, *args):
sys.path = self.oldpath
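``WithPath`` temporarily prepends the user-configured import paths so that user-supplied matcher and segment modules can be imported during checking, and restores ``sys.path`` even if the import fails. A hedged usage sketch:

# Hedged sketch; the path is hypothetical.
with WithPath([os.path.expanduser('~/.config/powerline')]):
    mod = __import__('powerline.matchers.vim', fromlist=['help'])
# sys.path is restored here, whether or not the import raised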
def check_matcher_func(ext, match_name, data, context, echoerr):
import_paths = [os.path.expanduser(path) for path in context[0][1].get('common', {}).get('paths', [])]
match_module, separator, match_function = match_name.rpartition('.')
if not separator:
match_module = 'powerline.matchers.{0}'.format(ext)
match_function = match_name
with WithPath(import_paths):
try:
func = getattr(__import__(match_module, fromlist=[match_function]), unicode(match_function))
except ImportError:
echoerr(context='Error while loading matcher functions',
problem='failed to load module {0}'.format(match_module),
problem_mark=match_name.mark)
return True, True
except AttributeError:
echoerr(context='Error while loading matcher functions',
problem='failed to load matcher function {0}'.format(match_function),
problem_mark=match_name.mark)
return True, True
if not callable(func):
echoerr(context='Error while loading matcher functions',
problem='loaded "function" {0} is not callable'.format(match_function),
problem_mark=match_name.mark)
return True, True
if hasattr(func, 'func_code') and hasattr(func.func_code, 'co_argcount'):
if func.func_code.co_argcount != 1:
echoerr(context='Error while loading matcher functions',
problem='function {0} accepts {1} arguments instead of 1. Are you sure it is the proper function?'.format(match_function, func.func_code.co_argcount),
problem_mark=match_name.mark)
return True, False
def check_ext(ext, data, context, echoerr):
hadsomedirs = False
hadproblem = False
for subdir in ('themes', 'colorschemes'):
if ext not in data['configs'][subdir]:
hadproblem = True
echoerr(context='Error while loading {0} extension configuration'.format(ext),
context_mark=ext.mark,
problem='{0} configuration does not exist'.format(subdir))
else:
hadsomedirs = True
return hadsomedirs, hadproblem
def check_config(d, theme, data, context, echoerr):
if len(context) == 4:
ext = context[-2][0]
else:
# local_themes
ext = context[-3][0]
if ext not in data['configs']['themes'] or theme not in data['configs']['themes'][ext]:
echoerr(context='Error while loading {0} from {1} extension configuration'.format(d[:-1], ext),
problem='failed to find configuration file themes/{0}/{1}.json'.format(ext, theme),
problem_mark=theme.mark)
return True, False, True
return True, False, False
divider_spec = Spec().type(unicode).len('le', 3,
lambda value: 'Divider {0!r} is too large!'.format(value)).copy
divside_spec = Spec(
hard=divider_spec(),
soft=divider_spec(),
).copy
colorscheme_spec = Spec().type(unicode).func(lambda *args: check_config('colorschemes', *args)).copy
theme_spec = Spec().type(unicode).func(lambda *args: check_config('themes', *args)).copy
main_spec = (Spec(
common=Spec(
dividers=Spec(
left=divside_spec(),
right=divside_spec(),
),
spaces=Spec().unsigned().cmp('le', 2,
lambda value: 'Are you sure you need such a big ({0}) number of spaces?'.format(value)),
term_truecolor=Spec().type(bool).optional(),
# Python is capable of loading from zip archives. Thus we only check
# that the path exists, not that it is a directory
paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(value.value))),
lambda value: 'path does not exist: {0}'.format(value)).optional(),
).context_message('Error while loading common configuration (key {key})'),
ext=Spec(
vim=Spec(
colorscheme=colorscheme_spec(),
theme=theme_spec(),
local_themes=Spec()
.unknown_spec(lambda *args: check_matcher_func('vim', *args), theme_spec())
),
).unknown_spec(check_ext,
Spec(
colorscheme=colorscheme_spec(),
theme=theme_spec(),
))
.context_message('Error while loading extensions configuration (key {key})'),
).context_message('Error while loading main configuration'))
term_color_spec = Spec().unsigned().cmp('le', 255).copy
true_color_spec = Spec().re('^[0-9a-fA-F]{6}$',
lambda value: '"{0}" is not a six-digit hexadecimal unsigned integer written as a string'.format(value)).copy
colors_spec = (Spec(
colors=Spec().unknown_spec(
Spec().ident(),
Spec().either(
Spec().tuple(term_color_spec(), true_color_spec()),
term_color_spec()))
.context_message('Error while checking colors (key {key})'),
gradients=Spec().unknown_spec(
Spec().ident(),
Spec().tuple(
Spec().len('gt', 1).list(term_color_spec()),
Spec().len('gt', 1).list(true_color_spec()).optional(),
)
).context_message('Error while checking gradients (key {key})'),
).context_message('Error while loading colors configuration'))
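Read together, ``colors_spec`` admits a ``colors.json`` where each plain color is either a bare cterm index (0-255) or a ``[cterm_index, "RRGGBB"]`` pair, and each gradient is a list of cterm indices optionally paired with a list of hex strings, both at least two entries long. A hedged example of data that would pass, shown as the plain Python structure (the real checker sees mark-carrying values):

# Hedged example of a colors configuration accepted by colors_spec.
colors_config = {
    'colors': {
        'white': 231,                      # cterm index only
        'brightorange': [208, 'ff8700'],   # [cterm index, 24-bit hex]
    },
    'gradients': {
        'dark_green_to_red': [
            [22, 28, 34, 40],                          # cterm steps
            ['005f00', '008700', '00af00', '00d700'],  # optional hex steps
        ],
    },
}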
def check_color(color, data, context, echoerr):
if color not in data['colors_config'].get('colors', {}) and color not in data['colors_config'].get('gradients', {}):
echoerr(context='Error while checking highlight group in colorscheme (key {key})'.format(key=context_key(context)),
problem='found nonexistent color or gradient {0}'.format(color),
problem_mark=color.mark)
return True, False, True
return True, False, False
def check_translated_group_name(group, data, context, echoerr):
if group not in context[0][1].get('groups', {}):
echoerr(context='Error while checking translated group in colorscheme (key {key})'.format(key=context_key(context)),
problem='translated group {0} is not in main groups dictionary'.format(group),
problem_mark=group.mark)
return True, False, True
return True, False, False
color_spec = Spec().type(unicode).func(check_color).copy
name_spec = Spec().type(unicode).len('gt', 0).optional().copy
group_spec = Spec(
fg=color_spec(),
bg=color_spec(),
attr=Spec().list(Spec().type(unicode).oneof(set(('bold', 'italic', 'underline')))).optional(),
).copy
group_name_spec = Spec().re('^\w+(?::\w+)?$').copy
groups_spec = Spec().unknown_spec(
group_name_spec(),
group_spec(),
).copy
colorscheme_spec = (Spec(
name=name_spec(),
groups=groups_spec().context_message('Error while loading groups (key {key})'),
).context_message('Error while loading colorscheme'))
vim_mode_spec = Spec().oneof(set(list(vim_modes) + ['nc'])).copy
vim_colorscheme_spec = (Spec(
name=name_spec(),
groups=groups_spec().context_message('Error while loading groups (key {key})'),
mode_translations=Spec().unknown_spec(
vim_mode_spec(),
Spec(
colors=Spec().unknown_spec(
color_spec(),
color_spec(),
).optional(),
groups=Spec().unknown_spec(
group_name_spec().func(check_translated_group_name),
group_spec(),
).optional(),
),
).context_message('Error while loading mode translations (key {key})'),
).context_message('Error while loading vim colorscheme'))
generic_keys = set(('exclude_modes', 'include_modes', 'width', 'align', 'name', 'draw_divider', 'priority', 'after', 'before'))
type_keys = {
'function': set(('args', 'module')),
'string': set(('contents', 'type', 'highlight_group', 'divider_highlight_group')),
'filler': set(('type', 'highlight_group', 'divider_highlight_group')),
}
required_keys = {
'function': set(),
'string': set(('contents', 'highlight_group')),
'filler': set(('highlight_group',)),
}
function_keys = set(('args', 'module'))
def check_key_compatibility(segment, data, context, echoerr):
segment_type = segment.get('type', 'function')
if segment_type not in type_keys:
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
problem='found segment with unknown type {0}'.format(segment_type),
problem_mark=segment_type.mark)
return False, False, True
keys = set(segment)
if not ((keys - generic_keys) < type_keys[segment_type]):
unknown_keys = keys - generic_keys - type_keys[segment_type]
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
context_mark=context[-1][1].mark,
problem='found keys not used with the current segment type: {0}'.format(
', '.join((unicode(key) for key in unknown_keys))),
problem_mark=list(unknown_keys)[0].mark)
return True, False, True
if not (keys > required_keys[segment_type]):
missing_keys = required_keys[segment_type] - keys
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
context_mark=context[-1][1].mark,
problem='found missing required keys: {0}'.format(
', '.join((unicode(key) for key in missing_keys))))
return True, False, True
return True, False, False
def check_segment_module(module, data, context, echoerr):
with WithPath(data['import_paths']):
try:
__import__(unicode(module))
except ImportError:
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
problem='failed to import module {0}'.format(module),
problem_mark=module.mark)
return True, False, True
return True, False, False
def check_full_segment_data(segment, data, context, echoerr):
if 'name' not in segment:
return True, False, False
ext = data['ext']
theme_segment_data = context[0][1].get('segment_data', {})
top_theme_name = data['main_config'].get('ext', {}).get(ext, {}).get('theme', None)
if not top_theme_name or data['theme'] == top_theme_name:
top_segment_data = {}
else:
top_segment_data = data['ext_theme_configs'].get(top_theme_name, {}).get('segment_data', {})
names = [segment['name']]
if segment.get('type', 'function') == 'function':
module = segment.get('module', context[0][1].get('default_module', 'powerline.segments.' + ext))
names.insert(0, unicode(module) + '.' + unicode(names[0]))
segment_copy = segment.copy()
for key in ('before', 'after', 'args', 'contents'):
if key not in segment_copy:
for segment_data in [theme_segment_data, top_segment_data]:
for name in names:
try:
val = segment_data[name][key]
# HACK to keep marks
l = list(segment_data[name])
k = l[l.index(key)]
segment_copy[k] = val
except KeyError:
pass
return check_key_compatibility(segment_copy, data, context, echoerr)
def check_segment_name(name, data, context, echoerr):
ext = data['ext']
if context[-2][1].get('type', 'function') == 'function':
module = context[-2][1].get('module', context[0][1].get('default_module', 'powerline.segments.' + ext))
with WithPath(data['import_paths']):
try:
func = getattr(__import__(unicode(module), fromlist=[unicode(name)]), unicode(name))
except ImportError:
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
problem='failed to import module {0}'.format(module),
problem_mark=module.mark)
return True, False, True
except AttributeError:
echoerr(context='Error while loading segment function (key {key})'.format(key=context_key(context)),
problem='failed to load function {0} from module {1}'.format(name, module),
problem_mark=name.mark)
return True, False, True
if not callable(func):
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
problem='imported "function" {0} from module {1} is not callable'.format(name, module),
problem_mark=module.mark)
return True, False, True
hl_groups = []
divider_hl_group = None
if func.__doc__:
H_G_USED_STR = 'Highlight groups used: '
D_H_G_USED_STR = 'Divider highlight group used: '
for line in func.__doc__.split('\n'):
if H_G_USED_STR in line:
hl_groups.append(line[line.index(H_G_USED_STR) + len(H_G_USED_STR):])
elif D_H_G_USED_STR in line:
divider_hl_group = line[line.index(D_H_G_USED_STR) + len(D_H_G_USED_STR) + 2:-3]
hadproblem = False
if divider_hl_group:
r = hl_exists(divider_hl_group, data, context, echoerr, allow_gradients=True)
if r:
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight group {0} not defined in the following colorschemes: {1}\n(Group name was obtained from function documentation.)'.format(
divider_hl_group, ', '.join(r)),
problem_mark=name.mark)
hadproblem = True
if hl_groups:
greg = re.compile(r'``([^`]+)``( \(gradient\))?')
hl_groups = [[greg.match(subs).groups() for subs in s.split(' or ')] for s in (', '.join(hl_groups)).split(', ')]
for required_pack in hl_groups:
rs = [hl_exists(hl_group, data, context, echoerr, allow_gradients=('force' if gradient else False))
for hl_group, gradient in required_pack]
if all(rs):
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight groups list ({0}) with all groups not defined in some colorschemes\n(Group names were taken from function documentation.)'.format(
', '.join((unicode(h[0]) for h in required_pack))),
problem_mark=name.mark)
for r, h in zip(rs, required_pack):
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight group {0} not defined in the following colorschemes: {1}'.format(
h[0], ', '.join(r)))
hadproblem = True
else:
r = hl_exists(name, data, context, echoerr, allow_gradients=True)
if r:
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight group {0} not defined in the following colorschemes: {1}\n(If not specified otherwise in documentation, highlight group for function segments\nis the same as the function name.)'.format(
name, ', '.join(r)),
problem_mark=name.mark)
hadproblem = True
return True, False, hadproblem
else:
if name not in context[0][1].get('segment_data', {}):
top_theme_name = data['main_config'].get('ext', {}).get(ext, {}).get('theme', None)
if data['theme'] == top_theme_name:
top_theme = {}
else:
top_theme = data['ext_theme_configs'].get(top_theme_name, {})
if name not in top_theme.get('segment_data', {}):
echoerr(context='Error while checking segments (key {key})'.format(key=context_key(context)),
problem='found useless use of the name key (this name is not present in theme/segment_data)',
problem_mark=name.mark)
return True, False, False
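The highlight group names here are scraped from segment docstrings, so the checker leans on a documentation convention: a ``Highlight groups used:`` line listing double-backquoted group names separated by commas and ``or``, optionally suffixed with ``(gradient)``, plus an optional ``Divider highlight group used:`` line ending in a period. A hedged example of a docstring this parser would accept:

def example_segment(pl):  # hypothetical segment, for illustration only
    '''Do nothing useful.

    Highlight groups used: ``example_gradient`` (gradient) or ``example``, ``example_extra``.
    Divider highlight group used: ``example:divider``.
    '''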
def hl_exists(hl_group, data, context, echoerr, allow_gradients=False):
ext = data['ext']
if ext not in data['colorscheme_configs']:
# No colorschemes. Error was already reported, no need to report it
# twice
return []
r = []
for colorscheme, cconfig in data['colorscheme_configs'][ext].items():
if hl_group not in cconfig.get('groups', {}):
r.append(colorscheme)
elif not allow_gradients:
group_config = cconfig['groups'][hl_group]
hadgradient = False
for ckey in ('fg', 'bg'):
color = group_config.get(ckey)
if not color:
# No color. Error was already reported.
continue
# Gradients are only allowed for function segments. Note that
# whether *either* color or gradient exists should have been
# already checked
hascolor = color in data['colors_config'].get('colors', {})
hasgradient = color in data['colors_config'].get('gradients', {})
if hasgradient:
hadgradient = True
if allow_gradients is False and not hascolor and hasgradient:
echoerr(context='Error while checking highlight group in theme (key {key})'.format(key=context_key(context)),
context_mark=hl_group.mark,
problem='group {0} is using gradient {1} instead of a color'.format(hl_group, color),
problem_mark=color.mark)
r.append(colorscheme)
continue
if allow_gradients == 'force' and not hadgradient:
echoerr(context='Error while checking highlight group in theme (key {key})'.format(key=context_key(context)),
context_mark=hl_group.mark,
problem='group {0} should have at least one gradient color, but it has none'.format(hl_group),
problem_mark=group_config.mark)
r.append(colorscheme)
return r
def check_highlight_group(hl_group, data, context, echoerr):
r = hl_exists(hl_group, data, context, echoerr)
if r:
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight group {0} not defined in the following colorschemes: {1}'.format(
hl_group, ', '.join(r)),
problem_mark=hl_group.mark)
return True, False, True
return True, False, False
def check_highlight_groups(hl_groups, data, context, echoerr):
rs = [hl_exists(hl_group, data, context, echoerr) for hl_group in hl_groups]
if all(rs):
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight groups list ({0}) with all groups not defined in some colorschemes'.format(
', '.join((unicode(h) for h in hl_groups))),
problem_mark=hl_groups.mark)
for r, hl_group in zip(rs, hl_groups):
echoerr(context='Error while checking theme (key {key})'.format(key=context_key(context)),
problem='found highlight group {0} not defined in the following colorschemes: {1}'.format(
hl_group, ', '.join(r)),
problem_mark=hl_group.mark)
return True, False, True
return True, False, False
def check_segment_data_key(key, data, context, echoerr):
ext = data['ext']
top_theme_name = data['main_config'].get('ext', {}).get(ext, {}).get('theme', None)
is_top_theme = (data['theme'] == top_theme_name)
if is_top_theme:
themes = data['ext_theme_configs'].values()
else:
themes = [context[0][1]]
for theme in themes:
for segments in theme.get('segments', {}).values():
found = False
for segment in segments:
if 'name' in segment:
if key == segment['name']:
found = True
module = segment.get('module', theme.get('default_module', 'powerline.segments.' + ext))
if key == unicode(module) + '.' + unicode(segment['name']):
found = True
if found:
break
if found:
break
else:
echoerr(context='Error while checking segment data',
problem='found key {0} that cannot be associated with any segment'.format(key),
problem_mark=key.mark)
return True, False, True
return True, False, False
highlight_group_spec = Spec().type(unicode).copy
segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy
segments_spec = Spec().optional().list(
Spec(
type=Spec().oneof(type_keys).optional(),
name=Spec().re('^[a-zA-Z_]\w+$').func(check_segment_name).optional(),
exclude_modes=Spec().list(vim_mode_spec()).optional(),
include_modes=Spec().list(vim_mode_spec()).optional(),
draw_divider=Spec().type(bool).optional(),
module=segment_module_spec(),
priority=Spec().cmp('ge', -1).optional(),
after=Spec().type(unicode).optional(),
before=Spec().type(unicode).optional(),
width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(),
align=Spec().oneof(set('lr')).optional(),
# FIXME Check args
args=Spec().type(dict).optional(),
contents=Spec().type(unicode).optional(),
highlight_group=Spec().list(
highlight_group_spec().re('^(?:(?!:divider$).)+$',
lambda value: 'it is recommended that only divider highlight group names end with ":divider"')
).func(check_highlight_groups).optional(),
divider_highlight_group=highlight_group_spec().func(check_highlight_group).re(':divider$',
lambda value: 'it is recommended that divider highlight group names end with ":divider"').optional(),
).func(check_full_segment_data),
).copy
theme_spec = (Spec(
default_module=segment_module_spec(),
segment_data=Spec().unknown_spec(
Spec().func(check_segment_data_key),
Spec(
after=Spec().type(unicode).optional(),
before=Spec().type(unicode).optional(),
# FIXME Check args
args=Spec().type(dict).optional(),
contents=Spec().type(unicode).optional(),
),
).optional().context_message('Error while loading segment data (key {key})'),
segments=Spec(
left=segments_spec().context_message('Error while loading segments from left side (key {key})'),
right=segments_spec().context_message('Error while loading segments from right side (key {key})'),
).func(
lambda value, *args: (True, True, not (('left' in value) or ('right' in value))),
lambda value: 'segments dictionary must contain either left, right or both keys'
).context_message('Error while loading segments (key {key})'),
).context_message('Error while loading theme'))
def check(path=None):
search_paths = [path] if path else Powerline.get_config_paths()
hadproblem = False
dirs = {
'themes': defaultdict(lambda: []),
'colorschemes': defaultdict(lambda: [])
}
for path in reversed(search_paths):
for subdir in ('themes', 'colorschemes'):
d = os.path.join(path, subdir)
if os.path.isdir(d):
for ext in os.listdir(d):
extd = os.path.join(d, ext)
if os.path.isdir(extd):
dirs[subdir][ext].append(extd)
elif os.path.exists(d):
hadproblem = True
sys.stderr.write('Path {0} is supposed to be a directory, but it is not\n'.format(d))
configs = {
'themes': defaultdict(lambda: {}),
'colorschemes': defaultdict(lambda: {})
}
for subdir in ('themes', 'colorschemes'):
for ext in dirs[subdir]:
for d in dirs[subdir][ext]:
for config in os.listdir(d):
if config.endswith('.json'):
configs[subdir][ext][config[:-5]] = os.path.join(d, config)
diff = set(configs['themes']) ^ set(configs['colorschemes'])
if diff:
hadproblem = True
for ext in diff:
sys.stderr.write('{0} extension {1} present only in {2}\n'.format(
ext,
'configuration' if (ext in dirs['themes'] and ext in dirs['colorschemes']) else 'directory',
'themes' if ext in configs['themes'] else 'colorschemes',
))
lhadproblem = [False]
def load_config(stream):
r, hadproblem = load(stream)
if hadproblem:
lhadproblem[0] = True
return r
try:
main_config = load_json_config(search_paths, 'config', load=load_config, open=open_file)
except IOError:
main_config = {}
sys.stderr.write('\nConfiguration file not found: config.json\n')
hadproblem = True
except MarkedError as e:
main_config = {}
sys.stderr.write(str(e) + '\n')
hadproblem = True
else:
if main_spec.match(main_config, data={'configs': configs}, context=(('', main_config),))[1]:
hadproblem = True
import_paths = [os.path.expanduser(path) for path in main_config.get('common', {}).get('paths', [])]
try:
colors_config = load_json_config(search_paths, 'colors', load=load_config, open=open_file)
except IOError:
colors_config = {}
sys.stderr.write('\nConfiguration file not found: colors.json\n')
hadproblem = True
except MarkedError as e:
colors_config = {}
sys.stderr.write(str(e) + '\n')
hadproblem = True
else:
if colors_spec.match(colors_config, context=(('', colors_config),))[1]:
hadproblem = True
if lhadproblem[0]:
hadproblem = True
colorscheme_configs = defaultdict(lambda: {})
for ext in configs['colorschemes']:
data = {'ext': ext, 'colors_config': colors_config}
for colorscheme, cfile in configs['colorschemes'][ext].items():
with open_file(cfile) as config_file_fp:
try:
config, lhadproblem = load(config_file_fp)
except MarkedError as e:
sys.stderr.write(str(e) + '\n')
hadproblem = True
continue
if lhadproblem:
hadproblem = True
colorscheme_configs[ext][colorscheme] = config
if ext == 'vim':
spec = vim_colorscheme_spec
else:
spec = colorscheme_spec
if spec.match(config, context=(('', config),), data=data)[1]:
hadproblem = True
theme_configs = defaultdict(lambda: {})
for ext in configs['themes']:
for theme, sfile in configs['themes'][ext].items():
with open_file(sfile) as config_file_fp:
try:
config, lhadproblem = load(config_file_fp)
except MarkedError as e:
sys.stderr.write(str(e) + '\n')
hadproblem = True
continue
if lhadproblem:
hadproblem = True
theme_configs[ext][theme] = config
for ext, configs in theme_configs.items():
data = {'ext': ext, 'colorscheme_configs': colorscheme_configs, 'import_paths': import_paths,
'main_config': main_config, 'ext_theme_configs': configs, 'colors_config': colors_config}
for theme, config in configs.items():
data['theme'] = theme
if theme_spec.match(config, context=(('', config),), data=data)[1]:
hadproblem = True
return hadproblem
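``check()`` reports problems on stderr and returns a ``hadproblem`` flag instead of raising, so a driver only needs to turn that flag into an exit status. A minimal sketch (the project's actual entry-point script may differ):

# Hedged sketch of a lint driver built on check().
import sys
from powerline.lint import check

if __name__ == '__main__':
    path = sys.argv[1] if len(sys.argv) > 1 else None
    sys.exit(1 if check(path) else 0)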

powerline/lint/markedjson/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
__version__ = '3.10'
from .loader import Loader
def load(stream, Loader=Loader):
"""
Parse the first YAML document in a stream
and produce the corresponding Python object.
"""
loader = Loader(stream)
try:
r = loader.get_single_data()
return r, loader.haserrors
finally:
loader.dispose()
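Unlike plain ``json.load``, this loader returns a pair: the constructed object, whose scalars carry ``.mark`` position info, and a flag saying whether recoverable errors were echoed while parsing. A hedged usage sketch:

# Hedged sketch: load a config file with position marks attached.
from powerline.lint.markedjson import load

with open('config.json', 'rb') as fp:  # the reader wants a bytes stream
    config, haserrors = load(fp)
# values inside `config` are MarkedValue wrappers; their .mark records
# where in the file each value came from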

powerline/lint/markedjson/composer.py Normal file
View File

@@ -0,0 +1,117 @@
__all__ = ['Composer', 'ComposerError']
from .error import MarkedError
from .events import * # NOQA
from .nodes import * # NOQA
class ComposerError(MarkedError):
pass
class Composer:
def __init__(self):
pass
def check_node(self):
# Drop the STREAM-START event.
if self.check_event(StreamStartEvent):
self.get_event()
# Check whether there are more documents available.
return not self.check_event(StreamEndEvent)
def get_node(self):
# Get the root node of the next document.
if not self.check_event(StreamEndEvent):
return self.compose_document()
def get_single_node(self):
# Drop the STREAM-START event.
self.get_event()
# Compose a document if the stream is not empty.
document = None
if not self.check_event(StreamEndEvent):
document = self.compose_document()
# Ensure that the stream contains no more documents.
if not self.check_event(StreamEndEvent):
event = self.get_event()
raise ComposerError("expected a single document in the stream",
document.start_mark, "but found another document",
event.start_mark)
# Drop the STREAM-END event.
self.get_event()
return document
def compose_document(self):
# Drop the DOCUMENT-START event.
self.get_event()
# Compose the root node.
node = self.compose_node(None, None)
# Drop the DOCUMENT-END event.
self.get_event()
return node
def compose_node(self, parent, index):
self.descend_resolver(parent, index)
if self.check_event(ScalarEvent):
node = self.compose_scalar_node()
elif self.check_event(SequenceStartEvent):
node = self.compose_sequence_node()
elif self.check_event(MappingStartEvent):
node = self.compose_mapping_node()
self.ascend_resolver()
return node
def compose_scalar_node(self):
event = self.get_event()
tag = event.tag
if tag is None or tag == '!':
tag = self.resolve(ScalarNode, event.value, event.implicit, event.start_mark)
node = ScalarNode(tag, event.value,
event.start_mark, event.end_mark, style=event.style)
return node
def compose_sequence_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(SequenceNode, None, start_event.implicit)
node = SequenceNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
index = 0
while not self.check_event(SequenceEndEvent):
node.value.append(self.compose_node(node, index))
index += 1
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_mapping_node(self):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(MappingNode, None, start_event.implicit)
node = MappingNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
while not self.check_event(MappingEndEvent):
#key_event = self.peek_event()
item_key = self.compose_node(node, None)
#if item_key in node.value:
# raise ComposerError("while composing a mapping", start_event.start_mark,
# "found duplicate key", key_event.start_mark)
item_value = self.compose_node(node, item_key)
#node.value[item_key] = item_value
node.value.append((item_key, item_value))
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node

powerline/lint/markedjson/constructor.py Normal file
View File

@@ -0,0 +1,274 @@
__all__ = ['BaseConstructor', 'Constructor', 'ConstructorError']
from .error import MarkedError
from .nodes import * # NOQA
from .markedvalue import gen_marked_value
import collections
import types
from functools import wraps
try:
from __builtin__ import unicode
except ImportError:
unicode = str # NOQA
def marked(func):
@wraps(func)
def f(self, node, *args, **kwargs):
return gen_marked_value(func(self, node, *args, **kwargs), node.start_mark)
return f
class ConstructorError(MarkedError):
pass
class BaseConstructor:
yaml_constructors = {}
def __init__(self):
self.constructed_objects = {}
self.state_generators = []
self.deep_construct = False
def check_data(self):
# Check whether there are more documents available.
return self.check_node()
def get_data(self):
# Construct and return the next document.
if self.check_node():
return self.construct_document(self.get_node())
def get_single_data(self):
# Ensure that the stream contains a single document and construct it.
node = self.get_single_node()
if node is not None:
return self.construct_document(node)
return None
def construct_document(self, node):
data = self.construct_object(node)
while self.state_generators:
state_generators = self.state_generators
self.state_generators = []
for generator in state_generators:
for dummy in generator:
pass
self.constructed_objects = {}
self.deep_construct = False
return data
def construct_object(self, node, deep=False):
if node in self.constructed_objects:
return self.constructed_objects[node]
if deep:
old_deep = self.deep_construct
self.deep_construct = True
constructor = None
tag_suffix = None
if node.tag in self.yaml_constructors:
constructor = self.yaml_constructors[node.tag]
else:
raise ConstructorError(None, None, 'no constructor for tag %s' % node.tag)
if tag_suffix is None:
data = constructor(self, node)
else:
data = constructor(self, tag_suffix, node)
if isinstance(data, types.GeneratorType):
generator = data
data = next(generator)
if self.deep_construct:
for dummy in generator:
pass
else:
self.state_generators.append(generator)
self.constructed_objects[node] = data
if deep:
self.deep_construct = old_deep
return data
@marked
def construct_scalar(self, node):
if not isinstance(node, ScalarNode):
raise ConstructorError(None, None,
"expected a scalar node, but found %s" % node.id,
node.start_mark)
return node.value
def construct_sequence(self, node, deep=False):
if not isinstance(node, SequenceNode):
raise ConstructorError(None, None,
"expected a sequence node, but found %s" % node.id,
node.start_mark)
return [self.construct_object(child, deep=deep)
for child in node.value]
@marked
def construct_mapping(self, node, deep=False):
if not isinstance(node, MappingNode):
raise ConstructorError(None, None,
"expected a mapping node, but found %s" % node.id,
node.start_mark)
mapping = {}
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
if not isinstance(key, collections.Hashable):
self.echoerr('While constructing a mapping', node.start_mark,
'found unhashable key', key_node.start_mark)
continue
elif type(key.value) != unicode:
self.echoerr('Error while constructing a mapping', node.start_mark,
'found key that is not a string', key_node.start_mark)
continue
elif key in mapping:
self.echoerr('Error while constructing a mapping', node.start_mark,
'found duplicate key', key_node.start_mark)
continue
value = self.construct_object(value_node, deep=deep)
mapping[key] = value
return mapping
@classmethod
def add_constructor(cls, tag, constructor):
if 'yaml_constructors' not in cls.__dict__:
cls.yaml_constructors = cls.yaml_constructors.copy()
cls.yaml_constructors[tag] = constructor
class Constructor(BaseConstructor):
def construct_scalar(self, node):
if isinstance(node, MappingNode):
for key_node, value_node in node.value:
if key_node.tag == 'tag:yaml.org,2002:value':
return self.construct_scalar(value_node)
return BaseConstructor.construct_scalar(self, node)
def flatten_mapping(self, node):
merge = []
index = 0
while index < len(node.value):
key_node, value_node = node.value[index]
if key_node.tag == 'tag:yaml.org,2002:merge':
del node.value[index]
if isinstance(value_node, MappingNode):
self.flatten_mapping(value_node)
merge.extend(value_node.value)
elif isinstance(value_node, SequenceNode):
submerge = []
for subnode in value_node.value:
if not isinstance(subnode, MappingNode):
raise ConstructorError("while constructing a mapping",
node.start_mark,
"expected a mapping for merging, but found %s"
% subnode.id, subnode.start_mark)
self.flatten_mapping(subnode)
submerge.append(subnode.value)
submerge.reverse()
for value in submerge:
merge.extend(value)
else:
raise ConstructorError("while constructing a mapping", node.start_mark,
"expected a mapping or list of mappings for merging, but found %s"
% value_node.id, value_node.start_mark)
elif key_node.tag == 'tag:yaml.org,2002:value':
key_node.tag = 'tag:yaml.org,2002:str'
index += 1
else:
index += 1
if merge:
node.value = merge + node.value
def construct_mapping(self, node, deep=False):
if isinstance(node, MappingNode):
self.flatten_mapping(node)
return BaseConstructor.construct_mapping(self, node, deep=deep)
@marked
def construct_yaml_null(self, node):
self.construct_scalar(node)
return None
@marked
def construct_yaml_bool(self, node):
value = self.construct_scalar(node).value
return bool(value)
@marked
def construct_yaml_int(self, node):
value = self.construct_scalar(node).value
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
if value == '0':
return 0
else:
return sign * int(value)
@marked
def construct_yaml_float(self, node):
value = self.construct_scalar(node).value
sign = +1
if value[0] == '-':
sign = -1
if value[0] in '+-':
value = value[1:]
if value == '0':
return 0.0
else:
return sign * float(value)
def construct_yaml_str(self, node):
return self.construct_scalar(node)
def construct_yaml_seq(self, node):
data = gen_marked_value([], node.start_mark)
yield data
data.extend(self.construct_sequence(node))
def construct_yaml_map(self, node):
data = gen_marked_value({}, node.start_mark)
yield data
value = self.construct_mapping(node)
data.update(value)
def construct_undefined(self, node):
raise ConstructorError(None, None,
"could not determine a constructor for the tag %r" % node.tag,
node.start_mark)
Constructor.add_constructor(
'tag:yaml.org,2002:null',
Constructor.construct_yaml_null)
Constructor.add_constructor(
'tag:yaml.org,2002:bool',
Constructor.construct_yaml_bool)
Constructor.add_constructor(
'tag:yaml.org,2002:int',
Constructor.construct_yaml_int)
Constructor.add_constructor(
'tag:yaml.org,2002:float',
Constructor.construct_yaml_float)
Constructor.add_constructor(
'tag:yaml.org,2002:str',
Constructor.construct_yaml_str)
Constructor.add_constructor(
'tag:yaml.org,2002:seq',
Constructor.construct_yaml_seq)
Constructor.add_constructor(
'tag:yaml.org,2002:map',
Constructor.construct_yaml_map)
Constructor.add_constructor(None,
Constructor.construct_undefined)

powerline/lint/markedjson/error.py Normal file
View File

@@ -0,0 +1,96 @@
__all__ = ['Mark', 'MarkedError', 'echoerr', 'NON_PRINTABLE']
import sys
import re
try:
from __builtin__ import unichr
except ImportError:
unichr = chr # NOQA
NON_PRINTABLE = re.compile('[^\t\n\x20-\x7E' + unichr(0x85) + (unichr(0xA0) + '-' + unichr(0xD7FF)) + (unichr(0xE000) + '-' + unichr(0xFFFD)) + ']')
def repl(s):
return '<x%04x>' % ord(s.group())
def strtrans(s):
return NON_PRINTABLE.sub(repl, s.replace('\t', '>---'))
class Mark:
def __init__(self, name, line, column, buffer, pointer):
self.name = name
self.line = line
self.column = column
self.buffer = buffer
self.pointer = pointer
def get_snippet(self, indent=4, max_length=75):
if self.buffer is None:
return None
head = ''
start = self.pointer
while start > 0 and self.buffer[start - 1] not in '\0\n':
start -= 1
if self.pointer - start > max_length / 2 - 1:
head = ' ... '
start += 5
break
tail = ''
end = self.pointer
while end < len(self.buffer) and self.buffer[end] not in '\0\n':
end += 1
if end - self.pointer > max_length / 2 - 1:
tail = ' ... '
end -= 5
break
snippet = [self.buffer[start:self.pointer], self.buffer[self.pointer], self.buffer[self.pointer + 1:end]]
snippet = [strtrans(s) for s in snippet]
return ' ' * indent + head + ''.join(snippet) + tail + '\n' \
+ ' ' * (indent + len(head) + len(snippet[0])) + '^'
def __str__(self):
snippet = self.get_snippet()
where = " in \"%s\", line %d, column %d" \
% (self.name, self.line + 1, self.column + 1)
if snippet is not None:
where += ":\n" + snippet
if type(where) is str:
return where
else:
return where.encode('utf-8')
def echoerr(*args, **kwargs):
sys.stderr.write('\n')
sys.stderr.write(format_error(*args, **kwargs) + '\n')
def format_error(context=None, context_mark=None, problem=None, problem_mark=None, note=None):
lines = []
if context is not None:
lines.append(context)
if context_mark is not None \
and (problem is None or problem_mark is None
or context_mark.name != problem_mark.name
or context_mark.line != problem_mark.line
or context_mark.column != problem_mark.column):
lines.append(str(context_mark))
if problem is not None:
lines.append(problem)
if problem_mark is not None:
lines.append(str(problem_mark))
if note is not None:
lines.append(note)
return '\n'.join(lines)
class MarkedError(Exception):
def __init__(self, context=None, context_mark=None,
problem=None, problem_mark=None, note=None):
Exception.__init__(self, format_error(context, context_mark, problem,
problem_mark, note))

powerline/lint/markedjson/events.py Normal file
View File

@@ -0,0 +1,97 @@
# Abstract classes.
class Event(object):
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in ['implicit', 'value']
if hasattr(self, key)]
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
class NodeEvent(Event):
def __init__(self, start_mark=None, end_mark=None):
self.start_mark = start_mark
self.end_mark = end_mark
class CollectionStartEvent(NodeEvent):
def __init__(self, implicit, start_mark=None, end_mark=None,
flow_style=None):
self.tag = None
self.implicit = implicit
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
class CollectionEndEvent(Event):
pass
# Implementations.
class StreamStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None, encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
class StreamEndEvent(Event):
pass
class DocumentStartEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
explicit=None, version=None, tags=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
self.version = version
self.tags = tags
class DocumentEndEvent(Event):
def __init__(self, start_mark=None, end_mark=None,
explicit=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.explicit = explicit
class AliasEvent(NodeEvent):
pass
class ScalarEvent(NodeEvent):
def __init__(self, implicit, value,
start_mark=None, end_mark=None, style=None):
self.tag = None
self.implicit = implicit
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
class SequenceStartEvent(CollectionStartEvent):
pass
class SequenceEndEvent(CollectionEndEvent):
pass
class MappingStartEvent(CollectionStartEvent):
pass
class MappingEndEvent(CollectionEndEvent):
pass

powerline/lint/markedjson/loader.py Normal file
View File

@@ -0,0 +1,24 @@
__all__ = ['Loader']
from .reader import Reader
from .scanner import Scanner
from .parser import Parser
from .composer import Composer
from .constructor import Constructor
from .resolver import Resolver
from .error import echoerr
class Loader(Reader, Scanner, Parser, Composer, Constructor, Resolver):
def __init__(self, stream):
Reader.__init__(self, stream)
Scanner.__init__(self)
Parser.__init__(self)
Composer.__init__(self)
Constructor.__init__(self)
Resolver.__init__(self)
self.haserrors = False
def echoerr(self, *args, **kwargs):
echoerr(*args, **kwargs)
self.haserrors = True

powerline/lint/markedjson/markedvalue.py Normal file
View File

@@ -0,0 +1,29 @@
__all__ = ['gen_marked_value', 'MarkedValue']
class MarkedValue:
def __init__(self, value, mark):
self.mark = mark
self.value = value
classcache = {}
def gen_marked_value(value, mark):
if value.__class__ in classcache:
Marked = classcache[value.__class__]
else:
class Marked(MarkedValue):
for func in value.__class__.__dict__:
if func not in set(('__init__', '__new__', '__getattribute__')):
if func in set(('__eq__',)):
# HACK to make marked dictionaries always work
exec (('def {0}(self, *args):\n'
' return self.value.{0}(*[arg.value if isinstance(arg, MarkedValue) else arg for arg in args])').format(func))
else:
exec (('def {0}(self, *args, **kwargs):\n'
' return self.value.{0}(*args, **kwargs)\n').format(func))
classcache[value.__class__] = Marked
return Marked(value, mark)
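The generated proxy class forwards the wrapped type's methods (including ``__eq__``, with MarkedValue arguments unwrapped) to the underlying value, so a marked value compares, hashes, and slices like the value itself while still exposing ``.mark``. A hedged illustration:

# Hedged illustration; None stands in for a real Mark from the reader.
m = gen_marked_value('magenta', None)
m == 'magenta'      # True: __eq__ unwraps MarkedValue arguments
len(m)              # 7, forwarded to the wrapped str
m.value, m.mark     # ('magenta', None)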

powerline/lint/markedjson/nodes.py Normal file
View File

@@ -0,0 +1,53 @@
class Node(object):
def __init__(self, tag, value, start_mark, end_mark):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
value = self.value
#if isinstance(value, list):
# if len(value) == 0:
# value = '<empty>'
# elif len(value) == 1:
# value = '<1 item>'
# else:
# value = '<%d items>' % len(value)
#else:
# if len(value) > 75:
# value = repr(value[:70]+u' ... ')
# else:
# value = repr(value)
value = repr(value)
return '%s(tag=%r, value=%s)' % (self.__class__.__name__, self.tag, value)
class ScalarNode(Node):
id = 'scalar'
def __init__(self, tag, value,
start_mark=None, end_mark=None, style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style
class CollectionNode(Node):
def __init__(self, tag, value,
start_mark=None, end_mark=None, flow_style=None):
self.tag = tag
self.value = value
self.start_mark = start_mark
self.end_mark = end_mark
self.flow_style = flow_style
class SequenceNode(CollectionNode):
id = 'sequence'
class MappingNode(CollectionNode):
id = 'mapping'

powerline/lint/markedjson/parser.py Normal file
View File

@@ -0,0 +1,240 @@
__all__ = ['Parser', 'ParserError']
from .error import MarkedError
from .tokens import * # NOQA
from .events import * # NOQA
class ParserError(MarkedError):
pass
class Parser:
def __init__(self):
self.current_event = None
self.yaml_version = None
self.states = []
self.marks = []
self.state = self.parse_stream_start
def dispose(self):
# Reset the state attributes (to clear self-references)
self.states = []
self.state = None
def check_event(self, *choices):
# Check the type of the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
if self.current_event is not None:
if not choices:
return True
for choice in choices:
if isinstance(self.current_event, choice):
return True
return False
def peek_event(self):
# Get the next event.
if self.current_event is None:
if self.state:
self.current_event = self.state()
return self.current_event
def get_event(self):
# Get the next event and proceed further.
if self.current_event is None:
if self.state:
self.current_event = self.state()
value = self.current_event
self.current_event = None
return value
# stream ::= STREAM-START implicit_document? explicit_document* STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
def parse_stream_start(self):
# Parse the stream start.
token = self.get_token()
event = StreamStartEvent(token.start_mark, token.end_mark,
encoding=token.encoding)
# Prepare the next state.
self.state = self.parse_implicit_document_start
return event
def parse_implicit_document_start(self):
# Parse an implicit document.
if not self.check_token(StreamEndToken):
token = self.peek_token()
start_mark = end_mark = token.start_mark
event = DocumentStartEvent(start_mark, end_mark, explicit=False)
# Prepare the next state.
self.states.append(self.parse_document_end)
self.state = self.parse_node
return event
else:
return self.parse_document_start()
def parse_document_start(self):
# Parse an explicit document.
if not self.check_token(StreamEndToken):
token = self.peek_token()
self.echoerr(None, None,
"expected '<stream end>', but found %r" % token.id,
token.start_mark)
return StreamEndEvent(token.start_mark, token.end_mark)
else:
# Parse the end of the stream.
token = self.get_token()
event = StreamEndEvent(token.start_mark, token.end_mark)
assert not self.states
assert not self.marks
self.state = None
return event
def parse_document_end(self):
# Parse the document end.
token = self.peek_token()
start_mark = end_mark = token.start_mark
explicit = False
event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
# Prepare the next state.
self.state = self.parse_document_start
return event
def parse_document_content(self):
return self.parse_node()
def parse_node(self, indentless_sequence=False):
start_mark = end_mark = None
if start_mark is None:
start_mark = end_mark = self.peek_token().start_mark
event = None
implicit = True
if self.check_token(ScalarToken):
token = self.get_token()
end_mark = token.end_mark
if token.plain:
implicit = (True, False)
else:
implicit = (False, True)
event = ScalarEvent(implicit, token.value,
start_mark, end_mark, style=token.style)
self.state = self.states.pop()
elif self.check_token(FlowSequenceStartToken):
end_mark = self.peek_token().end_mark
event = SequenceStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_sequence_first_entry
elif self.check_token(FlowMappingStartToken):
end_mark = self.peek_token().end_mark
event = MappingStartEvent(implicit,
start_mark, end_mark, flow_style=True)
self.state = self.parse_flow_mapping_first_key
else:
token = self.peek_token()
raise ParserError("while parsing a flow node", start_mark,
"expected the node content, but found %r" % token.id,
token.start_mark)
return event
def parse_flow_sequence_first_entry(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_sequence_entry(first=True)
def parse_flow_sequence_entry(self, first=False):
if not self.check_token(FlowSequenceEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowSequenceEndToken):
token = self.peek_token()
self.echoerr("While parsing a flow sequence", self.marks[-1],
"expected sequence value, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow sequence", self.marks[-1],
"expected ',' or ']', but got %r" % token.id, token.start_mark)
if not self.check_token(FlowSequenceEndToken):
self.states.append(self.parse_flow_sequence_entry)
return self.parse_node()
token = self.get_token()
event = SequenceEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_sequence_entry_mapping_end(self):
self.state = self.parse_flow_sequence_entry
token = self.peek_token()
return MappingEndEvent(token.start_mark, token.start_mark)
def parse_flow_mapping_first_key(self):
token = self.get_token()
self.marks.append(token.start_mark)
return self.parse_flow_mapping_key(first=True)
def parse_flow_mapping_key(self, first=False):
if not self.check_token(FlowMappingEndToken):
if not first:
if self.check_token(FlowEntryToken):
self.get_token()
if self.check_token(FlowMappingEndToken):
token = self.peek_token()
self.echoerr("While parsing a flow mapping", self.marks[-1],
"expected mapping key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ',' or '}', but got %r" % token.id, token.start_mark)
if self.check_token(KeyToken):
token = self.get_token()
if not self.check_token(ValueToken,
FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_value)
return self.parse_node()
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected value, but got %r" % token.id, token.start_mark)
elif not self.check_token(FlowMappingEndToken):
token = self.peek_token()
expect_key = self.check_token(ValueToken, FlowEntryToken)
if not expect_key:
self.get_token()
expect_key = self.check_token(ValueToken)
if expect_key:
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected string key, but got %r" % token.id, token.start_mark)
else:
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected ':', but got %r" % token.id, token.start_mark)
token = self.get_token()
event = MappingEndEvent(token.start_mark, token.end_mark)
self.state = self.states.pop()
self.marks.pop()
return event
def parse_flow_mapping_value(self):
if self.check_token(ValueToken):
token = self.get_token()
if not self.check_token(FlowEntryToken, FlowMappingEndToken):
self.states.append(self.parse_flow_mapping_key)
return self.parse_node()
token = self.peek_token()
raise ParserError("while parsing a flow mapping", self.marks[-1],
"expected mapping value, but got %r" % token.id, token.start_mark)

View File

@ -0,0 +1,136 @@
# This module contains abstractions for the input stream. You don't have to
# look further, there is no pretty code.
__all__ = ['Reader', 'ReaderError']
from .error import MarkedError, Mark, NON_PRINTABLE
import codecs
try:
from __builtin__ import unicode
except ImportError:
unicode = str # NOQA
class ReaderError(MarkedError):
pass
class Reader(object):
# Reader:
# - determines the data encoding and converts it to a unicode string,
# - checks if characters are in allowed range,
# - adds '\0' to the end.
# Reader accepts
    # - a file-like object with its `read` method returning `str`.
# Yeah, it's ugly and slow.
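    #
    # Usage sketch (an illustration; assumes a byte stream such as
    # io.BytesIO):
    #     reader = Reader(io.BytesIO(b'{"a": 1}'))
    #     reader.peek()      # -> '{'
    #     reader.prefix(4)   # -> '{"a"'
    #     reader.forward(4)  # consume four characters, tracking line/column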
def __init__(self, stream):
self.name = None
self.stream = None
self.stream_pointer = 0
self.eof = True
self.buffer = ''
self.pointer = 0
self.full_buffer = unicode('')
self.full_pointer = 0
self.raw_buffer = None
self.raw_decode = codecs.utf_8_decode
self.encoding = 'utf-8'
self.index = 0
self.line = 0
self.column = 0
self.stream = stream
self.name = getattr(stream, 'name', "<file>")
self.eof = False
self.raw_buffer = None
while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
self.update_raw()
self.update(1)
def peek(self, index=0):
try:
return self.buffer[self.pointer + index]
except IndexError:
self.update(index + 1)
return self.buffer[self.pointer + index]
def prefix(self, length=1):
if self.pointer + length >= len(self.buffer):
self.update(length)
return self.buffer[self.pointer:self.pointer + length]
def update_pointer(self, length):
while length:
ch = self.buffer[self.pointer]
self.pointer += 1
self.full_pointer += 1
self.index += 1
if ch == '\n':
self.line += 1
self.column = 0
else:
self.column += 1
length -= 1
def forward(self, length=1):
if self.pointer + length + 1 >= len(self.buffer):
self.update(length + 1)
self.update_pointer(length)
def get_mark(self):
return Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer)
def check_printable(self, data):
match = NON_PRINTABLE.search(data)
if match:
self.update_pointer(match.start())
raise ReaderError('while reading from stream', None,
'found special characters which are not allowed',
Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer))
def update(self, length):
if self.raw_buffer is None:
return
self.buffer = self.buffer[self.pointer:]
self.pointer = 0
while len(self.buffer) < length:
if not self.eof:
self.update_raw()
try:
data, converted = self.raw_decode(self.raw_buffer,
'strict', self.eof)
except UnicodeDecodeError as exc:
character = self.raw_buffer[exc.start]
position = self.stream_pointer - len(self.raw_buffer) + exc.start
data, converted = self.raw_decode(self.raw_buffer[:exc.start], 'strict', self.eof)
self.buffer += data
self.full_buffer += data + '<' + str(ord(character)) + '>'
self.raw_buffer = self.raw_buffer[converted:]
self.update_pointer(exc.start - 1)
raise ReaderError('while reading from stream', None,
'found character #x%04x that cannot be decoded by UTF-8 codec' % ord(character),
Mark(self.name, self.line, self.column, self.full_buffer, position))
self.buffer += data
self.full_buffer += data
self.raw_buffer = self.raw_buffer[converted:]
self.check_printable(data)
if self.eof:
self.buffer += '\0'
self.raw_buffer = None
break
def update_raw(self, size=4096):
data = self.stream.read(size)
if self.raw_buffer is None:
self.raw_buffer = data
else:
self.raw_buffer += data
self.stream_pointer += len(data)
if not data:
self.eof = True

View File

@ -0,0 +1,131 @@
__all__ = ['BaseResolver', 'Resolver']
from .error import MarkedError
from .nodes import * # NOQA
import re
class ResolverError(MarkedError):
pass
class BaseResolver:
DEFAULT_SCALAR_TAG = 'tag:yaml.org,2002:str'
DEFAULT_SEQUENCE_TAG = 'tag:yaml.org,2002:seq'
DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'
yaml_implicit_resolvers = {}
yaml_path_resolvers = {}
def __init__(self):
self.resolver_exact_paths = []
self.resolver_prefix_paths = []
@classmethod
def add_implicit_resolver(cls, tag, regexp, first):
        if 'yaml_implicit_resolvers' not in cls.__dict__:
cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()
if first is None:
first = [None]
for ch in first:
cls.yaml_implicit_resolvers.setdefault(ch, []).append((tag, regexp))
def descend_resolver(self, current_node, current_index):
if not self.yaml_path_resolvers:
return
exact_paths = {}
prefix_paths = []
if current_node:
depth = len(self.resolver_prefix_paths)
for path, kind in self.resolver_prefix_paths[-1]:
if self.check_resolver_prefix(depth, path, kind,
current_node, current_index):
if len(path) > depth:
prefix_paths.append((path, kind))
else:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
for path, kind in self.yaml_path_resolvers:
if not path:
exact_paths[kind] = self.yaml_path_resolvers[path, kind]
else:
prefix_paths.append((path, kind))
self.resolver_exact_paths.append(exact_paths)
self.resolver_prefix_paths.append(prefix_paths)
def ascend_resolver(self):
if not self.yaml_path_resolvers:
return
self.resolver_exact_paths.pop()
self.resolver_prefix_paths.pop()
def check_resolver_prefix(self, depth, path, kind,
current_node, current_index):
node_check, index_check = path[depth - 1]
if isinstance(node_check, str):
if current_node.tag != node_check:
return
elif node_check is not None:
if not isinstance(current_node, node_check):
return
if index_check is True and current_index is not None:
return
if (index_check is False or index_check is None) \
and current_index is None:
return
if isinstance(index_check, str):
if not (isinstance(current_index, ScalarNode)
and index_check == current_index.value):
return
elif isinstance(index_check, int) and not isinstance(index_check, bool):
if index_check != current_index:
return
return True
def resolve(self, kind, value, implicit, mark=None):
if kind is ScalarNode and implicit[0]:
if value == '':
resolvers = self.yaml_implicit_resolvers.get('', [])
else:
resolvers = self.yaml_implicit_resolvers.get(value[0], [])
resolvers += self.yaml_implicit_resolvers.get(None, [])
for tag, regexp in resolvers:
if regexp.match(value):
return tag
else:
self.echoerr('While resolving plain scalar', None,
'expected floating-point value, integer, null or boolean, but got %r' % value,
mark)
return self.DEFAULT_SCALAR_TAG
if kind is ScalarNode:
return self.DEFAULT_SCALAR_TAG
elif kind is SequenceNode:
return self.DEFAULT_SEQUENCE_TAG
elif kind is MappingNode:
return self.DEFAULT_MAPPING_TAG
class Resolver(BaseResolver):
pass
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:bool',
re.compile(r'''^(?:true|false)$''', re.X),
list('yYnNtTfFoO'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:float',
re.compile(r'^-?(?:0|[1-9]\d*)(?=[.eE])(?:\.\d+)?(?:[eE][-+]?\d+)?$', re.X),
list('-0123456789'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:int',
re.compile(r'^(?:0|-?[1-9]\d*)$', re.X),
list('-0123456789'))
Resolver.add_implicit_resolver(
'tag:yaml.org,2002:null',
re.compile(r'^null$', re.X),
['n'])
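# For illustration, resolve() maps scalar values to tags like so (a sketch;
# in practice the parser drives it):
#     resolver = Resolver()
#     resolver.resolve(ScalarNode, '42', (True, False))    # tag:yaml.org,2002:int
#     resolver.resolve(ScalarNode, 'true', (True, False))  # tag:yaml.org,2002:bool
#     resolver.resolve(ScalarNode, 'x', (False, True))     # tag:yaml.org,2002:str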

View File

@ -0,0 +1,468 @@
# Scanner produces tokens of the following types:
# STREAM-START
# STREAM-END
# DOCUMENT-START
# DOCUMENT-END
# FLOW-SEQUENCE-START
# FLOW-MAPPING-START
# FLOW-SEQUENCE-END
# FLOW-MAPPING-END
# FLOW-ENTRY
# KEY
# VALUE
# SCALAR(value, plain, style)
#
# Read comments in the Scanner code for more details.
__all__ = ['Scanner', 'ScannerError']
from .error import MarkedError
from .tokens import * # NOQA
class ScannerError(MarkedError):
pass
class SimpleKey:
    # See the simple keys treatment below.
def __init__(self, token_number, index, line, column, mark):
self.token_number = token_number
self.index = index
self.line = line
self.column = column
self.mark = mark
class Scanner:
def __init__(self):
"""Initialize the scanner."""
# It is assumed that Scanner and Reader will have a common descendant.
        # Reader does the dirty work of checking for BOM and converting the
# input data to Unicode. It also adds NUL to the end.
#
# Reader supports the following methods
# self.peek(i=0) # peek the next i-th character
# self.prefix(l=1) # peek the next l characters
# self.forward(l=1) # read the next l characters and move the pointer.
        # Have we reached the end of the stream?
self.done = False
# The number of unclosed '{' and '['. `flow_level == 0` means block
# context.
self.flow_level = 0
# List of processed tokens that are not yet emitted.
self.tokens = []
# Add the STREAM-START token.
self.fetch_stream_start()
# Number of tokens that were emitted through the `get_token` method.
self.tokens_taken = 0
# Variables related to simple keys treatment.
# A simple key is a key that is not denoted by the '?' indicator.
# We emit the KEY token before all keys, so when we find a potential
# simple key, we try to locate the corresponding ':' indicator.
# Simple keys should be limited to a single line.
# Can a simple key start at the current position? A simple key may
        # start:
        #  - after '{', '[' or ',' (in the flow context).
self.allow_simple_key = False
# Keep track of possible simple keys. This is a dictionary. The key
        # is `flow_level`; there can be no more than one possible simple key
# for each level. The value is a SimpleKey record:
# (token_number, index, line, column, mark)
# A simple key may start with SCALAR(flow), '[', or '{' tokens.
self.possible_simple_keys = {}
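        # For example, while scanning '{"a": 1}' the scalar "a" is recorded
        # as a possible simple key; when the following ':' is seen,
        # fetch_value() inserts a KEY token back at the recorded position.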
# Public methods.
def check_token(self, *choices):
# Check if the next token is one of the given types.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
if not choices:
return True
for choice in choices:
if isinstance(self.tokens[0], choice):
return True
return False
def peek_token(self):
        # Return the next token, but do not delete it from the queue.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
return self.tokens[0]
def get_token(self):
# Return the next token.
while self.need_more_tokens():
self.fetch_more_tokens()
if self.tokens:
self.tokens_taken += 1
return self.tokens.pop(0)
# Private methods.
def need_more_tokens(self):
if self.done:
return False
if not self.tokens:
return True
# The current token may be a potential simple key, so we
# need to look further.
self.stale_possible_simple_keys()
        if self.next_possible_simple_key() == self.tokens_taken:
            return True
        return False
def fetch_more_tokens(self):
# Eat whitespaces and comments until we reach the next token.
self.scan_to_next_token()
# Remove obsolete possible simple keys.
self.stale_possible_simple_keys()
# Peek the next character.
ch = self.peek()
# Is it the end of stream?
if ch == '\0':
return self.fetch_stream_end()
# Note: the order of the following checks is NOT significant.
# Is it the flow sequence start indicator?
if ch == '[':
return self.fetch_flow_sequence_start()
# Is it the flow mapping start indicator?
if ch == '{':
return self.fetch_flow_mapping_start()
# Is it the flow sequence end indicator?
if ch == ']':
return self.fetch_flow_sequence_end()
# Is it the flow mapping end indicator?
if ch == '}':
return self.fetch_flow_mapping_end()
# Is it the flow entry indicator?
if ch == ',':
return self.fetch_flow_entry()
# Is it the value indicator?
if ch == ':' and self.flow_level:
return self.fetch_value()
# Is it a double quoted scalar?
if ch == '\"':
return self.fetch_double()
# It must be a plain scalar then.
if self.check_plain():
return self.fetch_plain()
# No? It's an error. Let's produce a nice error message.
raise ScannerError("while scanning for the next token", None,
"found character %r that cannot start any token" % ch,
self.get_mark())
# Simple keys treatment.
def next_possible_simple_key(self):
# Return the number of the nearest possible simple key. Actually we
# don't need to loop through the whole dictionary. We may replace it
# with the following code:
# if not self.possible_simple_keys:
# return None
# return self.possible_simple_keys[
# min(self.possible_simple_keys.keys())].token_number
min_token_number = None
for level in self.possible_simple_keys:
key = self.possible_simple_keys[level]
if min_token_number is None or key.token_number < min_token_number:
min_token_number = key.token_number
return min_token_number
def stale_possible_simple_keys(self):
# Remove entries that are no longer possible simple keys. According to
# the YAML specification, simple keys
        # - should be limited to a single line.
# Disabling this procedure will allow simple keys of any length and
# height (may cause problems if indentation is broken though).
for level in list(self.possible_simple_keys):
key = self.possible_simple_keys[level]
if key.line != self.line:
del self.possible_simple_keys[level]
def save_possible_simple_key(self):
        # The next token may start a simple key. We check if it's possible
        # and save its number and position. This function is called for
        # SCALAR(flow), '[', and '{'.
if self.allow_simple_key:
self.remove_possible_simple_key()
token_number = self.tokens_taken + len(self.tokens)
key = SimpleKey(token_number,
self.index, self.line, self.column, self.get_mark())
self.possible_simple_keys[self.flow_level] = key
def remove_possible_simple_key(self):
# Remove the saved possible key position at the current flow level.
if self.flow_level in self.possible_simple_keys:
del self.possible_simple_keys[self.flow_level]
# Fetchers.
def fetch_stream_start(self):
# We always add STREAM-START as the first token and STREAM-END as the
# last token.
# Read the token.
mark = self.get_mark()
# Add STREAM-START.
self.tokens.append(StreamStartToken(mark, mark,
encoding=self.encoding))
def fetch_stream_end(self):
# Reset simple keys.
self.remove_possible_simple_key()
self.allow_simple_key = False
self.possible_simple_keys = {}
# Read the token.
mark = self.get_mark()
# Add STREAM-END.
self.tokens.append(StreamEndToken(mark, mark))
        # The stream is finished.
self.done = True
def fetch_flow_sequence_start(self):
self.fetch_flow_collection_start(FlowSequenceStartToken)
def fetch_flow_mapping_start(self):
self.fetch_flow_collection_start(FlowMappingStartToken)
def fetch_flow_collection_start(self, TokenClass):
# '[' and '{' may start a simple key.
self.save_possible_simple_key()
# Increase the flow level.
self.flow_level += 1
# Simple keys are allowed after '[' and '{'.
self.allow_simple_key = True
# Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_flow_sequence_end(self):
self.fetch_flow_collection_end(FlowSequenceEndToken)
def fetch_flow_mapping_end(self):
self.fetch_flow_collection_end(FlowMappingEndToken)
def fetch_flow_collection_end(self, TokenClass):
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Decrease the flow level.
self.flow_level -= 1
# No simple keys after ']' or '}'.
self.allow_simple_key = False
# Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(TokenClass(start_mark, end_mark))
def fetch_value(self):
# Do we determine a simple key?
if self.flow_level in self.possible_simple_keys:
# Add KEY.
key = self.possible_simple_keys[self.flow_level]
del self.possible_simple_keys[self.flow_level]
self.tokens.insert(key.token_number - self.tokens_taken,
KeyToken(key.mark, key.mark))
# There cannot be two simple keys one after another.
self.allow_simple_key = False
# Add VALUE.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(ValueToken(start_mark, end_mark))
def fetch_flow_entry(self):
# Simple keys are allowed after ','.
self.allow_simple_key = True
# Reset possible simple key on the current level.
self.remove_possible_simple_key()
# Add FLOW-ENTRY.
start_mark = self.get_mark()
self.forward()
end_mark = self.get_mark()
self.tokens.append(FlowEntryToken(start_mark, end_mark))
def fetch_double(self):
# A flow scalar could be a simple key.
self.save_possible_simple_key()
# No simple keys after flow scalars.
self.allow_simple_key = False
# Scan and add SCALAR.
self.tokens.append(self.scan_flow_scalar())
def fetch_plain(self):
self.save_possible_simple_key()
# No simple keys after plain scalars.
self.allow_simple_key = False
# Scan and add SCALAR. May change `allow_simple_key`.
self.tokens.append(self.scan_plain())
# Checkers.
def check_plain(self):
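        # A plain scalar in JSON can only be a number (a digit or '-') or
        # one of the literals 'null', 'true' and 'false', so checking the
        # first character against this set is sufficient here.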
return self.peek() in '0123456789-ntf'
# Scanners.
def scan_to_next_token(self):
while self.peek() in ' \t\n':
self.forward()
def scan_flow_scalar(self):
# See the specification for details.
        # Note that we lose indentation rules for quoted scalars. Quoted
        # scalars don't need to adhere to indentation because " and ' clearly
        # mark their beginning and end. Therefore we are less restrictive
        # than the specification requires. We only need to check
# that document separators are not included in scalars.
chunks = []
start_mark = self.get_mark()
quote = self.peek()
self.forward()
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
while self.peek() != quote:
chunks.extend(self.scan_flow_scalar_spaces(start_mark))
chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
self.forward()
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), False, start_mark, end_mark, '"')
ESCAPE_REPLACEMENTS = {
'b': '\x08',
't': '\x09',
'n': '\x0A',
'f': '\x0C',
'r': '\x0D',
'\"': '\"',
'\\': '\\',
}
ESCAPE_CODES = {
'u': 4,
}
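    #
    # For example, the double-quoted input "A\u0042\tC" is scanned into
    # the value 'AB\tC': 'u' consumes four hexadecimal digits and 't'
    # maps to '\x09'.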
def scan_flow_scalar_non_spaces(self, start_mark):
# See the specification for details.
chunks = []
while True:
length = 0
while self.peek(length) not in '\"\\\0 \t\n':
length += 1
if length:
chunks.append(self.prefix(length))
self.forward(length)
ch = self.peek()
if ch == '\\':
self.forward()
ch = self.peek()
if ch in self.ESCAPE_REPLACEMENTS:
chunks.append(self.ESCAPE_REPLACEMENTS[ch])
self.forward()
elif ch in self.ESCAPE_CODES:
length = self.ESCAPE_CODES[ch]
self.forward()
for k in range(length):
if self.peek(k) not in '0123456789ABCDEFabcdef':
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"expected escape sequence of %d hexdecimal numbers, but found %r" %
(length, self.peek(k)), self.get_mark())
code = int(self.prefix(length), 16)
chunks.append(chr(code))
self.forward(length)
else:
raise ScannerError("while scanning a double-quoted scalar", start_mark,
"found unknown escape character %r" % ch, self.get_mark())
else:
return chunks
def scan_flow_scalar_spaces(self, start_mark):
# See the specification for details.
chunks = []
length = 0
while self.peek(length) in ' \t':
length += 1
whitespaces = self.prefix(length)
self.forward(length)
ch = self.peek()
if ch == '\0':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected end of stream", self.get_mark())
elif ch == '\n':
raise ScannerError("while scanning a quoted scalar", start_mark,
"found unexpected line end", self.get_mark())
else:
chunks.append(whitespaces)
return chunks
def scan_plain(self):
chunks = []
start_mark = self.get_mark()
spaces = []
while True:
length = 0
while True:
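                # This set is the union of characters that may occur in JSON
                # numbers ('+', '-', '.', digits, 'e', 'E') and in the
                # literals 'null', 'true' and 'false'.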
if self.peek(length) not in 'eE.0123456789nul-tr+fas':
break
length += 1
if length == 0:
break
self.allow_simple_key = False
chunks.extend(spaces)
chunks.append(self.prefix(length))
self.forward(length)
end_mark = self.get_mark()
return ScalarToken(''.join(chunks), True, start_mark, end_mark)

View File

@ -0,0 +1,65 @@
class Token(object):
def __init__(self, start_mark, end_mark):
self.start_mark = start_mark
self.end_mark = end_mark
def __repr__(self):
attributes = [key for key in self.__dict__
if not key.endswith('_mark')]
attributes.sort()
arguments = ', '.join(['%s=%r' % (key, getattr(self, key))
for key in attributes])
return '%s(%s)' % (self.__class__.__name__, arguments)
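    # Marks are omitted from the repr; a plain scalar token, for instance,
    # prints as ScalarToken(plain=True, style=None, value='1').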
class StreamStartToken(Token):
id = '<stream start>'
def __init__(self, start_mark=None, end_mark=None,
encoding=None):
self.start_mark = start_mark
self.end_mark = end_mark
self.encoding = encoding
class StreamEndToken(Token):
id = '<stream end>'
class FlowSequenceStartToken(Token):
id = '['
class FlowMappingStartToken(Token):
id = '{'
class FlowSequenceEndToken(Token):
id = ']'
class FlowMappingEndToken(Token):
id = '}'
class KeyToken(Token):
id = '?'
class ValueToken(Token):
id = ':'
class FlowEntryToken(Token):
id = ','
class ScalarToken(Token):
id = '<scalar>'
def __init__(self, value, plain, start_mark, end_mark, style=None):
self.value = value
self.plain = plain
self.start_mark = start_mark
self.end_mark = end_mark
self.style = style

View File

@ -18,20 +18,8 @@ class Renderer(object):
self.theme_kwargs = theme_kwargs
self.colorscheme = colorscheme
def add_local_theme(self, matcher, theme):
if matcher in self.local_themes:
raise KeyError('There is already a local theme with given matcher')
self.local_themes[matcher] = theme
def get_theme(self, matcher_info):
for matcher in self.local_themes.keys():
if matcher(matcher_info):
match = self.local_themes[matcher]
if 'config' in match:
match['theme'] = Theme(theme_config=match.pop('config'), top_theme_config=self.theme_config, **self.theme_kwargs)
return match['theme']
else:
return self.theme
return self.theme
def get_highlighting(self, segment, mode):
segment['highlight'] = self.colorscheme.get_highlighting(segment['highlight_group'], mode, segment.get('gradient_level'))

View File

@ -5,6 +5,7 @@ from __future__ import absolute_import
from powerline.bindings.vim import vim_get_func
from powerline.renderer import Renderer
from powerline.colorscheme import ATTR_BOLD, ATTR_ITALIC, ATTR_UNDERLINE
from powerline.theme import Theme
import vim
@ -24,6 +25,21 @@ class VimRenderer(Renderer):
super(VimRenderer, self).__init__(*args, **kwargs)
self.hl_groups = {}
def add_local_theme(self, matcher, theme):
if matcher in self.local_themes:
raise KeyError('There is already a local theme with given matcher')
self.local_themes[matcher] = theme
def get_theme(self, matcher_info):
for matcher in self.local_themes.keys():
if matcher(matcher_info):
match = self.local_themes[matcher]
if 'config' in match:
match['theme'] = Theme(theme_config=match.pop('config'), top_theme_config=self.theme_config, **self.theme_kwargs)
return match['theme']
else:
return self.theme
def render(self, window_id, winidx, current):
'''Render all segments.

View File

@ -26,6 +26,8 @@ def user():
'''Return the current user.
    Highlights the user with the ``superuser`` highlight group if the effective user ID is 0.
Highlight groups used: ``superuser`` or ``user``. It is recommended to define all highlight groups.
'''
user = os.environ.get('USER')
try:
@ -56,6 +58,11 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None):
    shorten parent directory names to this length (e.g. :file:`/long/path/to/powerline` → :file:`/l/p/t/powerline`)
:param int dir_limit_depth:
    limit directory depth to this number (e.g. :file:`/long/path/to/powerline` → :file:`/to/powerline`)
Divider highlight group used: ``cwd:divider``.
Highlight groups used: ``cwd:current_folder`` or ``cwd``. It is recommended to define all highlight groups.
'''
import re
try:
@ -98,6 +105,10 @@ def date(format='%Y-%m-%d', istime=False):
:param str format:
strftime-style date format string
Divider highlight group used: ``time:divider``.
Highlight groups used: ``time`` or ``date``.
'''
return [{
'contents': datetime.now().strftime(format),
@ -172,6 +183,8 @@ def external_ip(query_url='http://ipv4.icanhazip.com/'):
:param str query_url:
URI to query for IP address, should return only the IP address as a text string
Divider highlight group used: ``background:divider``.
'''
return [{'contents': _external_ip(query_url=query_url), 'divider_highlight_group': 'background:divider'}]
@ -185,6 +198,8 @@ def uptime(format='{days:02d}d {hours:02d}h {minutes:02d}m'):
:param str format:
format string, will be passed ``days``, ``hours`` and ``minutes`` as arguments
Divider highlight group used: ``background:divider``.
'''
try:
import psutil
@ -294,6 +309,11 @@ def weather(unit='c', location_query=None, icons=None):
location query for your current location, e.g. ``oslo, norway``
:param dict icons:
dict for overriding default icons, e.g. ``{'heavy_snow' : u''}``
Divider highlight group used: ``background:divider``.
Highlight groups used: ``weather_conditions`` or ``weather``, ``weather_temp_cold`` or ``weather_temp_hot`` or ``weather_temp`` or ``weather``.
Also uses ``weather_conditions_{condition}`` for all weather conditions supported by Yahoo.
'''
import json
@ -359,6 +379,10 @@ def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2):
threshold for "good load" highlighting
:param float threshold_bad:
threshold for "bad load" highlighting
Divider highlight group used: ``background:divider``.
Highlight groups used: ``system_load_good`` or ``system_load``, ``system_load_bad`` or ``system_load``, ``system_load_ugly`` or ``system_load``. It is recommended to define all highlight groups.
'''
cpu_num = cpu_count()
ret = []
@ -466,6 +490,8 @@ def email_imap_alert(username, password, server='imap.gmail.com', port=993, fold
e-mail server port
:param str folder:
folder to check for e-mails
Highlight groups used: ``email_alert``.
'''
import imaplib
import re

View File

@ -5,7 +5,10 @@ from powerline.theme import requires_segment_info
@requires_segment_info
def last_status(segment_info):
'''Return last exit code.'''
'''Return last exit code.
    Highlight groups used: ``exit_fail``.
'''
if not segment_info.last_exit_code:
return None
return [{'contents': str(segment_info.last_exit_code), 'highlight_group': 'exit_fail'}]
@ -13,7 +16,10 @@ def last_status(segment_info):
@requires_segment_info
def last_pipe_status(segment_info):
'''Return last pipe status.'''
'''Return last pipe status.
    Highlight groups used: ``exit_fail``, ``exit_success``.
'''
if any(segment_info.last_pipe_status):
return [{"contents": str(status), "highlight_group": "exit_fail" if status else "exit_success"}
for status in segment_info.last_pipe_status]

View File

@ -12,6 +12,7 @@ from powerline.bindings.vim import vim_get_func, getbufvar
from powerline.theme import requires_segment_info
from powerline.lib import memoize, humanize_bytes, add_divider_highlight_group
from powerline.lib.vcs import guess
from functools import wraps
from collections import defaultdict
vim_funcs = {
@ -87,6 +88,8 @@ def bufname(segment_info, **kwargs):
def window_cached(func):
cache = {}
@requires_segment_info
@wraps(func)
def ret(segment_info, *args, **kwargs):
window_id = segment_info['window_id']
if segment_info['mode'] == 'nc':
@ -95,8 +98,7 @@ def window_cached(func):
r = func(*args, **kwargs)
cache[window_id] = r
return r
ret = requires_segment_info(ret)
ret.__name__ = func.__name__
return ret
@ -213,6 +215,8 @@ def file_format(segment_info):
'''Return file format (i.e. line ending type).
:return: file format or None if unknown or missing file format
Divider highlight group used: ``background:divider``.
'''
return getbufvar(segment_info['bufnr'], '&fileformat') or None
@ -223,6 +227,8 @@ def file_encoding(segment_info):
'''Return file encoding/character set.
:return: file encoding/character set or None if unknown or missing file encoding
Divider highlight group used: ``background:divider``.
'''
return getbufvar(segment_info['bufnr'], '&fileencoding') or None
@ -233,6 +239,8 @@ def file_type(segment_info):
'''Return file type.
:return: file type or None if unknown file type
Divider highlight group used: ``background:divider``.
'''
return getbufvar(segment_info['bufnr'], '&filetype') or None
@ -243,6 +251,8 @@ def line_percent(segment_info, gradient=False):
:param bool gradient:
highlight the percentage with a color gradient (by default a green to red gradient)
Highlight groups used: ``line_percent_gradient`` (gradient) or ``line_percent``.
'''
line_current = segment_info['window'].cursor[0]
line_last = len(segment_info['buffer'])
@ -271,7 +281,10 @@ def col_current(segment_info):
@window_cached
def virtcol_current():
    '''Return current visual column with concealed characters ignored'''
    '''Return current visual column with concealed characters ignored.
Highlight groups used: ``virtcol_current`` or ``col_current``.
'''
return [{'contents': str(vim_funcs['virtcol']('.')),
'highlight_group': ['virtcol_current', 'col_current']}]
@ -294,7 +307,10 @@ def modified_buffers(text='+ ', join_str=','):
@requires_segment_info
@memoize(2, cache_key=bufnr, cache_reg_func=purgeall_on_shell)
def branch(segment_info):
'''Return the current working branch.'''
'''Return the current working branch.
Divider highlight group used: ``branch:divider``.
'''
repo = guess(path=os.path.abspath(segment_info['buffer'].name or os.getcwd()))
if repo:
return [{
@ -307,7 +323,10 @@ def branch(segment_info):
@requires_segment_info
@memoize(2, cache_key=bufnr, cache_reg_func=purgebuf_on_shell_and_write)
def file_vcs_status(segment_info):
'''Return the VCS status for this buffer.'''
'''Return the VCS status for this buffer.
Highlight groups used: ``file_vcs_status``.
'''
name = segment_info['buffer'].name
if name and not getbufvar(segment_info['bufnr'], '&buftype'):
repo = guess(path=os.path.abspath(name))

View File

@ -65,12 +65,12 @@ class VimPowerline(Powerline):
'g:powerline_theme_overrides__' + name)
def get_local_themes(self, local_themes):
if not local_themes:
return {}
self.get_matcher = gen_matcher_getter(self.ext, self.import_paths)
r = {}
for key, local_theme_name in local_themes.items():
key = self.get_matcher(key)
r[key] = {'config': self.load_theme_config(local_theme_name)}
return r
return dict(((self.get_matcher(key), {'config': self.load_theme_config(val)})
for key, val in local_themes.items()))
def get_config_paths(self):
if vim_exists('g:powerline_config_path'):

scripts/powerline-lint Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''Powerline configuration checker.'''
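# Example invocation (the config path shown is hypothetical):
#     powerline-lint -p ~/.config/powerline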
import argparse
from powerline.lint import check
import sys
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-p', '--config_path', metavar='PATH')
if __name__ == '__main__':
args = parser.parse_args()
sys.exit(check(args.config_path))

View File

@ -25,6 +25,7 @@ setup(
url='https://github.com/Lokaltog/powerline',
scripts=[
'scripts/powerline',
'scripts/powerline-lint',
],
keywords='',
packages=find_packages(exclude=('tests', 'tests.*')),

View File

@ -5,6 +5,6 @@ if python -c 'import sys; sys.exit(1 * (sys.version_info[0] != 2))' ; then
pip install mercurial bzr
if python -c 'import sys; sys.exit(1 * (sys.version_info[1] >= 7))' ; then
# Python 2.6
pip install unittest2
pip install unittest2 argparse
fi
fi

View File

@ -1,5 +1,6 @@
#!/bin/sh
: ${PYTHON:=python}
FAILED=0
if ${PYTHON} -c 'import sys; sys.exit(1 * (sys.version_info >= (2, 7)))' ; then
# Python 2.6
export PYTHONPATH="${PYTHONPATH}:`realpath .`"
@ -9,5 +10,11 @@ if ${PYTHON} -c 'import sys; sys.exit(1 * (sys.version_info >= (2, 7)))' ; then
fi
done
else
${PYTHON} setup.py test
if ! ${PYTHON} setup.py test ; then
FAILED=1
fi
fi
if ! ${PYTHON} scripts/powerline-lint ; then
FAILED=1
fi
exit $FAILED