Replace memoize with threading where applicable

NOTE: Documentation now gets attached to *classes*, not actual segments. Hiding
      away classes (by changing their names to start with `_`) and/or doing

          self.__doc__ = self.__class__.__doc__

      does not work (hiding classes only hides documentation completely). I am
      not familiar with sphinx enough to say how this should be fixed.

Ref #168
This commit is contained in:
ZyX 2013-03-13 22:38:56 +04:00
parent a4adc92215
commit 62e731314e
8 changed files with 517 additions and 135 deletions

View File

@ -1,12 +1,7 @@
# vim:fileencoding=utf-8:noet
from functools import wraps
try:
# Python>=3.3, the only valid clock source for this job
from time import monotonic as time
except ImportError:
# System time, is affected by clock updates.
from time import time
from powerline.lib.time import monotonic
def default_cache_key(**kwargs):
@ -36,10 +31,10 @@ class memoize(object):
# Handle case when time() appears to be less then cached['time'] due
# to clock updates. Not applicable for monotonic clock, but this
# case is currently rare.
if cached is None or not (cached['time'] < time() < cached['time'] + self.timeout):
if cached is None or not (cached['time'] < monotonic() < cached['time'] + self.timeout):
cached = self.cache[key] = {
'result': func(**kwargs),
'time': time(),
'time': monotonic(),
}
return cached['result']
return decorated_function

125
powerline/lib/threaded.py Normal file
View File

@ -0,0 +1,125 @@
# vim:fileencoding=utf-8:noet
from __future__ import absolute_import
from powerline.lib.time import monotonic
from time import sleep
from threading import Thread, Lock
class ThreadedSegment(Thread):
daemon = True
min_sleep_time = 0.1
def __init__(self):
super(ThreadedSegment, self).__init__()
self.update_lock = Lock()
self.write_lock = Lock()
self.keep_going = True
self.run_once = True
self.did_set_interval = False
def __call__(self, **kwargs):
if self.run_once:
self.set_state(**kwargs)
self.update()
elif not self.is_alive():
self.startup(**kwargs)
with self.write_lock:
return self.render(**kwargs)
def sleep(self, adjust_time):
sleep(max(self.interval - adjust_time, self.min_sleep_time))
def run(self):
while self.keep_going:
start_time = monotonic()
with self.update_lock:
self.update()
self.sleep(monotonic() - start_time)
def shutdown(self):
self.keep_going = False
def set_interval(self, interval=None, **kwargs):
# Allowing “interval” keyword in configuration.
# Note: Here **kwargs is needed to support foreign data, in subclasses
# it can be seen in a number of places in order to support
# .set_interval().
interval = interval or getattr(self, 'interval', 1)
self.interval = interval
self.has_set_interval = True
def set_state(self, **kwargs):
if not self.did_set_interval:
self.set_interval(**kwargs)
def startup(self, **kwargs):
# Normally .update() succeeds to run before value is requested, meaning
# that user is getting values he needs directly at vim startup. Without
# .startup() we will not have to wait long until receiving bug “I opened
# vim, but branch information is only shown after I move cursor”.
self.run_once = False
self.set_state(**kwargs)
if not self.is_alive():
self.start()
def printed(func):
def f(*args, **kwargs):
print(func.__name__)
return func(*args, **kwargs)
return f
class KwThreadedSegment(ThreadedSegment):
drop_interval = 10 * 60
def __init__(self):
super(KwThreadedSegment, self).__init__()
self.queries = {}
self.update_missing = True
@staticmethod
def key(**kwargs):
return frozenset(kwargs.items())
def render(self, **kwargs):
key = self.key(**kwargs)
try:
update_state = self.queries[key][1]
except KeyError:
update_state = self.compute_state(key) if self.update_missing else None
# No locks: render method is already running with write_lock acquired.
self.queries[key] = (monotonic(), update_state)
return self.render_one(update_state, **kwargs)
def update(self):
updates = {}
removes = []
for key, (last_query_time, state) in list(self.queries.items()):
if last_query_time < monotonic() < last_query_time + self.drop_interval:
updates[key] = (last_query_time, self.compute_state(key))
else:
removes.append(key)
with self.write_lock:
self.queries.update(updates)
for key in removes:
self.queries.pop(key)
def set_state(self, **kwargs):
if not self.did_set_interval or ('interval' in kwargs and self.interval > kwargs['interval']):
self.set_interval(**kwargs)
key = self.key(**kwargs)
self.queries[key] = (monotonic(), None)
@staticmethod
def render_one(update_state, **kwargs):
return update_state

