From c334be416d1722bdf4fde694cb357adb872760df Mon Sep 17 00:00:00 2001 From: ZyX Date: Sat, 2 Mar 2013 18:40:13 +0400 Subject: [PATCH] Add tests.lib module with `replace_*()` with statement functions Note: there is new problem: now multiprocessing() globals got assigned None values. It is likely somehow related to extended `sys.modules` utilization --- tests/lib/__init__.py | 111 ++++++++++++++++++++++++++++++++ tests/test_segments.py | 139 ++++++++++++----------------------------- 2 files changed, 152 insertions(+), 98 deletions(-) create mode 100644 tests/lib/__init__.py diff --git a/tests/lib/__init__.py b/tests/lib/__init__.py new file mode 100644 index 00000000..e7df12de --- /dev/null +++ b/tests/lib/__init__.py @@ -0,0 +1,111 @@ +import imp +import sys +import os + + +class Args(object): + theme_option = None + config = None + config_path = None + ext = ['shell'] + renderer_module = None + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + +def urllib_read(query_url): + if query_url.startswith('http://ipv'): + if query_url.startswith('http://ipv4.icanhazip.com'): + return '127.0.0.1' + elif query_url.startswith('http://ipv4.icanhazip.com'): + return '2001:4801:7818:6:abc5:ba2c:ff10:275f' + elif query_url.startswith('http://freegeoip.net/json/'): + return '{"city": "Meppen", "region_code": "06", "region_name": "Niedersachsen", "areacode": "", "ip": "82.145.55.16", "zipcode": "49716", "longitude": 7.3167, "country_name": "Germany", "country_code": "DE", "metrocode": "", "latitude": 52.6833}' + elif query_url.startswith('http://query.yahooapis.com/v1/public/'): + return '{"query":{"count":1,"created":"2013-03-02T13:20:22Z","lang":"en-US","results":{"weather":{"rss":{"version":"2.0","geo":"http://www.w3.org/2003/01/geo/wgs84_pos#","yweather":"http://xml.weather.yahoo.com/ns/rss/1.0","channel":{"title":"Yahoo! Weather - Russia, RU","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Russia__RU/*http://weather.yahoo.com/forecast/RSXX1511_c.html","description":"Yahoo! Weather for Russia, RU","language":"en-us","lastBuildDate":"Sat, 02 Mar 2013 4:58 pm MSK","ttl":"60","location":{"city":"Russia","country":"Russia","region":""},"units":{"distance":"km","pressure":"mb","speed":"km/h","temperature":"C"},"wind":{"chill":"-11","direction":"0","speed":""},"atmosphere":{"humidity":"94","pressure":"1006.1","rising":"0","visibility":""},"astronomy":{"sunrise":"10:04 am","sunset":"7:57 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Russia, RU at 4:58 pm MSK","lat":"59.45","long":"108.83","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Russia__RU/*http://weather.yahoo.com/forecast/RSXX1511_c.html","pubDate":"Sat, 02 Mar 2013 4:58 pm MSK","condition":{"code":"30","date":"Sat, 02 Mar 2013 4:58 pm MSK","temp":"-11","text":"Partly Cloudy"},"description":"
\nCurrent Conditions:
\nPartly Cloudy, -11 C
\n
Forecast:
\nSat - Partly Cloudy. High: -9 Low: -19
\nSun - Partly Cloudy. High: -12 Low: -18
\n
\nFull Forecast at Yahoo! Weather

