parent
a279ea236c
commit
d0c4d4e266
|
@ -109,7 +109,8 @@ class ConfigLoader(MultiRunnedThread):
|
||||||
|
|
||||||
:param function condition_function:
|
:param function condition_function:
|
||||||
Function which will be called each ``interval`` seconds. All
|
Function which will be called each ``interval`` seconds. All
|
||||||
exceptions from it will be ignored.
|
exceptions from it will be logged and ignored. IOError exception
|
||||||
|
will be ignored without logging.
|
||||||
:param function function:
|
:param function function:
|
||||||
Function which will be called if condition_function returns
|
Function which will be called if condition_function returns
|
||||||
something that is true. Accepts result of condition_function as an
|
something that is true. Accepts result of condition_function as an
|
||||||
|
@ -179,6 +180,8 @@ class ConfigLoader(MultiRunnedThread):
|
||||||
for condition_function, function in list(functions):
|
for condition_function, function in list(functions):
|
||||||
try:
|
try:
|
||||||
path = condition_function(key)
|
path = condition_function(key)
|
||||||
|
except IOError:
|
||||||
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.exception('Error while running condition function for key {0}: {1}', key, str(e))
|
self.exception('Error while running condition function for key {0}: {1}', key, str(e))
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
# vim:fileencoding=utf-8:noet
|
||||||
|
from __future__ import unicode_literals, absolute_import, division
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
from subprocess import check_call
|
||||||
|
from operator import add
|
||||||
|
from shutil import rmtree
|
||||||
|
|
||||||
|
from powerline import Powerline
|
||||||
|
|
||||||
|
|
||||||
|
CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config')
|
||||||
|
|
||||||
|
|
||||||
|
class TestPowerline(Powerline):
|
||||||
|
def __init__(self, _paths, *args, **kwargs):
|
||||||
|
super(TestPowerline, self).__init__(*args, **kwargs)
|
||||||
|
self._paths = _paths
|
||||||
|
|
||||||
|
def get_config_paths(self):
|
||||||
|
return self._paths
|
||||||
|
|
||||||
|
|
||||||
|
def mkdir_recursive(directory):
|
||||||
|
if os.path.isdir(directory):
|
||||||
|
return
|
||||||
|
mkdir_recursive(os.path.dirname(directory))
|
||||||
|
os.mkdir(directory)
|
||||||
|
|
||||||
|
|
||||||
|
class FSTree(object):
|
||||||
|
__slots__ = ('tree', 'p', 'p_kwargs', 'create_p', 'get_config_paths', 'root')
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
tree,
|
||||||
|
p_kwargs={'run_once': True},
|
||||||
|
root=CONFIG_DIR,
|
||||||
|
get_config_paths=lambda p: (p,),
|
||||||
|
create_p=False
|
||||||
|
):
|
||||||
|
self.tree = tree
|
||||||
|
self.root = root
|
||||||
|
self.get_config_paths = get_config_paths
|
||||||
|
self.create_p = create_p
|
||||||
|
self.p = None
|
||||||
|
self.p_kwargs = p_kwargs
|
||||||
|
|
||||||
|
def __enter__(self, *args):
|
||||||
|
os.mkdir(self.root)
|
||||||
|
for k, v in self.tree.items():
|
||||||
|
fname = os.path.join(self.root, k) + '.json'
|
||||||
|
mkdir_recursive(os.path.dirname(fname))
|
||||||
|
with open(fname, 'w') as F:
|
||||||
|
json.dump(v, F)
|
||||||
|
if self.create_p:
|
||||||
|
self.p = TestPowerline(
|
||||||
|
_paths=self.get_config_paths(self.root),
|
||||||
|
ext='test',
|
||||||
|
renderer_module='tests.lib.config_mock',
|
||||||
|
**self.p_kwargs
|
||||||
|
)
|
||||||
|
if os.environ.get('POWERLINE_RUN_LINT_DURING_TESTS'):
|
||||||
|
try:
|
||||||
|
check_call(['scripts/powerline-lint'] + reduce(add, (
|
||||||
|
['-p', d] for d in self.p.get_config_paths()
|
||||||
|
)))
|
||||||
|
except:
|
||||||
|
self.__exit__()
|
||||||
|
raise
|
||||||
|
return self.p and self.p.__enter__(*args)
|
||||||
|
|
||||||
|
def __exit__(self, *args):
|
||||||
|
try:
|
||||||
|
rmtree(self.root)
|
||||||
|
finally:
|
||||||
|
if self.p:
|
||||||
|
self.p.__exit__(*args)
|
|
@ -0,0 +1,51 @@
|
||||||
|
# vim:fileencoding=utf-8:noet
|
||||||
|
from __future__ import unicode_literals, absolute_import, division
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from powerline.lib.config import ConfigLoader
|
||||||
|
from tests import TestCase
|
||||||
|
from tests.lib.fsconfig import FSTree
|
||||||
|
|
||||||
|
|
||||||
|
FILE_ROOT = os.path.join(os.path.dirname(__file__), 'cfglib')
|
||||||
|
|
||||||
|
|
||||||
|
class LoadedList(list):
|
||||||
|
def pop_all(self):
|
||||||
|
try:
|
||||||
|
return self[:]
|
||||||
|
finally:
|
||||||
|
self[:] = ()
|
||||||
|
|
||||||
|
|
||||||
|
loaded = LoadedList()
|
||||||
|
|
||||||
|
|
||||||
|
def on_load(key):
|
||||||
|
loaded.append(key)
|
||||||
|
|
||||||
|
|
||||||
|
def check_file(path):
|
||||||
|
if os.path.exists(path):
|
||||||
|
return path
|
||||||
|
else:
|
||||||
|
raise IOError
|
||||||
|
|
||||||
|
|
||||||
|
class TestLoaderCondition(TestCase):
|
||||||
|
def test_update_missing(self):
|
||||||
|
loader = ConfigLoader(run_once=True)
|
||||||
|
fpath = os.path.join(FILE_ROOT, 'file.json')
|
||||||
|
self.assertRaises(IOError, loader.load, fpath)
|
||||||
|
loader.register_missing(check_file, on_load, fpath)
|
||||||
|
loader.update() # This line must not raise IOError
|
||||||
|
with FSTree({'file': {'test': 1}}, root=FILE_ROOT):
|
||||||
|
loader.update()
|
||||||
|
self.assertEqual(loader.load(fpath), {'test': 1})
|
||||||
|
self.assertEqual(loaded.pop_all(), [fpath])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
from tests import main
|
||||||
|
main()
|
Loading…
Reference in New Issue