103
powerline/lib/time.py Normal file
View File

@ -0,0 +1,103 @@
# vim:fileencoding=utf-8:noet
from __future__ import division, absolute_import
try:
try:
# >=python-3.3, Unix
from time import clock_gettime
try:
# >={kernel}-sources-2.6.28
from time import CLOCK_MONOTONIC_RAW as CLOCK_ID
except ImportError:
from time import CLOCK_MONOTONIC as CLOCK_ID # NOQA
monotonic = lambda: clock_gettime(CLOCK_ID)
except ImportError:
# >=python-3.3
from time import monotonic # NOQA
except ImportError:
import ctypes
import sys
try:
if sys.platform == 'win32':
# Windows only
GetTickCount64 = ctypes.windll.kernel32.GetTickCount64
GetTickCount64.restype = ctypes.c_ulonglong
def monotonic(): # NOQA
return GetTickCount64() / 1000
elif sys.platform == 'darwin':
# Mac OS X
from ctypes.util import find_library
libc_name = find_library('c')
if not libc_name:
raise OSError
libc = ctypes.CDLL(libc_name, use_errno=True)
mach_absolute_time = libc.mach_absolute_time
mach_absolute_time.argtypes = ()
mach_absolute_time.restype = ctypes.c_uint64
class mach_timebase_info_data_t(ctypes.Structure):
_fields_ = (
('numer', ctypes.c_uint32),
('denom', ctypes.c_uint32),
)
mach_timebase_info_data_p = ctypes.POINTER(mach_timebase_info_data_t)
_mach_timebase_info = libc.mach_timebase_info
_mach_timebase_info.argtypes = (mach_timebase_info_data_p,)
_mach_timebase_info.restype = ctypes.c_int
def mach_timebase_info():
timebase = mach_timebase_info_data_t()
_mach_timebase_info(ctypes.byref(timebase))
return (timebase.numer, timebase.denom)
timebase = mach_timebase_info()
factor = timebase[0] / timebase[1] * 1e-9
def monotonic(): # NOQA
return mach_absolute_time() * factor
else:
# linux only (no librt on OS X)
import os
# See <bits/time.h>
CLOCK_MONOTONIC = 1
CLOCK_MONOTONIC_RAW = 4
class timespec(ctypes.Structure):
_fields_ = (
('tv_sec', ctypes.c_long),
('tv_nsec', ctypes.c_long)
)
tspec = timespec()
librt = ctypes.CDLL('librt.so.1', use_errno=True)
clock_gettime = librt.clock_gettime
clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)]
if clock_gettime(CLOCK_MONOTONIC_RAW, ctypes.pointer(tspec)) == 0:
# >={kernel}-sources-2.6.28
clock_id = CLOCK_MONOTONIC_RAW
elif clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(tspec)) == 0:
clock_id = CLOCK_MONOTONIC
else:
raise OSError
def monotonic(): # NOQA
if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(tspec)) != 0:
errno_ = ctypes.get_errno()
raise OSError(errno_, os.strerror(errno_))
return tspec.tv_sec + tspec.tv_nsec / 1e9
except:
from time import time as monotonic # NOQA

View File

@ -11,6 +11,6 @@ except ImportError:
def urllib_read(url):
try:
return urlopen(url, timeout=100).read().decode('utf-8')
return urlopen(url, timeout=10).read().decode('utf-8')
except HTTPError:
return