\n(provided by The Weather Channel)
","forecast":[{"code":"29","date":"2 Mar 2013","day":"Sat","high":"-9","low":"-19","text":"Partly Cloudy"},{"code":"30","date":"3 Mar 2013","day":"Sun","high":"-12","low":"-18","text":"Partly Cloudy"}],"guid":{"isPermaLink":"false","content":"RSXX1511_2013_03_03_7_00_MSK"}}}}}}}}' + else: + raise NotImplementedError + + +class ModuleReplace(object): + def __init__(self, name, new): + self.name = name + self.new = new + + def __enter__(self): + self.old = sys.modules.get(self.name) + if not self.old: + try: + self.old = __import__(self.name) + except ImportError: + pass + sys.modules[self.name] = self.new + + def __exit__(self, *args): + if self.old: + sys.modules[self.name] = self.old + else: + sys.modules.pop(self.name) + + +def replace_module(name, new=None, **kwargs): + if not new: + new = new_module(name, **kwargs) + return ModuleReplace(name, new) + + +def new_module(name, **kwargs): + module = imp.new_module(name) + for k, v in kwargs.items(): + setattr(module, k, v) + return module + + +class ModuleAttrReplace(object): + def __init__(self, module, attr, new): + self.module = module + self.attr = attr + self.new = new + + def __enter__(self): + try: + self.old = getattr(self.module, self.attr) + except AttributeError: + pass + setattr(self.module, self.attr, self.new) + + def __exit__(self, *args): + try: + setattr(self.module, self.attr, self.old) + except AttributeError: + delattr(self.module, self.attr) + + +replace_module_attr = ModuleAttrReplace + + +def replace_module_module(module, name, **kwargs): + return replace_module_attr(module, name, new_module(name, **kwargs)) + + +class EnvReplace(object): + def __init__(self, name, new): + self.name = name + self.new = new + + def __enter__(self): + self.old = os.environ.get(self.name) + os.environ[self.name] = self.new + + def __exit__(self, *args): + if self.old is None: + try: + os.environ.pop(self.name) + except KeyError: + pass + else: + os.environ[self.name] = self.old + + +replace_env = EnvReplace diff --git a/tests/test_segments.py b/tests/test_segments.py index 30197bf1..4d53e353 100644 --- a/tests/test_segments.py +++ b/tests/test_segments.py @@ -5,18 +5,7 @@ from powerline.segments import shell, common import tests.vim as vim_module import sys import os -import imp - - -class Args(object): - theme_option = None - config = None - config_path = None - ext = ['shell'] - renderer_module = None - - def __init__(self, **kwargs): - self.__dict__.update(kwargs) +from .lib import Args, urllib_read, replace_module, replace_module_attr, new_module, replace_module_module, replace_env vim = None @@ -39,51 +28,34 @@ class TestShell(TestCase): class TestCommon(TestCase): def test_hostname(self): - os.environ['SSH_CLIENT'] = '192.168.0.12 40921 22' - socket = imp.new_module('socket') - socket.gethostname = lambda: 'abc' - sys.modules['socket'] = socket - try: - self.assertEqual(common.hostname(), 'abc') - self.assertEqual(common.hostname(only_if_ssh=True), 'abc') - os.environ.pop('SSH_CLIENT') - self.assertEqual(common.hostname(), 'abc') - self.assertEqual(common.hostname(only_if_ssh=True), None) - finally: - sys.modules.pop('socket') + with replace_env('SSH_CLIENT', '192.168.0.12 40921 22'): + with replace_module('socket', gethostname=lambda: 'abc'): + self.assertEqual(common.hostname(), 'abc') + self.assertEqual(common.hostname(only_if_ssh=True), 'abc') + os.environ.pop('SSH_CLIENT') + self.assertEqual(common.hostname(), 'abc') + self.assertEqual(common.hostname(only_if_ssh=True), None) def test_user(self): - new_os = imp.new_module('os') - new_os.environ = {'USER': 'def'} - common.os = new_os - try: + new_os = new_module('os', environ={'USER': 'def'}) + with replace_module_attr(common, 'os', new_os): self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) new_os.geteuid = lambda: 1 self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) new_os.geteuid = lambda: 0 self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}]) - finally: - common.os = os def test_branch(self): - vcslib = imp.new_module('powerline.lib.vcs') - vcslib.guess = lambda path: Args(branch=lambda: os.path.basename(path)) - sys.modules['powerline.lib.vcs'] = vcslib - try: + vcslib = new_module('powerline.lib.vcs', guess=lambda path: Args(branch=lambda: os.path.basename(path))) + with replace_module('powerline.lib.vcs', vcslib): self.assertEqual(common.branch(), 'tests') vcslib.guess = lambda path: None self.assertEqual(common.branch(), None) - finally: - sys.modules.pop('powerline.lib.vcs') def test_cwd(self): - new_os = imp.new_module('os') - new_os.path = os.path - new_os.environ = {} - new_os.sep = '/' + new_os = new_module('os', path=os.path, environ={}, sep='/') new_os.getcwd = lambda: '/abc/def/ghi/foo/bar' - common.os = new_os - try: + with replace_module_attr(common, 'os', new_os): self.assertEqual(common.cwd(), [{'contents': '/', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'abc', 'divider_highlight_group': 'cwd:divider'}, @@ -130,25 +102,15 @@ class TestCommon(TestCase): new_os.getcwdu = lambda: raises(ValueError()) with self.assertRaises(ValueError): common.cwd(dir_limit_depth=2, dir_shorten_len=2), - finally: - common.os = os def test_date(self): - datetime = imp.new_module('datetime') - datetime.datetime = Args(now=lambda: Args(strftime=lambda fmt: fmt)) - sys.modules['datetime'] = datetime - try: + with replace_module('datetime', datetime=Args(now=lambda: Args(strftime=lambda fmt: fmt))): self.assertEqual(common.date(), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) self.assertEqual(common.date(format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) - finally: - sys.modules.pop('datetime') def test_fuzzy_time(self): - datetime = imp.new_module('datetime') time = Args(hour=0, minute=45) - datetime.datetime = Args(now=lambda: time) - sys.modules['datetime'] = datetime - try: + with replace_module('datetime', datetime=Args(now=lambda: time)): self.assertEqual(common.fuzzy_time(), 'quarter to one') time.hour = 23 time.minute = 59 @@ -157,16 +119,10 @@ class TestCommon(TestCase): self.assertEqual(common.fuzzy_time(), 'twenty-five to twelve') time.minute = 60 self.assertEqual(common.fuzzy_time(), 'twelve o\'clock') - finally: - sys.modules.pop('datetime') def test_external_ip(self): - old_urllib_read = common.urllib_read - common.urllib_read = lambda url: '127.0.0.1' - try: + with replace_module_attr(common, 'urllib_read', urllib_read): self.assertEqual(common.external_ip(), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) - finally: - common.urllib_read = old_urllib_read def test_uptime(self): # TODO @@ -177,43 +133,30 @@ class TestCommon(TestCase): pass def test_system_load(self): - new_os = imp.new_module('os') - new_os.getloadavg = lambda: (7.5, 3.5, 1.5) - multiprocessing = imp.new_module('multiprocessing') - multiprocessing.cpu_count = lambda: 2 - common.os = new_os - sys.modules['multiprocessing'] = multiprocessing - try: - self.assertEqual(common.system_load(), - [{'contents': '7.5 ', 'highlight_group': ['system_load_ugly', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider'}, - {'contents': '3.5 ', 'highlight_group': ['system_load_bad', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}, - {'contents': '1.5', 'highlight_group': ['system_load_good', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}]) - self.assertEqual(common.system_load(format='{avg:.0f}', threshold_good=0, threshold_bad=1), - [{'contents': '8 ', 'highlight_group': ['system_load_ugly', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider'}, - {'contents': '4 ', 'highlight_group': ['system_load_ugly', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}, - {'contents': '2', 'highlight_group': ['system_load_bad', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}]) - finally: - common.os = os - sys.modules.pop('multiprocessing') + with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): + with replace_module('multiprocessing', cpu_count=lambda: 2): + self.assertEqual(common.system_load(), + [{'contents': '7.5 ', 'highlight_group': ['system_load_ugly', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider'}, + {'contents': '3.5 ', 'highlight_group': ['system_load_bad', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}, + {'contents': '1.5', 'highlight_group': ['system_load_good', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}]) + self.assertEqual(common.system_load(format='{avg:.0f}', threshold_good=0, threshold_bad=1), + [{'contents': '8 ', 'highlight_group': ['system_load_ugly', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider'}, + {'contents': '4 ', 'highlight_group': ['system_load_ugly', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}, + {'contents': '2', 'highlight_group': ['system_load_bad', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}]) def test_cpu_load_percent(self): - psutil = imp.new_module('psutil') - psutil.cpu_percent = lambda **kwargs: 52.3 - sys.modules['psutil'] = psutil - try: + with replace_module('psutil', cpu_percent=lambda **kwargs: 52.3): self.assertEqual(common.cpu_load_percent(), '52%') - finally: - sys.modules.pop('psutil') def test_network_load(self): # TODO pass def test_virtualenv(self): - os.environ['VIRTUAL_ENV'] = '/abc/def/ghi' - self.assertEqual(common.virtualenv(), 'ghi') - os.environ.pop('VIRTUAL_ENV') - self.assertEqual(common.virtualenv(), None) + with replace_env('VIRTUAL_ENV', '/abc/def/ghi'): + self.assertEqual(common.virtualenv(), 'ghi') + os.environ.pop('VIRTUAL_ENV') + self.assertEqual(common.virtualenv(), None) def test_email_imap_alert(self): # TODO @@ -274,15 +217,15 @@ class TestVim(TestCase): def test_file_directory(self): segment_info = vim_module._get_segment_info() self.assertEqual(vim.file_directory(segment_info=segment_info), None) - os.environ['HOME'] = '/home/foo' - vim_module._edit('/tmp/abc') - segment_info = vim_module._get_segment_info() - try: - self.assertEqual(vim.file_directory(segment_info=segment_info), '/tmp/') - os.environ['HOME'] = '/tmp' - self.assertEqual(vim.file_directory(segment_info=segment_info), '~/') - finally: - vim_module._bw(segment_info['bufnr']) + with replace_env('HOME', '/home/foo'): + vim_module._edit('/tmp/abc') + segment_info = vim_module._get_segment_info() + try: + self.assertEqual(vim.file_directory(segment_info=segment_info), '/tmp/') + os.environ['HOME'] = '/tmp' + self.assertEqual(vim.file_directory(segment_info=segment_info), '~/') + finally: + vim_module._bw(segment_info['bufnr']) def test_file_name(self): segment_info = vim_module._get_segment_info()