View File

@ -1,7 +1,6 @@
# vim:fileencoding=utf-8:noet
from __future__ import absolute_import
import os
from powerline.lib.memoize import memoize
vcs_props = (
@ -21,7 +20,6 @@ def generate_directories(path):
yield path
@memoize(100)
def guess(path):
for directory in generate_directories(path):
for vcs, vcs_dir, check in vcs_props:

View File

@ -8,9 +8,12 @@ import socket
from multiprocessing import cpu_count
from powerline.lib import add_divider_highlight_group
from powerline.lib.memoize import memoize
from powerline.lib.url import urllib_read, urllib_urlencode
from powerline.lib.vcs import guess
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.time import monotonic
from powerline.lib.humanize_bytes import humanize_bytes
from collections import namedtuple, defaultdict
def hostname(only_if_ssh=False):
@ -163,12 +166,11 @@ def fuzzy_time():
return ' '.join([minute, hour])
@memoize(600)
def _external_ip(query_url='http://ipv4.icanhazip.com/'):
return urllib_read(query_url).strip()
def external_ip(query_url='http://ipv4.icanhazip.com/'):
class ExternalIpSegment(ThreadedSegment):
'''Return external IP address.
Suggested URIs:
@ -182,7 +184,21 @@ def external_ip(query_url='http://ipv4.icanhazip.com/'):
Divider highlight group used: ``background:divider``.
'''
return [{'contents': _external_ip(query_url=query_url), 'divider_highlight_group': 'background:divider'}]
def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs):
super(ExternalIpSegment, self).set_state(**kwargs)
self.query_url = query_url
def update(self):
ip = _external_ip(query_url=self.query_url)
with self.write_lock:
self.ip = ip
def render(self):
return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}]
external_ip = ExternalIpSegment()
# Weather condition code descriptions available at
@ -261,8 +277,7 @@ weather_conditions_icons = {
}
@memoize(1800)
def weather(unit='c', location_query=None, icons=None):
class WeatherSegment(ThreadedSegment):
'''Return weather from Yahoo! Weather.
Uses GeoIP lookup from http://freegeoip.net/ to automatically determine
@ -284,55 +299,87 @@ def weather(unit='c', location_query=None, icons=None):
Highlight groups used: ``weather_conditions`` or ``weather``, ``weather_temp_cold`` or ``weather_temp_hot`` or ``weather_temp`` or ``weather``.
Also uses ``weather_conditions_{condition}`` for all weather conditions supported by Yahoo.
'''
import json
if not location_query:
interval = 600
def set_state(self, location_query=None, unit='c', **kwargs):
super(WeatherSegment, self).set_state(**kwargs)
self.location = location_query
self.url = None
self.condition = {}
self.unit = unit
def update(self):
import json
if not self.url:
# Do not lock attribute assignments in this branch: they are used
# only in .update()
if not self.location:
try:
location_data = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip()))
self.location = ','.join([location_data['city'],
location_data['region_name'],
location_data['country_name']])
except (TypeError, ValueError):
return
query_data = {
'q':
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
'select * from we where location="{0}" and unit="{1}"'.format(self.location, self.unit).encode('utf-8'),
'format': 'json',
}
self.url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)
try:
location = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip()))
location_query = ','.join([location['city'], location['region_name'], location['country_name']])
except (TypeError, ValueError):
raw_response = urllib_read(self.url)
response = json.loads(raw_response)
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
contents = '{0}°{1}'.format(condition['temp'], self.unit.upper())
temp = int(condition['temp'])
except (KeyError, TypeError, ValueError):
return
try:
icon_names = weather_conditions_codes[condition_code]
except IndexError:
icon_names = (('not_available' if condition_code == 3200 else 'unknown'),)
with self.write_lock:
self.contents = contents
self.temp = temp
self.icon_names = icon_names
def render(self, icons=None, **kwargs):
if not hasattr(self, 'icon_names'):
return None
query_data = {
'q':
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
'select * from we where location="{0}" and unit="{1}"'.format(location_query, unit).encode('utf-8'),
'format': 'json'
}
try:
url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)
response = json.loads(urllib_read(url))
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
except (KeyError, TypeError, ValueError):
return None
try:
icon_names = weather_conditions_codes[condition_code]
except IndexError:
icon_names = (('not_available' if condition_code == 3200 else 'unknown'),)
for icon_name in self.icon_names:
if icons:
if icon_name in icons:
icon = icons[icon_name]
break
else:
icon = weather_conditions_icons[self.icon_names[-1]]
for icon_name in icon_names:
if icons:
if icon_name in icons:
icon = icons[icon_name]
break
else:
icon = weather_conditions_icons[icon_names[-1]]
groups = ['weather_condition_' + icon_name for icon_name in self.icon_names] + ['weather_conditions', 'weather']
return [
{
'contents': icon + ' ',
'highlight_group': groups,
'divider_highlight_group': 'background:divider',
},
{
'contents': self.contents,
'highlight_group': ['weather_temp_cold' if int(self.temp) < 0 else 'weather_temp_hot', 'weather_temp', 'weather'],
'draw_divider': False,
'divider_highlight_group': 'background:divider',
},
]
groups = ['weather_condition_' + icon_name for icon_name in icon_names] + ['weather_conditions', 'weather']
return [
{
'contents': icon + ' ',
'highlight_group': groups,
'divider_highlight_group': 'background:divider',
},
{
'contents': '{0}°{1}'.format(condition['temp'], unit.upper()),
'highlight_group': ['weather_temp_cold' if int(condition['temp']) < 0 else 'weather_temp_hot', 'weather_temp', 'weather'],
'draw_divider': False,
'divider_highlight_group': 'background:divider',
},
]
weather = WeatherSegment()
def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2):
@ -487,8 +534,29 @@ def uptime(format='{days:02d}d {hours:02d}h {minutes:02d}m'):
return format.format(days=int(days), hours=hours, minutes=minutes)
@add_divider_highlight_group('background:divider')
def network_load(interface='eth0', measure_interval=1, suffix='B/s', si_prefix=False):
try:
import psutil
def get_bytes(interface):
io_counters = psutil.network_io_counters(pernic=True)
if_io = io_counters.get(interface)
if not if_io:
return None
return if_io.bytes_recv, if_io.bytes_sent
except ImportError:
def get_bytes(interface):
try:
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
except IOError:
return None
class _NetworkLoadSegment(KwThreadedSegment):
'''Return the network load.
Uses the ``psutil`` module if available for multi-platform compatibility,
@ -497,15 +565,11 @@ def network_load(interface='eth0', measure_interval=1, suffix='B/s', si_prefix=F
:param str interface:
network interface to measure
:param float measure_interval:
interval used to measure the network load (in seconds)
:param str suffix:
string appended to each load string
:param bool si_prefix:
use SI prefix, e.g. MB instead of MiB
'''
import time
from powerline.lib import humanize_bytes
interfaces = {}
@ -516,7 +580,10 @@ def network_load(interface='eth0', measure_interval=1, suffix='B/s', si_prefix=F
def compute_state(self, interface):
if interface in self.interfaces:
idata = self.interfaces[interface]
idata['prev'] = idata['last']
try:
idata['prev'] = idata['last']
except KeyError:
pass
else:
idata = {}
if self.run_once:
@ -527,8 +594,8 @@ def network_load(interface='eth0', measure_interval=1, suffix='B/s', si_prefix=F
idata['last'] = (monotonic(), _get_bytes(interface))
return idata
def render_one(self, idata, suffix='B/s', si_prefix=False, **kwargs):
if 'prev' not in idata:
def render_one(self, idata, format='{recv:>8}{sent:>8}', suffix='B/s', si_prefix=False, **kwargs):
if not idata or 'prev' not in idata:
return None
t1, b1 = idata['prev']
@ -547,7 +614,7 @@ def network_load(interface='eth0', measure_interval=1, suffix='B/s', si_prefix=F
}]
network_load = NetworkLoadSegment()
network_load = _NetworkLoadSegment()
def virtualenv():
@ -555,8 +622,10 @@ def virtualenv():
return os.path.basename(os.environ.get('VIRTUAL_ENV', '')) or None
@memoize(60)
def email_imap_alert(username, password, server='imap.gmail.com', port=993, folder='INBOX'):
IMAPKey = namedtuple('Key', 'username password server port folder')
class EmailIMAPSegment(KwThreadedSegment):
'''Return unread e-mail count for IMAP servers.
:param str username:
@ -572,27 +641,38 @@ def email_imap_alert(username, password, server='imap.gmail.com', port=993, fold
Highlight groups used: ``email_alert``.
'''
import imaplib
import re
if not username or not password:
return None
try:
mail = imaplib.IMAP4_SSL(server, port)
mail.login(username, password)
rc, message = mail.status(folder, '(UNSEEN)')
unread_str = message[0].decode('utf-8')
unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
except socket.gaierror:
return None
except imaplib.IMAP4.error as e:
unread_count = str(e)
if not unread_count:
return None
return [{
'highlight_group': 'email_alert',
'contents': str(unread_count),
}]
interval = 60
@staticmethod
def key(username, password, server='imap.gmail.com', port=993, folder='INBOX'):
return IMAPKey(username, password, server, port, folder)
@staticmethod
def compute_state(key):
if not key.username or not key.password:
return None
try:
import imaplib
import re
mail = imaplib.IMAP4_SSL(key.server, key.port)
mail.login(key.username, key.password)
rc, message = mail.status(key.folder, '(UNSEEN)')
unread_str = message[0].decode('utf-8')
unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
except socket.gaierror:
return None
except imaplib.IMAP4.error as e:
unread_count = str(e)
if not unread_count:
return None
return [{
'highlight_group': 'email_alert',
'contents': str(unread_count),
}]
email_imap_alert = EmailIMAPSegment()
class NowPlayingSegment(object):

View File

@ -11,9 +11,9 @@ except ImportError:
from powerline.bindings.vim import vim_get_func, getbufvar
from powerline.theme import requires_segment_info
from powerline.lib import add_divider_highlight_group
from powerline.lib.memoize import memoize
from powerline.lib.vcs import guess
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from functools import wraps
from collections import defaultdict
@ -309,58 +309,138 @@ def modified_buffers(text='+ ', join_str=','):
return None
class KwWindowThreadedSegment(KwThreadedSegment):
def set_state(self, **kwargs):
for window in vim.windows:
buffer = window.buffer
kwargs['segment_info'] = {'bufnr': buffer.number, 'buffer': buffer}
super(KwWindowThreadedSegment, self).set_state(**kwargs)
class RepositorySegment(KwWindowThreadedSegment):
def __init__(self):
super(RepositorySegment, self).__init__()
self.directories = {}
@staticmethod
def key(segment_info, **kwargs):
# FIXME os.getcwd() is not a proper variant for non-current buffers
return segment_info['buffer'].name or os.getcwd()
def update(self):
# .compute_state() is running only in this method, and only in one
# thread, thus operations with .directories do not need write locks
# (.render() method is not using .directories). If this is changed
# .directories needs redesigning
self.directories.clear()
super(RepositorySegment, self).update()
def compute_state(self, path):
repo = guess(path=path)
if repo:
if repo.directory in self.directories:
return self.directories[repo.directory]
else:
r = self.process_repo(repo)
self.directories[repo.directory] = r
return r
@requires_segment_info
@memoize(2, cache_key=bufnr, cache_reg_func=purgeall_on_shell)
def branch(segment_info, status_colors=True):
class RepositoryStatusSegment(RepositorySegment):
'''Return the status for the current repo.'''
interval = 2
@staticmethod
def process_repo(repo):
return repo.status()
repository_status = RepositoryStatusSegment()
@requires_segment_info
class BranchSegment(RepositorySegment):
'''Return the current working branch.
:param bool status_colors:
determines whether repository status will be used to determine highlighting. Default: True.
determines whether repository status will be used to determine highlighting. Default: False.
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
Divider highlight group used: ``branch:divider``.
'''
repo = guess(path=os.path.abspath(segment_info['buffer'].name or os.getcwd()))
if repo:
interval = 0.2
started_repository_status = False
@staticmethod
def process_repo(repo):
return repo.branch()
def render_one(self, update_state, segment_info, status_colors=False, **kwargs):
if not update_state:
return None
if status_colors:
self.started_repository_status = True
return [{
'contents': repo.branch(),
'highlight_group': (['branch_dirty' if repo.status() else 'branch_clean'] if status_colors else []) + ['branch'],
'contents': update_state,
'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info) else 'branch_clean']
if status_colors else []) + ['branch'],
'divider_highlight_group': 'branch:divider',
}]
return None
}]
def startup(self, **kwargs):
super(BranchSegment, self).startup()
if kwargs.get('status_colors', False):
self.started_repository_status = True
repository_status.startup()
def shutdown(self):
if self.started_repository_status:
repository_status.shutdown()
super(BranchSegment, self).shutdown()
branch = BranchSegment()
@requires_segment_info
@memoize(2, cache_key=bufnr, cache_reg_func=purgebuf_on_shell_and_write)
def file_vcs_status(segment_info):
class FileVCSStatusSegment(KwWindowThreadedSegment):
'''Return the VCS status for this buffer.
Highlight groups used: ``file_vcs_status``.
'''
name = segment_info['buffer'].name
if name and not getbufvar(segment_info['bufnr'], '&buftype'):
repo = guess(path=os.path.abspath(name))
if repo:
status = repo.status(os.path.relpath(name, repo.directory))
if not status:
return None
status = status.strip()
ret = []
for status in status:
ret.append({
'contents': status,
'highlight_group': ['file_vcs_status_' + status, 'file_vcs_status'],
})
return ret
return None
interval = 0.2
@staticmethod
def key(segment_info, **kwargs):
name = segment_info['buffer'].name
skip = not (name and (not getbufvar(segment_info['bufnr'], '&buftype')))
return name, skip
@staticmethod
def compute_state(key):
name, skip = key
if not skip:
repo = guess(path=name)
if repo:
status = repo.status(os.path.relpath(name, repo.directory))
if not status:
return None
status = status.strip()
ret = []
for status in status:
ret.append({
'contents': status,
'highlight_group': ['file_vcs_status_' + status, 'file_vcs_status'],
})
return ret
return None
@requires_segment_info
@memoize(2, cache_key=bufnr, cache_reg_func=purgeall_on_shell)
def repository_status(segment_info):
'''Return the status for the current repo.'''
repo = guess(path=os.path.abspath(segment_info['buffer'].name or os.getcwd()))
if repo:
return repo.status().strip() or None
return None
file_vcs_status = FileVCSStatusSegment()

View File

@ -39,12 +39,13 @@ class TestCommon(TestCase):
def test_user(self):
new_os = new_module('os', environ={'USER': 'def'}, getpid=lambda: 1)
new_psutil = new_module('psutil', Process=lambda pid: Args(username='def'))
with replace_module_attr(common, 'os', new_os), replace_module_attr(common, 'psutil', new_psutil):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 1
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 0
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
with replace_module_attr(common, 'os', new_os):
with replace_module_attr(common, 'psutil', new_psutil):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 1
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 0
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
def test_branch(self):
with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None)):
@ -149,7 +150,7 @@ class TestCommon(TestCase):
{'contents': '2', 'highlight_group': ['system_load_bad', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider'}])
def test_cpu_load_percent(self):
with replace_module('psutil', cpu_percent=lambda **kwargs: 52.3):
with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3):
self.assertEqual(common.cpu_load_percent(), '52%')
def test_network_load(self):