From 96f967f8bdc24c80b218506338684dde16f73dee Mon Sep 17 00:00:00 2001 From: ZyX Date: Sun, 21 Sep 2014 00:49:49 +0400 Subject: [PATCH 1/4] Commit quote changes left from previous branch --- powerline/lint/markedjson/constructor.py | 12 ++++++------ powerline/lint/markedjson/reader.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/powerline/lint/markedjson/constructor.py b/powerline/lint/markedjson/constructor.py index 41b7d054..2a95d840 100644 --- a/powerline/lint/markedjson/constructor.py +++ b/powerline/lint/markedjson/constructor.py @@ -116,7 +116,7 @@ class BaseConstructor: if not isinstance(node, nodes.MappingNode): raise ConstructorError( None, None, - "expected a mapping node, but found %s" % node.id, + 'expected a mapping node, but found %s' % node.id, node.start_mark ) mapping = {} @@ -174,9 +174,9 @@ class Constructor(BaseConstructor): for subnode in value_node.value: if not isinstance(subnode, nodes.MappingNode): raise ConstructorError( - "while constructing a mapping", + 'while constructing a mapping', node.start_mark, - "expected a mapping for merging, but found %s" % subnode.id, + 'expected a mapping for merging, but found %s' % subnode.id, subnode.start_mark ) self.flatten_mapping(subnode) @@ -186,9 +186,9 @@ class Constructor(BaseConstructor): merge.extend(value) else: raise ConstructorError( - "while constructing a mapping", + 'while constructing a mapping', node.start_mark, - ("expected a mapping or list of mappings for merging, but found %s" % value_node.id), + ('expected a mapping or list of mappings for merging, but found %s' % value_node.id), value_node.start_mark ) elif key_node.tag == 'tag:yaml.org,2002:value': @@ -255,7 +255,7 @@ class Constructor(BaseConstructor): def construct_undefined(self, node): raise ConstructorError( None, None, - "could not determine a constructor for the tag %r" % node.tag, + 'could not determine a constructor for the tag %r' % node.tag, node.start_mark ) diff --git a/powerline/lint/markedjson/reader.py b/powerline/lint/markedjson/reader.py index 6b329bf9..bb518b06 100644 --- a/powerline/lint/markedjson/reader.py +++ b/powerline/lint/markedjson/reader.py @@ -42,7 +42,7 @@ class Reader(object): self.column = 0 self.stream = stream - self.name = getattr(stream, 'name', "") + self.name = getattr(stream, 'name', '') self.eof = False self.raw_buffer = None From 26365a96209d0830b357e8a1c79a53b0ed626203 Mon Sep 17 00:00:00 2001 From: ZyX Date: Sun, 21 Sep 2014 02:00:37 +0400 Subject: [PATCH 2/4] Split powerline.segments.common into a number of modules Note: some modules (i.e wthr and bat) have their names in order to avoid the situation when module name matches segment name: import powerline.segments.common.weather will import weather segment because `powerline.segments.common` contains line `from powerline.segments.common.weather import weather`. Fixes #1083 --- docs/source/configuration/segments/common.rst | 53 +- powerline/segments/common.py | 1476 ----------------- powerline/segments/common/__init__.py | 22 + powerline/segments/common/bat.py | 222 +++ powerline/segments/common/env.py | 165 ++ powerline/segments/common/mail.py | 74 + powerline/segments/common/net.py | 287 ++++ powerline/segments/common/players.py | 270 +++ powerline/segments/common/sys.py | 174 ++ powerline/segments/common/time.py | 89 + powerline/segments/common/vcs.py | 33 + powerline/segments/common/wthr.py | 229 +++ powerline/segments/shell.py | 2 +- tests/test_segments.py | 453 ++--- 14 files changed, 1862 insertions(+), 1687 deletions(-) delete mode 100644 powerline/segments/common.py create mode 100644 powerline/segments/common/__init__.py create mode 100644 powerline/segments/common/bat.py create mode 100644 powerline/segments/common/env.py create mode 100644 powerline/segments/common/mail.py create mode 100644 powerline/segments/common/net.py create mode 100644 powerline/segments/common/players.py create mode 100644 powerline/segments/common/sys.py create mode 100644 powerline/segments/common/time.py create mode 100644 powerline/segments/common/vcs.py create mode 100644 powerline/segments/common/wthr.py diff --git a/docs/source/configuration/segments/common.rst b/docs/source/configuration/segments/common.rst index 49dc2519..5d52d69a 100644 --- a/docs/source/configuration/segments/common.rst +++ b/docs/source/configuration/segments/common.rst @@ -2,5 +2,56 @@ Common segments *************** -.. automodule:: powerline.segments.common +VCS submodule +============= + +.. automodule:: powerline.segments.common.vcs + :members: + +System properties +================= + +.. automodule:: powerline.segments.common.sys + :members: + +Network +======= + +.. automodule:: powerline.segments.common.net + :members: + +Current environment +=================== + +.. automodule:: powerline.segments.common.env + :members: + +Battery +======= + +.. automodule:: powerline.segments.common.bat + :members: + +Weather +======= + +.. automodule:: powerline.segments.common.wthr + :members: + +Date and time +============= + +.. automodule:: powerline.segments.common.time + :members: + +Mail +==== + +.. automodule:: powerline.segments.common.mail + :members: + +Media players +============= + +.. automodule:: powerline.segments.common.players :members: diff --git a/powerline/segments/common.py b/powerline/segments/common.py deleted file mode 100644 index 89bd69b8..00000000 --- a/powerline/segments/common.py +++ /dev/null @@ -1,1476 +0,0 @@ -# vim:fileencoding=utf-8:noet -from __future__ import (unicode_literals, division, absolute_import, print_function) - -import os -import sys -import re -import socket -import json - -from datetime import datetime -from multiprocessing import cpu_count as _cpu_count -from collections import namedtuple - -from powerline.lib import add_divider_highlight_group -from powerline.lib.shell import asrun, run_cmd -from powerline.lib.url import urllib_read, urllib_urlencode -from powerline.lib.vcs import guess, tree_status -from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment -from powerline.lib.monotonic import monotonic -from powerline.lib.humanize_bytes import humanize_bytes -from powerline.lib.unicode import out_u -from powerline.theme import requires_segment_info, requires_filesystem_watcher -from powerline.segments import Segment, with_docstring - - -cpu_count = None - - -@requires_segment_info -def environment(pl, segment_info, variable=None): - '''Return the value of any defined environment variable - - :param string variable: - The environment variable to return if found - ''' - return segment_info['environ'].get(variable, None) - - -@requires_segment_info -def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False): - '''Return the current hostname. - - :param bool only_if_ssh: - only return the hostname if currently in an SSH session - :param bool exclude_domain: - return the hostname without domain if there is one - ''' - if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'): - return None - if exclude_domain: - return socket.gethostname().split('.')[0] - return socket.gethostname() - - -@requires_filesystem_watcher -@requires_segment_info -def branch(pl, segment_info, create_watcher, status_colors=False): - '''Return the current VCS branch. - - :param bool status_colors: - determines whether repository status will be used to determine highlighting. Default: False. - - Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``. - ''' - name = segment_info['getcwd']() - repo = guess(path=name, create_watcher=create_watcher) - if repo is not None: - branch = repo.branch() - scol = ['branch'] - if status_colors: - try: - status = tree_status(repo, pl) - except Exception as e: - pl.exception('Failed to compute tree status: {0}', str(e)) - status = '?' - scol.insert(0, 'branch_dirty' if status and status.strip() else 'branch_clean') - return [{ - 'contents': branch, - 'highlight_group': scol, - }] - - -@requires_segment_info -class CwdSegment(Segment): - def argspecobjs(self): - for obj in super(CwdSegment, self).argspecobjs(): - yield obj - yield 'get_shortened_path', self.get_shortened_path - - def omitted_args(self, name, method): - if method is self.get_shortened_path: - return (0, 1, 2) - else: - return super(CwdSegment, self).omitted_args(name, method) - - def get_shortened_path(self, pl, segment_info, shorten_home=True, **kwargs): - try: - path = out_u(segment_info['getcwd']()) - except OSError as e: - if e.errno == 2: - # user most probably deleted the directory - # this happens when removing files from Mercurial repos for example - pl.warn('Current directory not found') - return '[not found]' - else: - raise - if shorten_home: - home = segment_info['home'] - if home: - home = out_u(home) - if path.startswith(home): - path = '~' + path[len(home):] - return path - - def __call__(self, pl, segment_info, - dir_shorten_len=None, - dir_limit_depth=None, - use_path_separator=False, - ellipsis='...', - **kwargs): - cwd = self.get_shortened_path(pl, segment_info, **kwargs) - cwd_split = cwd.split(os.sep) - cwd_split_len = len(cwd_split) - cwd = [i[0:dir_shorten_len] if dir_shorten_len and i else i for i in cwd_split[:-1]] + [cwd_split[-1]] - if dir_limit_depth and cwd_split_len > dir_limit_depth + 1: - del(cwd[0:-dir_limit_depth]) - if ellipsis is not None: - cwd.insert(0, ellipsis) - ret = [] - if not cwd[0]: - cwd[0] = '/' - draw_inner_divider = not use_path_separator - for part in cwd: - if not part: - continue - if use_path_separator: - part += os.sep - ret.append({ - 'contents': part, - 'divider_highlight_group': 'cwd:divider', - 'draw_inner_divider': draw_inner_divider, - }) - ret[-1]['highlight_group'] = ['cwd:current_folder', 'cwd'] - if use_path_separator: - ret[-1]['contents'] = ret[-1]['contents'][:-1] - if len(ret) > 1 and ret[0]['contents'][0] == os.sep: - ret[0]['contents'] = ret[0]['contents'][1:] - return ret - - -cwd = with_docstring(CwdSegment(), -'''Return the current working directory. - -Returns a segment list to create a breadcrumb-like effect. - -:param int dir_shorten_len: - shorten parent directory names to this length (e.g. - :file:`/long/path/to/powerline` → :file:`/l/p/t/powerline`) -:param int dir_limit_depth: - limit directory depth to this number (e.g. - :file:`/long/path/to/powerline` → :file:`⋯/to/powerline`) -:param bool use_path_separator: - Use path separator in place of soft divider. -:param bool shorten_home: - Shorten home directory to ``~``. -:param str ellipsis: - Specifies what to use in place of omitted directories. Use None to not - show this subsegment at all. - -Divider highlight group used: ``cwd:divider``. - -Highlight groups used: ``cwd:current_folder`` or ``cwd``. It is recommended to define all highlight groups. -''') - - -def date(pl, format='%Y-%m-%d', istime=False): - '''Return the current date. - - :param str format: - strftime-style date format string - :param bool istime: - If true then segment uses ``time`` highlight group. - - Divider highlight group used: ``time:divider``. - - Highlight groups used: ``time`` or ``date``. - ''' - return [{ - 'contents': datetime.now().strftime(format), - 'highlight_group': (['time'] if istime else []) + ['date'], - 'divider_highlight_group': 'time:divider' if istime else None, - }] - - -UNICODE_TEXT_TRANSLATION = { - ord('\''): '’', - ord('-'): '‐', -} - - -def fuzzy_time(pl, unicode_text=False): - '''Display the current time as fuzzy time, e.g. "quarter past six". - - :param bool unicode_text: - If true then hyphenminuses (regular ASCII ``-``) and single quotes are - replaced with unicode dashes and apostrophes. - ''' - hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven'] - minute_str = { - 5: 'five past', - 10: 'ten past', - 15: 'quarter past', - 20: 'twenty past', - 25: 'twenty-five past', - 30: 'half past', - 35: 'twenty-five to', - 40: 'twenty to', - 45: 'quarter to', - 50: 'ten to', - 55: 'five to', - } - special_case_str = { - (23, 58): 'round about midnight', - (23, 59): 'round about midnight', - (0, 0): 'midnight', - (0, 1): 'round about midnight', - (0, 2): 'round about midnight', - (12, 0): 'noon', - } - - now = datetime.now() - - try: - return special_case_str[(now.hour, now.minute)] - except KeyError: - pass - - hour = now.hour - if now.minute > 32: - if hour == 23: - hour = 0 - else: - hour += 1 - if hour > 11: - hour = hour - 12 - hour = hour_str[hour] - - minute = int(round(now.minute / 5.0) * 5) - if minute == 60 or minute == 0: - result = ' '.join([hour, 'o\'clock']) - else: - minute = minute_str[minute] - result = ' '.join([minute, hour]) - - if unicode_text: - result = result.translate(UNICODE_TEXT_TRANSLATION) - - return result - - -def _external_ip(query_url='http://ipv4.icanhazip.com/'): - return urllib_read(query_url).strip() - - -class ExternalIpSegment(ThreadedSegment): - interval = 300 - - def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs): - self.query_url = query_url - super(ExternalIpSegment, self).set_state(**kwargs) - - def update(self, old_ip): - return _external_ip(query_url=self.query_url) - - def render(self, ip, **kwargs): - if not ip: - return None - return [{'contents': ip, 'divider_highlight_group': 'background:divider'}] - - -external_ip = with_docstring(ExternalIpSegment(), -'''Return external IP address. - -:param str query_url: - URI to query for IP address, should return only the IP address as a text string - - Suggested URIs: - - * http://ipv4.icanhazip.com/ - * http://ipv6.icanhazip.com/ - * http://icanhazip.com/ (returns IPv6 address if available, else IPv4) - -Divider highlight group used: ``background:divider``. -''') - - -try: - import netifaces -except ImportError: - def internal_ip(pl, interface='detect', ipv=4): - return None -else: - _interface_starts = { - 'eth': 10, # Regular ethernet adapters : eth1 - 'enp': 10, # Regular ethernet adapters, Gentoo : enp2s0 - 'ath': 9, # Atheros WiFi adapters : ath0 - 'wlan': 9, # Other WiFi adapters : wlan1 - 'wlp': 9, # Other WiFi adapters, Gentoo : wlp5s0 - 'teredo': 1, # miredo interface : teredo - 'lo': -10, # Loopback interface : lo - } - - _interface_start_re = re.compile(r'^([a-z]+?)(\d|$)') - - def _interface_key(interface): - match = _interface_start_re.match(interface) - if match: - try: - base = _interface_starts[match.group(1)] * 100 - except KeyError: - base = 500 - if match.group(2): - return base - int(match.group(2)) - else: - return base - else: - return 0 - - def internal_ip(pl, interface='detect', ipv=4): - if interface == 'detect': - try: - interface = next(iter(sorted(netifaces.interfaces(), key=_interface_key, reverse=True))) - except StopIteration: - pl.info('No network interfaces found') - return None - addrs = netifaces.ifaddresses(interface) - try: - return addrs[netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET][0]['addr'] - except (KeyError, IndexError): - return None - - -internal_ip = with_docstring(internal_ip, -'''Return internal IP address - -Requires ``netifaces`` module to work properly. - -:param str interface: - Interface on which IP will be checked. Use ``detect`` to automatically - detect interface. In this case interfaces with lower numbers will be - preferred over interfaces with similar names. Order of preference based on - names: - - #. ``eth`` and ``enp`` followed by number or the end of string. - #. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string. - #. ``teredo`` followed by number or the end of string. - #. Any other interface that is not ``lo*``. - #. ``lo`` followed by number or the end of string. -:param int ipv: - 4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you - need exactly. -''') - - -# Weather condition code descriptions available at -# http://developer.yahoo.com/weather/#codes -weather_conditions_codes = ( - ('tornado', 'stormy'), # 0 - ('tropical_storm', 'stormy'), # 1 - ('hurricane', 'stormy'), # 2 - ('severe_thunderstorms', 'stormy'), # 3 - ('thunderstorms', 'stormy'), # 4 - ('mixed_rain_and_snow', 'rainy' ), # 5 - ('mixed_rain_and_sleet', 'rainy' ), # 6 - ('mixed_snow_and_sleet', 'snowy' ), # 7 - ('freezing_drizzle', 'rainy' ), # 8 - ('drizzle', 'rainy' ), # 9 - ('freezing_rain', 'rainy' ), # 10 - ('showers', 'rainy' ), # 11 - ('showers', 'rainy' ), # 12 - ('snow_flurries', 'snowy' ), # 13 - ('light_snow_showers', 'snowy' ), # 14 - ('blowing_snow', 'snowy' ), # 15 - ('snow', 'snowy' ), # 16 - ('hail', 'snowy' ), # 17 - ('sleet', 'snowy' ), # 18 - ('dust', 'foggy' ), # 19 - ('fog', 'foggy' ), # 20 - ('haze', 'foggy' ), # 21 - ('smoky', 'foggy' ), # 22 - ('blustery', 'foggy' ), # 23 - ('windy', ), # 24 - ('cold', 'day' ), # 25 - ('clouds', 'cloudy'), # 26 - ('mostly_cloudy_night', 'cloudy'), # 27 - ('mostly_cloudy_day', 'cloudy'), # 28 - ('partly_cloudy_night', 'cloudy'), # 29 - ('partly_cloudy_day', 'cloudy'), # 30 - ('clear_night', 'night' ), # 31 - ('sun', 'sunny' ), # 32 - ('fair_night', 'night' ), # 33 - ('fair_day', 'day' ), # 34 - ('mixed_rain_and_hail', 'rainy' ), # 35 - ('hot', 'sunny' ), # 36 - ('isolated_thunderstorms', 'stormy'), # 37 - ('scattered_thunderstorms', 'stormy'), # 38 - ('scattered_thunderstorms', 'stormy'), # 39 - ('scattered_showers', 'rainy' ), # 40 - ('heavy_snow', 'snowy' ), # 41 - ('scattered_snow_showers', 'snowy' ), # 42 - ('heavy_snow', 'snowy' ), # 43 - ('partly_cloudy', 'cloudy'), # 44 - ('thundershowers', 'rainy' ), # 45 - ('snow_showers', 'snowy' ), # 46 - ('isolated_thundershowers', 'rainy' ), # 47 -) -# ('day', (25, 34)), -# ('rainy', (5, 6, 8, 9, 10, 11, 12, 35, 40, 45, 47)), -# ('cloudy', (26, 27, 28, 29, 30, 44)), -# ('snowy', (7, 13, 14, 15, 16, 17, 18, 41, 42, 43, 46)), -# ('stormy', (0, 1, 2, 3, 4, 37, 38, 39)), -# ('foggy', (19, 20, 21, 22, 23)), -# ('sunny', (32, 36)), -# ('night', (31, 33))): -weather_conditions_icons = { - 'day': 'DAY', - 'blustery': 'WIND', - 'rainy': 'RAIN', - 'cloudy': 'CLOUDS', - 'snowy': 'SNOW', - 'stormy': 'STORM', - 'foggy': 'FOG', - 'sunny': 'SUN', - 'night': 'NIGHT', - 'windy': 'WINDY', - 'not_available': 'NA', - 'unknown': 'UKN', -} - -temp_conversions = { - 'C': lambda temp: temp, - 'F': lambda temp: (temp * 9 / 5) + 32, - 'K': lambda temp: temp + 273.15, -} - -# Note: there are also unicode characters for units: ℃, ℉ and K -temp_units = { - 'C': '°C', - 'F': '°F', - 'K': 'K', -} - - -class WeatherSegment(KwThreadedSegment): - interval = 600 - default_location = None - location_urls = {} - - @staticmethod - def key(location_query=None, **kwargs): - return location_query - - def get_request_url(self, location_query): - try: - return self.location_urls[location_query] - except KeyError: - if location_query is None: - location_data = json.loads(urllib_read('http://freegeoip.net/json/')) - location = ','.join(( - location_data['city'], - location_data['region_name'], - location_data['country_code'] - )) - self.info('Location returned by freegeoip is {0}', location) - else: - location = location_query - query_data = { - 'q': - 'use "https://raw.githubusercontent.com/yql/yql-tables/master/weather/weather.bylocation.xml" as we;' - 'select * from we where location="{0}" and unit="c"'.format(location).encode('utf-8'), - 'format': 'json', - } - self.location_urls[location_query] = url = ( - 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)) - return url - - def compute_state(self, location_query): - url = self.get_request_url(location_query) - raw_response = urllib_read(url) - if not raw_response: - self.error('Failed to get response') - return - response = json.loads(raw_response) - condition = response['query']['results']['weather']['rss']['channel']['item']['condition'] - condition_code = int(condition['code']) - temp = float(condition['temp']) - - try: - icon_names = weather_conditions_codes[condition_code] - except IndexError: - if condition_code == 3200: - icon_names = ('not_available',) - self.warn('Weather is not available for location {0}', self.location) - else: - icon_names = ('unknown',) - self.error('Unknown condition code: {0}', condition_code) - - return (temp, icon_names) - - def render_one(self, weather, icons=None, unit='C', temp_format=None, temp_coldest=-30, temp_hottest=40, **kwargs): - if not weather: - return None - - temp, icon_names = weather - - for icon_name in icon_names: - if icons: - if icon_name in icons: - icon = icons[icon_name] - break - else: - icon = weather_conditions_icons[icon_names[-1]] - - temp_format = temp_format or ('{temp:.0f}' + temp_units[unit]) - converted_temp = temp_conversions[unit](temp) - if temp <= temp_coldest: - gradient_level = 0 - elif temp >= temp_hottest: - gradient_level = 100 - else: - gradient_level = (temp - temp_coldest) * 100.0 / (temp_hottest - temp_coldest) - groups = ['weather_condition_' + icon_name for icon_name in icon_names] + ['weather_conditions', 'weather'] - return [ - { - 'contents': icon + ' ', - 'highlight_group': groups, - 'divider_highlight_group': 'background:divider', - }, - { - 'contents': temp_format.format(temp=converted_temp), - 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], - 'divider_highlight_group': 'background:divider', - 'gradient_level': gradient_level, - }, - ] - - -weather = with_docstring(WeatherSegment(), -'''Return weather from Yahoo! Weather. - -Uses GeoIP lookup from http://freegeoip.net/ to automatically determine -your current location. This should be changed if you’re in a VPN or if your -IP address is registered at another location. - -Returns a list of colorized icon and temperature segments depending on -weather conditions. - -:param str unit: - temperature unit, can be one of ``F``, ``C`` or ``K`` -:param str location_query: - location query for your current location, e.g. ``oslo, norway`` -:param dict icons: - dict for overriding default icons, e.g. ``{'heavy_snow' : u'❆'}`` -:param str temp_format: - format string, receives ``temp`` as an argument. Should also hold unit. -:param float temp_coldest: - coldest temperature. Any temperature below it will have gradient level equal - to zero. -:param float temp_hottest: - hottest temperature. Any temperature above it will have gradient level equal - to 100. Temperatures between ``temp_coldest`` and ``temp_hottest`` receive - gradient level that indicates relative position in this interval - (``100 * (cur-coldest) / (hottest-coldest)``). - -Divider highlight group used: ``background:divider``. - -Highlight groups used: ``weather_conditions`` or ``weather``, ``weather_temp_gradient`` (gradient) or ``weather``. -Also uses ``weather_conditions_{condition}`` for all weather conditions supported by Yahoo. -''') - - -def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2, track_cpu_count=False): - '''Return system load average. - - Highlights using ``system_load_good``, ``system_load_bad`` and - ``system_load_ugly`` highlighting groups, depending on the thresholds - passed to the function. - - :param str format: - format string, receives ``avg`` as an argument - :param float threshold_good: - threshold for gradient level 0: any normalized load average below this - value will have this gradient level. - :param float threshold_bad: - threshold for gradient level 100: any normalized load average above this - value will have this gradient level. Load averages between - ``threshold_good`` and ``threshold_bad`` receive gradient level that - indicates relative position in this interval: - (``100 * (cur-good) / (bad-good)``). - Note: both parameters are checked against normalized load averages. - :param bool track_cpu_count: - if True powerline will continuously poll the system to detect changes - in the number of CPUs. - - Divider highlight group used: ``background:divider``. - - Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``. - ''' - global cpu_count - try: - cpu_num = cpu_count = _cpu_count() if cpu_count is None or track_cpu_count else cpu_count - except NotImplementedError: - pl.warn('Unable to get CPU count: method is not implemented') - return None - ret = [] - for avg in os.getloadavg(): - normalized = avg / cpu_num - if normalized < threshold_good: - gradient_level = 0 - elif normalized < threshold_bad: - gradient_level = (normalized - threshold_good) * 100.0 / (threshold_bad - threshold_good) - else: - gradient_level = 100 - ret.append({ - 'contents': format.format(avg=avg), - 'highlight_group': ['system_load_gradient', 'system_load'], - 'divider_highlight_group': 'background:divider', - 'gradient_level': gradient_level, - }) - ret[0]['contents'] += ' ' - ret[1]['contents'] += ' ' - return ret - - -try: - import psutil - - def _get_bytes(interface): - try: - io_counters = psutil.net_io_counters(pernic=True) - except AttributeError: - io_counters = psutil.network_io_counters(pernic=True) - if_io = io_counters.get(interface) - if not if_io: - return None - return if_io.bytes_recv, if_io.bytes_sent - - def _get_interfaces(): - io_counters = psutil.network_io_counters(pernic=True) - for interface, data in io_counters.items(): - if data: - yield interface, data.bytes_recv, data.bytes_sent - - # psutil-2.0.0: psutil.Process.username is unbound method - if callable(psutil.Process.username): - def _get_user(segment_info): - return psutil.Process(os.getpid()).username() - # pre psutil-2.0.0: psutil.Process.username has type property - else: - def _get_user(segment_info): - return psutil.Process(os.getpid()).username - - class CPULoadPercentSegment(ThreadedSegment): - interval = 1 - - def update(self, old_cpu): - return psutil.cpu_percent(interval=None) - - def run(self): - while not self.shutdown_event.is_set(): - try: - self.update_value = psutil.cpu_percent(interval=self.interval) - except Exception as e: - self.exception('Exception while calculating cpu_percent: {0}', str(e)) - - def render(self, cpu_percent, format='{0:.0f}%', **kwargs): - if not cpu_percent: - return None - return [{ - 'contents': format.format(cpu_percent), - 'gradient_level': cpu_percent, - 'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'], - }] -except ImportError: - def _get_bytes(interface): - with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj: - rx = int(file_obj.read()) - with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj: - tx = int(file_obj.read()) - return (rx, tx) - - def _get_interfaces(): - for interface in os.listdir('/sys/class/net'): - x = _get_bytes(interface) - if x is not None: - yield interface, x[0], x[1] - - def _get_user(segment_info): - return segment_info['environ'].get('USER', None) - - class CPULoadPercentSegment(ThreadedSegment): - interval = 1 - - @staticmethod - def startup(**kwargs): - pass - - @staticmethod - def start(): - pass - - @staticmethod - def shutdown(): - pass - - @staticmethod - def render(cpu_percent, pl, format='{0:.0f}%', **kwargs): - pl.warn('Module “psutil” is not installed, thus CPU load is not available') - return None - - -cpu_load_percent = with_docstring(CPULoadPercentSegment(), -'''Return the average CPU load as a percentage. - -Requires the ``psutil`` module. - -:param str format: - Output format. Accepts measured CPU load as the first argument. - -Highlight groups used: ``cpu_load_percent_gradient`` (gradient) or ``cpu_load_percent``. -''') - - -username = False -# os.geteuid is not available on windows -_geteuid = getattr(os, 'geteuid', lambda: 1) - - -def user(pl, segment_info=None, hide_user=None): - '''Return the current user. - - :param str hide_user: - Omit showing segment for users with names equal to this string. - - Highlights the user with the ``superuser`` if the effective user ID is 0. - - Highlight groups used: ``superuser`` or ``user``. It is recommended to define all highlight groups. - ''' - global username - if username is False: - username = _get_user(segment_info) - if username is None: - pl.warn('Failed to get username') - return None - if username == hide_user: - return None - euid = _geteuid() - return [{ - 'contents': username, - 'highlight_group': ['user'] if euid != 0 else ['superuser', 'user'], - }] -if 'psutil' not in globals(): - user = requires_segment_info(user) - - -if os.path.exists('/proc/uptime'): - def _get_uptime(): - with open('/proc/uptime', 'r') as f: - return int(float(f.readline().split()[0])) -elif 'psutil' in globals(): - from time import time - - def _get_uptime(): - # psutil.BOOT_TIME is not subject to clock adjustments, but time() is. - # Thus it is a fallback to /proc/uptime reading and not the reverse. - return int(time() - psutil.BOOT_TIME) -else: - def _get_uptime(): - raise NotImplementedError - - -@add_divider_highlight_group('background:divider') -def uptime(pl, days_format='{days:d}d', hours_format=' {hours:d}h', minutes_format=' {minutes:d}m', seconds_format=' {seconds:d}s', shorten_len=3): - '''Return system uptime. - - :param str days_format: - day format string, will be passed ``days`` as the argument - :param str hours_format: - hour format string, will be passed ``hours`` as the argument - :param str minutes_format: - minute format string, will be passed ``minutes`` as the argument - :param str seconds_format: - second format string, will be passed ``seconds`` as the argument - :param int shorten_len: - shorten the amount of units (days, hours, etc.) displayed - - Divider highlight group used: ``background:divider``. - ''' - try: - seconds = _get_uptime() - except NotImplementedError: - pl.warn('Unable to get uptime. You should install psutil module') - return None - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - days, hours = divmod(hours, 24) - time_formatted = list(filter(None, [ - days_format.format(days=days) if days and days_format else None, - hours_format.format(hours=hours) if hours and hours_format else None, - minutes_format.format(minutes=minutes) if minutes and minutes_format else None, - seconds_format.format(seconds=seconds) if seconds and seconds_format else None, - ]))[0:shorten_len] - return ''.join(time_formatted).strip() - - -class NetworkLoadSegment(KwThreadedSegment): - interfaces = {} - replace_num_pat = re.compile(r'[a-zA-Z]+') - - @staticmethod - def key(interface='detect', **kwargs): - return interface - - def compute_state(self, interface): - if interface == 'detect': - proc_exists = getattr(self, 'proc_exists', None) - if proc_exists is None: - proc_exists = self.proc_exists = os.path.exists('/proc/net/route') - if proc_exists: - # Look for default interface in routing table - with open('/proc/net/route', 'rb') as f: - for line in f.readlines(): - parts = line.split() - if len(parts) > 1: - iface, destination = parts[:2] - if not destination.replace(b'0', b''): - interface = iface.decode('utf-8') - break - if interface == 'detect': - # Choose interface with most total activity, excluding some - # well known interface names - interface, total = 'eth0', -1 - for name, rx, tx in _get_interfaces(): - base = self.replace_num_pat.match(name) - if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'): - continue - activity = rx + tx - if activity > total: - total = activity - interface = name - - try: - idata = self.interfaces[interface] - try: - idata['prev'] = idata['last'] - except KeyError: - pass - except KeyError: - idata = {} - if self.run_once: - idata['prev'] = (monotonic(), _get_bytes(interface)) - self.shutdown_event.wait(self.interval) - self.interfaces[interface] = idata - - idata['last'] = (monotonic(), _get_bytes(interface)) - return idata.copy() - - def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs): - if not idata or 'prev' not in idata: - return None - - t1, b1 = idata['prev'] - t2, b2 = idata['last'] - measure_interval = t2 - t1 - - if None in (b1, b2): - return None - - r = [] - for i, key in zip((0, 1), ('recv', 'sent')): - format = locals()[key + '_format'] - try: - value = (b2[i] - b1[i]) / measure_interval - except ZeroDivisionError: - self.warn('Measure interval zero.') - value = 0 - max_key = key + '_max' - is_gradient = max_key in kwargs - hl_groups = ['network_load_' + key, 'network_load'] - if is_gradient: - hl_groups[:0] = (group + '_gradient' for group in hl_groups) - r.append({ - 'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)), - 'divider_highlight_group': 'background:divider', - 'highlight_group': hl_groups, - }) - if is_gradient: - max = kwargs[max_key] - if value >= max: - r[-1]['gradient_level'] = 100 - else: - r[-1]['gradient_level'] = value * 100.0 / max - - return r - - -network_load = with_docstring(NetworkLoadSegment(), -'''Return the network load. - -Uses the ``psutil`` module if available for multi-platform compatibility, -falls back to reading -:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`. - -:param str interface: - network interface to measure (use the special value "detect" to have powerline try to auto-detect the network interface) -:param str suffix: - string appended to each load string -:param bool si_prefix: - use SI prefix, e.g. MB instead of MiB -:param str recv_format: - format string, receives ``value`` as argument -:param str sent_format: - format string, receives ``value`` as argument -:param float recv_max: - maximum number of received bytes per second. Is only used to compute - gradient level -:param float sent_max: - maximum number of sent bytes per second. Is only used to compute gradient - level - -Divider highlight group used: ``background:divider``. - -Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``. -''') - - -@requires_segment_info -def virtualenv(pl, segment_info): - '''Return the name of the current Python virtualenv.''' - return os.path.basename(segment_info['environ'].get('VIRTUAL_ENV', '')) or None - - -_IMAPKey = namedtuple('Key', 'username password server port folder') - - -class EmailIMAPSegment(KwThreadedSegment): - interval = 60 - - @staticmethod - def key(username, password, server='imap.gmail.com', port=993, folder='INBOX', **kwargs): - return _IMAPKey(username, password, server, port, folder) - - def compute_state(self, key): - if not key.username or not key.password: - self.warn('Username and password are not configured') - return None - try: - import imaplib - except imaplib.IMAP4.error as e: - unread_count = str(e) - else: - mail = imaplib.IMAP4_SSL(key.server, key.port) - mail.login(key.username, key.password) - rc, message = mail.status(key.folder, '(UNSEEN)') - unread_str = message[0].decode('utf-8') - unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1)) - return unread_count - - @staticmethod - def render_one(unread_count, max_msgs=None, **kwargs): - if not unread_count: - return None - elif type(unread_count) != int or not max_msgs: - return [{ - 'contents': str(unread_count), - 'highlight_group': ['email_alert'], - }] - else: - return [{ - 'contents': str(unread_count), - 'highlight_group': ['email_alert_gradient', 'email_alert'], - 'gradient_level': min(unread_count * 100.0 / max_msgs, 100), - }] - - -email_imap_alert = with_docstring(EmailIMAPSegment(), -'''Return unread e-mail count for IMAP servers. - -:param str username: - login username -:param str password: - login password -:param str server: - e-mail server -:param int port: - e-mail server port -:param str folder: - folder to check for e-mails -:param int max_msgs: - Maximum number of messages. If there are more messages then max_msgs then it - will use gradient level equal to 100, otherwise gradient level is equal to - ``100 * msgs_num / max_msgs``. If not present gradient is not computed. - -Highlight groups used: ``email_alert_gradient`` (gradient), ``email_alert``. -''') - - -STATE_SYMBOLS = { - 'fallback': '', - 'play': '>', - 'pause': '~', - 'stop': 'X', -} - - -class NowPlayingSegment(Segment): - def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', state_symbols=STATE_SYMBOLS, **kwargs): - player_func = getattr(self, 'player_{0}'.format(player)) - stats = { - 'state': 'fallback', - 'album': None, - 'artist': None, - 'title': None, - 'elapsed': None, - 'total': None, - } - func_stats = player_func(**kwargs) - if not func_stats: - return None - stats.update(func_stats) - stats['state_symbol'] = state_symbols.get(stats['state']) - return format.format(**stats) - - @staticmethod - def _convert_state(state): - state = state.lower() - if 'play' in state: - return 'play' - if 'pause' in state: - return 'pause' - if 'stop' in state: - return 'stop' - - @staticmethod - def _convert_seconds(seconds): - return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60)) - - def player_cmus(self, pl): - '''Return cmus player information. - - cmus-remote -Q returns data with multi-level information i.e. - status playing - file - tag artist - tag title - tag .. - tag n - set continue - set repeat - set .. - set n - - For the information we are looking for we don’t really care if we’re on - the tag level or the set level. The dictionary comprehension in this - method takes anything in ignore_levels and brings the key inside that - to the first level of the dictionary. - ''' - now_playing_str = run_cmd(pl, ['cmus-remote', '-Q']) - if not now_playing_str: - return - ignore_levels = ('tag', 'set',) - now_playing = dict(((token[0] if token[0] not in ignore_levels else token[1], - (' '.join(token[1:]) if token[0] not in ignore_levels else - ' '.join(token[2:]))) for token in [line.split(' ') for line in now_playing_str.split('\n')[:-1]])) - state = self._convert_state(now_playing.get('status')) - return { - 'state': state, - 'album': now_playing.get('album'), - 'artist': now_playing.get('artist'), - 'title': now_playing.get('title'), - 'elapsed': self._convert_seconds(now_playing.get('position', 0)), - 'total': self._convert_seconds(now_playing.get('duration', 0)), - } - - def player_mpd(self, pl, host='localhost', port=6600): - try: - import mpd - except ImportError: - now_playing = run_cmd(pl, ['mpc', 'current', '-f', '%album%\n%artist%\n%title%\n%time%', '-h', str(host), '-p', str(port)]) - if not now_playing: - return - now_playing = now_playing.split('\n') - return { - 'album': now_playing[0], - 'artist': now_playing[1], - 'title': now_playing[2], - 'total': now_playing[3], - } - else: - client = mpd.MPDClient() - client.connect(host, port) - now_playing = client.currentsong() - if not now_playing: - return - status = client.status() - client.close() - client.disconnect() - return { - 'state': status.get('state'), - 'album': now_playing.get('album'), - 'artist': now_playing.get('artist'), - 'title': now_playing.get('title'), - 'elapsed': self._convert_seconds(now_playing.get('elapsed', 0)), - 'total': self._convert_seconds(now_playing.get('time', 0)), - } - - def player_dbus(self, player_name, bus_name, player_path, iface_prop, iface_player): - try: - import dbus - except ImportError: - pl.exception('Could not add {0} segment: requires dbus module', player_name) - return - bus = dbus.SessionBus() - try: - player = bus.get_object(bus_name, player_path) - iface = dbus.Interface(player, iface_prop) - info = iface.Get(iface_player, 'Metadata') - status = iface.Get(iface_player, 'PlaybackStatus') - except dbus.exceptions.DBusException: - return - if not info: - return - album = out_u(info.get('xesam:album')) - title = out_u(info.get('xesam:title')) - artist = info.get('xesam:artist') - state = self._convert_state(status) - if artist: - artist = out_u(artist[0]) - return { - 'state': state, - 'album': album, - 'artist': artist, - 'title': title, - 'total': self._convert_seconds(info.get('mpris:length') / 1e6), - } - - def player_spotify_dbus(self, pl): - return self.player_dbus( - player_name='Spotify', - bus_name='com.spotify.qt', - player_path='/', - iface_prop='org.freedesktop.DBus.Properties', - iface_player='org.freedesktop.MediaPlayer2', - ) - - def player_clementine(self, pl): - return self.player_dbus( - player_name='Clementine', - bus_name='org.mpris.MediaPlayer2.clementine', - player_path='/org/mpris/MediaPlayer2', - iface_prop='org.freedesktop.DBus.Properties', - iface_player='org.mpris.MediaPlayer2.Player', - ) - - def player_spotify_apple_script(self, pl): - status_delimiter = '-~`/=' - ascript = ''' - tell application "System Events" - set process_list to (name of every process) - end tell - - if process_list contains "Spotify" then - tell application "Spotify" - if player state is playing or player state is paused then - set track_name to name of current track - set artist_name to artist of current track - set album_name to album of current track - set track_length to duration of current track - set now_playing to "" & player state & "{0}" & album_name & "{0}" & artist_name & "{0}" & track_name & "{0}" & track_length - return now_playing - else - return player state - end if - - end tell - else - return "stopped" - end if - '''.format(status_delimiter) - - spotify = asrun(pl, ascript) - if not asrun: - return None - - spotify_status = spotify.split(status_delimiter) - state = self._convert_state(spotify_status[0]) - if state == 'stop': - return None - return { - 'state': state, - 'album': spotify_status[1], - 'artist': spotify_status[2], - 'title': spotify_status[3], - 'total': self._convert_seconds(int(spotify_status[4])) - } - - try: - __import__('dbus') - except ImportError: - if sys.platform.startswith('darwin'): - player_spotify = player_spotify_apple_script - else: - player_spotify = player_spotify_dbus - else: - player_spotify = player_spotify_dbus - - def player_rhythmbox(self, pl): - now_playing = run_cmd(pl, ['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td']) - if not now_playing: - return - now_playing = now_playing.split('\n') - return { - 'album': now_playing[0], - 'artist': now_playing[1], - 'title': now_playing[2], - 'elapsed': now_playing[3], - 'total': now_playing[4], - } - - def player_rdio(self, pl): - status_delimiter = '-~`/=' - ascript = ''' - tell application "System Events" - set rdio_active to the count(every process whose name is "Rdio") - if rdio_active is 0 then - return - end if - end tell - tell application "Rdio" - set rdio_name to the name of the current track - set rdio_artist to the artist of the current track - set rdio_album to the album of the current track - set rdio_duration to the duration of the current track - set rdio_state to the player state - set rdio_elapsed to the player position - return rdio_name & "{0}" & rdio_artist & "{0}" & rdio_album & "{0}" & rdio_elapsed & "{0}" & rdio_duration & "{0}" & rdio_state - end tell - '''.format(status_delimiter) - now_playing = asrun(pl, ascript) - if not now_playing: - return - now_playing = now_playing.split('\n') - if len(now_playing) != 6: - return - state = self._convert_state(now_playing[5]) - total = self._convert_seconds(now_playing[4]) - elapsed = self._convert_seconds(float(now_playing[3]) * float(now_playing[4]) / 100) - return { - 'title': now_playing[0], - 'artist': now_playing[1], - 'album': now_playing[2], - 'elapsed': elapsed, - 'total': total, - 'state': state, - 'state_symbol': self.STATE_SYMBOLS.get(state) - } -now_playing = NowPlayingSegment() - - -def _get_battery(pl): - try: - import dbus - except ImportError: - pl.debug('Not using DBUS+UPower as dbus is not available') - else: - try: - bus = dbus.SystemBus() - except Exception as e: - pl.exception('Failed to connect to system bus: {0}', str(e)) - else: - interface = 'org.freedesktop.UPower' - try: - up = bus.get_object(interface, '/org/freedesktop/UPower') - except dbus.exceptions.DBusException as e: - if getattr(e, '_dbus_error_name', '').endswidth('ServiceUnknown'): - pl.debug('Not using DBUS+UPower as UPower is not available via dbus') - else: - pl.exception('Failed to get UPower service with dbus: {0}', str(e)) - else: - devinterface = 'org.freedesktop.DBus.Properties' - devtype_name = interface + '.Device' - for devpath in up.EnumerateDevices(dbus_interface=interface): - dev = bus.get_object(interface, devpath) - devget = lambda what: dev.Get( - devtype_name, - what, - dbus_interface=devinterface - ) - if int(devget('Type')) != 2: - pl.debug('Not using DBUS+UPower with {0}: invalid type', devpath) - continue - if not bool(devget('IsPresent')): - pl.debug('Not using DBUS+UPower with {0}: not present', devpath) - continue - if not bool(devget('PowerSupply')): - pl.debug('Not using DBUS+UPower with {0}: not a power supply', devpath) - continue - pl.debug('Using DBUS+UPower with {0}', devpath) - return lambda pl: float( - dbus.Interface(dev, dbus_interface=devinterface).Get( - devtype_name, - 'Percentage' - ) - ) - pl.debug('Not using DBUS+UPower as no batteries were found') - - if os.path.isdir('/sys/class/power_supply'): - linux_bat_fmt = '/sys/class/power_supply/{0}/capacity' - for linux_bat in os.listdir('/sys/class/power_supply'): - cap_path = linux_bat_fmt.format(linux_bat) - if linux_bat.startswith('BAT') and os.path.exists(cap_path): - pl.debug('Using /sys/class/power_supply with battery {0}', linux_bat) - - def _get_capacity(pl): - with open(cap_path, 'r') as f: - return int(float(f.readline().split()[0])) - - return _get_capacity - pl.debug('Not using /sys/class/power_supply as no batteries were found') - else: - pl.debug('Not using /sys/class/power_supply: no directory') - - try: - from shutil import which # Python-3.3 and later - except ImportError: - pl.info('Using dumb “which” which only checks for file in /usr/bin') - which = lambda f: (lambda fp: os.path.exists(fp) and fp)(os.path.join('/usr/bin', f)) - - if which('pmset'): - pl.debug('Using pmset') - - def _get_capacity(pl): - import re - battery_summary = run_cmd(pl, ['pmset', '-g', 'batt']) - battery_percent = re.search(r'(\d+)%', battery_summary).group(1) - return int(battery_percent) - - return _get_capacity - else: - pl.debug('Not using pmset: executable not found') - - if sys.platform.startswith('win'): - # From http://stackoverflow.com/a/21083571/273566, reworked - try: - from win32com.client import GetObject - except ImportError: - pl.debug('Not using win32com.client as it is not available') - else: - try: - wmi = GetObject('winmgmts:') - except Exception as e: - pl.exception('Failed to run GetObject from win32com.client: {0}', str(e)) - else: - for battery in wmi.InstancesOf('Win32_Battery'): - pl.debug('Using win32com.client with Win32_Battery') - - def _get_capacity(pl): - # http://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx - return battery.EstimatedChargeRemaining - - return _get_capacity - pl.debug('Not using win32com.client as no batteries were found') - - from ctypes import Structure, c_byte, c_ulong, windll, byref - - class PowerClass(Structure): - _fields_ = [ - ('ACLineStatus', c_byte), - ('BatteryFlag', c_byte), - ('BatteryLifePercent', c_byte), - ('Reserved1', c_byte), - ('BatteryLifeTime', c_ulong), - ('BatteryFullLifeTime', c_ulong) - ] - - def _get_capacity(pl): - powerclass = PowerClass() - result = windll.kernel32.GetSystemPowerStatus(byref(powerclass)) - # http://msdn.microsoft.com/en-us/library/windows/desktop/aa372693(v=vs.85).aspx - if result: - return None - return powerclass.BatteryLifePercent - - if _get_capacity() is None: - pl.debug('Not using GetSystemPowerStatus because it failed') - else: - pl.debug('Using GetSystemPowerStatus') - - return _get_capacity - - raise NotImplementedError - - -def _get_capacity(pl): - global _get_capacity - - def _failing_get_capacity(pl): - raise NotImplementedError - - try: - _get_capacity = _get_battery(pl) - except NotImplementedError: - _get_capacity = _failing_get_capacity - except Exception as e: - pl.exception('Exception while obtaining battery capacity getter: {0}', str(e)) - _get_capacity = _failing_get_capacity - return _get_capacity(pl) - - -def battery(pl, format='{capacity:3.0%}', steps=5, gamify=False, full_heart='O', empty_heart='O'): - '''Return battery charge status. - - :param str format: - Percent format in case gamify is False. - :param int steps: - Number of discrete steps to show between 0% and 100% capacity if gamify - is True. - :param bool gamify: - Measure in hearts (♥) instead of percentages. For full hearts - ``battery_full`` highlighting group is preferred, for empty hearts there - is ``battery_empty``. - :param str full_heart: - Heart displayed for “full” part of battery. - :param str empty_heart: - Heart displayed for “used” part of battery. It is also displayed using - another gradient level and highlighting group, so it is OK for it to be - the same as full_heart as long as necessary highlighting groups are - defined. - - ``battery_gradient`` and ``battery`` groups are used in any case, first is - preferred. - - Highlight groups used: ``battery_full`` or ``battery_gradient`` (gradient) or ``battery``, ``battery_empty`` or ``battery_gradient`` (gradient) or ``battery``. - ''' - try: - capacity = _get_capacity(pl) - except NotImplementedError: - pl.info('Unable to get battery capacity.') - return None - ret = [] - if gamify: - denom = int(steps) - numer = int(denom * capacity / 100) - ret.append({ - 'contents': full_heart * numer, - 'draw_inner_divider': False, - 'highlight_group': ['battery_full', 'battery_gradient', 'battery'], - # Using zero as “nothing to worry about”: it is least alert color. - 'gradient_level': 0, - }) - ret.append({ - 'contents': empty_heart * (denom - numer), - 'draw_inner_divider': False, - 'highlight_group': ['battery_empty', 'battery_gradient', 'battery'], - # Using a hundred as it is most alert color. - 'gradient_level': 100, - }) - else: - ret.append({ - 'contents': format.format(capacity=(capacity / 100.0)), - 'highlight_group': ['battery_gradient', 'battery'], - # Gradients are “least alert – most alert” by default, capacity has - # the opposite semantics. - 'gradient_level': 100 - capacity, - }) - return ret diff --git a/powerline/segments/common/__init__.py b/powerline/segments/common/__init__.py new file mode 100644 index 00000000..3a67901e --- /dev/null +++ b/powerline/segments/common/__init__.py @@ -0,0 +1,22 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + + +from powerline.segments.common.vcs import branch # NOQA +from powerline.segments.common.sys import cpu_load_percent # NOQA +from powerline.segments.common.sys import uptime # NOQA +from powerline.segments.common.sys import system_load # NOQA +from powerline.segments.common.net import hostname # NOQA +from powerline.segments.common.net import external_ip # NOQA +from powerline.segments.common.net import internal_ip # NOQA +from powerline.segments.common.net import network_load # NOQA +from powerline.segments.common.env import cwd # NOQA +from powerline.segments.common.env import user # NOQA +from powerline.segments.common.env import environment # NOQA +from powerline.segments.common.env import virtualenv # NOQA +from powerline.segments.common.bat import battery # NOQA +from powerline.segments.common.wthr import weather # NOQA +from powerline.segments.common.time import date # NOQA +from powerline.segments.common.time import fuzzy_time # NOQA +from powerline.segments.common.mail import email_imap_alert # NOQA +from powerline.segments.common.players import now_playing # NOQA diff --git a/powerline/segments/common/bat.py b/powerline/segments/common/bat.py new file mode 100644 index 00000000..713bc149 --- /dev/null +++ b/powerline/segments/common/bat.py @@ -0,0 +1,222 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os +import sys +import re + +from powerline.lib.shell import run_cmd + + +# XXX Warning: module name must not be equal to the segment name as long as this +# segment is imported into powerline.segments.common module. + + +def _get_battery(pl): + try: + import dbus + except ImportError: + pl.debug('Not using DBUS+UPower as dbus is not available') + else: + try: + bus = dbus.SystemBus() + except Exception as e: + pl.exception('Failed to connect to system bus: {0}', str(e)) + else: + interface = 'org.freedesktop.UPower' + try: + up = bus.get_object(interface, '/org/freedesktop/UPower') + except dbus.exceptions.DBusException as e: + if getattr(e, '_dbus_error_name', '').endswidth('ServiceUnknown'): + pl.debug('Not using DBUS+UPower as UPower is not available via dbus') + else: + pl.exception('Failed to get UPower service with dbus: {0}', str(e)) + else: + devinterface = 'org.freedesktop.DBus.Properties' + devtype_name = interface + '.Device' + for devpath in up.EnumerateDevices(dbus_interface=interface): + dev = bus.get_object(interface, devpath) + devget = lambda what: dev.Get( + devtype_name, + what, + dbus_interface=devinterface + ) + if int(devget('Type')) != 2: + pl.debug('Not using DBUS+UPower with {0}: invalid type', devpath) + continue + if not bool(devget('IsPresent')): + pl.debug('Not using DBUS+UPower with {0}: not present', devpath) + continue + if not bool(devget('PowerSupply')): + pl.debug('Not using DBUS+UPower with {0}: not a power supply', devpath) + continue + pl.debug('Using DBUS+UPower with {0}', devpath) + return lambda pl: float( + dbus.Interface(dev, dbus_interface=devinterface).Get( + devtype_name, + 'Percentage' + ) + ) + pl.debug('Not using DBUS+UPower as no batteries were found') + + if os.path.isdir('/sys/class/power_supply'): + linux_bat_fmt = '/sys/class/power_supply/{0}/capacity' + for linux_bat in os.listdir('/sys/class/power_supply'): + cap_path = linux_bat_fmt.format(linux_bat) + if linux_bat.startswith('BAT') and os.path.exists(cap_path): + pl.debug('Using /sys/class/power_supply with battery {0}', linux_bat) + + def _get_capacity(pl): + with open(cap_path, 'r') as f: + return int(float(f.readline().split()[0])) + + return _get_capacity + pl.debug('Not using /sys/class/power_supply as no batteries were found') + else: + pl.debug('Not using /sys/class/power_supply: no directory') + + try: + from shutil import which # Python-3.3 and later + except ImportError: + pl.info('Using dumb “which” which only checks for file in /usr/bin') + which = lambda f: (lambda fp: os.path.exists(fp) and fp)(os.path.join('/usr/bin', f)) + + if which('pmset'): + pl.debug('Using pmset') + + BATTERY_PERCENT_RE = re.compile(r'(\d+)%') + + def _get_capacity(pl): + battery_summary = run_cmd(pl, ['pmset', '-g', 'batt']) + battery_percent = BATTERY_PERCENT_RE.search(battery_summary).group(1) + return int(battery_percent) + + return _get_capacity + else: + pl.debug('Not using pmset: executable not found') + + if sys.platform.startswith('win'): + # From http://stackoverflow.com/a/21083571/273566, reworked + try: + from win32com.client import GetObject + except ImportError: + pl.debug('Not using win32com.client as it is not available') + else: + try: + wmi = GetObject('winmgmts:') + except Exception as e: + pl.exception('Failed to run GetObject from win32com.client: {0}', str(e)) + else: + for battery in wmi.InstancesOf('Win32_Battery'): + pl.debug('Using win32com.client with Win32_Battery') + + def _get_capacity(pl): + # http://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx + return battery.EstimatedChargeRemaining + + return _get_capacity + pl.debug('Not using win32com.client as no batteries were found') + + from ctypes import Structure, c_byte, c_ulong, windll, byref + + class PowerClass(Structure): + _fields_ = [ + ('ACLineStatus', c_byte), + ('BatteryFlag', c_byte), + ('BatteryLifePercent', c_byte), + ('Reserved1', c_byte), + ('BatteryLifeTime', c_ulong), + ('BatteryFullLifeTime', c_ulong) + ] + + def _get_capacity(pl): + powerclass = PowerClass() + result = windll.kernel32.GetSystemPowerStatus(byref(powerclass)) + # http://msdn.microsoft.com/en-us/library/windows/desktop/aa372693(v=vs.85).aspx + if result: + return None + return powerclass.BatteryLifePercent + + if _get_capacity() is None: + pl.debug('Not using GetSystemPowerStatus because it failed') + else: + pl.debug('Using GetSystemPowerStatus') + + return _get_capacity + + raise NotImplementedError + + +def _get_capacity(pl): + global _get_capacity + + def _failing_get_capacity(pl): + raise NotImplementedError + + try: + _get_capacity = _get_battery(pl) + except NotImplementedError: + _get_capacity = _failing_get_capacity + except Exception as e: + pl.exception('Exception while obtaining battery capacity getter: {0}', str(e)) + _get_capacity = _failing_get_capacity + return _get_capacity(pl) + + +def battery(pl, format='{capacity:3.0%}', steps=5, gamify=False, full_heart='O', empty_heart='O'): + '''Return battery charge status. + + :param str format: + Percent format in case gamify is False. + :param int steps: + Number of discrete steps to show between 0% and 100% capacity if gamify + is True. + :param bool gamify: + Measure in hearts (♥) instead of percentages. For full hearts + ``battery_full`` highlighting group is preferred, for empty hearts there + is ``battery_empty``. + :param str full_heart: + Heart displayed for “full” part of battery. + :param str empty_heart: + Heart displayed for “used” part of battery. It is also displayed using + another gradient level and highlighting group, so it is OK for it to be + the same as full_heart as long as necessary highlighting groups are + defined. + + ``battery_gradient`` and ``battery`` groups are used in any case, first is + preferred. + + Highlight groups used: ``battery_full`` or ``battery_gradient`` (gradient) or ``battery``, ``battery_empty`` or ``battery_gradient`` (gradient) or ``battery``. + ''' + try: + capacity = _get_capacity(pl) + except NotImplementedError: + pl.info('Unable to get battery capacity.') + return None + ret = [] + if gamify: + denom = int(steps) + numer = int(denom * capacity / 100) + ret.append({ + 'contents': full_heart * numer, + 'draw_inner_divider': False, + 'highlight_group': ['battery_full', 'battery_gradient', 'battery'], + # Using zero as “nothing to worry about”: it is least alert color. + 'gradient_level': 0, + }) + ret.append({ + 'contents': empty_heart * (denom - numer), + 'draw_inner_divider': False, + 'highlight_group': ['battery_empty', 'battery_gradient', 'battery'], + # Using a hundred as it is most alert color. + 'gradient_level': 100, + }) + else: + ret.append({ + 'contents': format.format(capacity=(capacity / 100.0)), + 'highlight_group': ['battery_gradient', 'battery'], + # Gradients are “least alert – most alert” by default, capacity has + # the opposite semantics. + 'gradient_level': 100 - capacity, + }) + return ret diff --git a/powerline/segments/common/env.py b/powerline/segments/common/env.py new file mode 100644 index 00000000..8041e500 --- /dev/null +++ b/powerline/segments/common/env.py @@ -0,0 +1,165 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os + +from powerline.lib.unicode import out_u +from powerline.theme import requires_segment_info +from powerline.segments import Segment, with_docstring + + +@requires_segment_info +def environment(pl, segment_info, variable=None): + '''Return the value of any defined environment variable + + :param string variable: + The environment variable to return if found + ''' + return segment_info['environ'].get(variable, None) + + +@requires_segment_info +def virtualenv(pl, segment_info): + '''Return the name of the current Python virtualenv.''' + return os.path.basename(segment_info['environ'].get('VIRTUAL_ENV', '')) or None + + +@requires_segment_info +class CwdSegment(Segment): + def argspecobjs(self): + for obj in super(CwdSegment, self).argspecobjs(): + yield obj + yield 'get_shortened_path', self.get_shortened_path + + def omitted_args(self, name, method): + if method is self.get_shortened_path: + return (0, 1, 2) + else: + return super(CwdSegment, self).omitted_args(name, method) + + def get_shortened_path(self, pl, segment_info, shorten_home=True, **kwargs): + try: + path = out_u(segment_info['getcwd']()) + except OSError as e: + if e.errno == 2: + # user most probably deleted the directory + # this happens when removing files from Mercurial repos for example + pl.warn('Current directory not found') + return '[not found]' + else: + raise + if shorten_home: + home = segment_info['home'] + if home: + home = out_u(home) + if path.startswith(home): + path = '~' + path[len(home):] + return path + + def __call__(self, pl, segment_info, + dir_shorten_len=None, + dir_limit_depth=None, + use_path_separator=False, + ellipsis='...', + **kwargs): + cwd = self.get_shortened_path(pl, segment_info, **kwargs) + cwd_split = cwd.split(os.sep) + cwd_split_len = len(cwd_split) + cwd = [i[0:dir_shorten_len] if dir_shorten_len and i else i for i in cwd_split[:-1]] + [cwd_split[-1]] + if dir_limit_depth and cwd_split_len > dir_limit_depth + 1: + del(cwd[0:-dir_limit_depth]) + if ellipsis is not None: + cwd.insert(0, ellipsis) + ret = [] + if not cwd[0]: + cwd[0] = '/' + draw_inner_divider = not use_path_separator + for part in cwd: + if not part: + continue + if use_path_separator: + part += os.sep + ret.append({ + 'contents': part, + 'divider_highlight_group': 'cwd:divider', + 'draw_inner_divider': draw_inner_divider, + }) + ret[-1]['highlight_group'] = ['cwd:current_folder', 'cwd'] + if use_path_separator: + ret[-1]['contents'] = ret[-1]['contents'][:-1] + if len(ret) > 1 and ret[0]['contents'][0] == os.sep: + ret[0]['contents'] = ret[0]['contents'][1:] + return ret + + +cwd = with_docstring(CwdSegment(), +'''Return the current working directory. + +Returns a segment list to create a breadcrumb-like effect. + +:param int dir_shorten_len: + shorten parent directory names to this length (e.g. + :file:`/long/path/to/powerline` → :file:`/l/p/t/powerline`) +:param int dir_limit_depth: + limit directory depth to this number (e.g. + :file:`/long/path/to/powerline` → :file:`⋯/to/powerline`) +:param bool use_path_separator: + Use path separator in place of soft divider. +:param bool shorten_home: + Shorten home directory to ``~``. +:param str ellipsis: + Specifies what to use in place of omitted directories. Use None to not + show this subsegment at all. + +Divider highlight group used: ``cwd:divider``. + +Highlight groups used: ``cwd:current_folder`` or ``cwd``. It is recommended to define all highlight groups. +''') + + +try: + import psutil + + # psutil-2.0.0: psutil.Process.username is unbound method + if callable(psutil.Process.username): + def _get_user(segment_info): + return psutil.Process(os.getpid()).username() + # pre psutil-2.0.0: psutil.Process.username has type property + else: + def _get_user(segment_info): + return psutil.Process(os.getpid()).username +except ImportError: + def _get_user(segment_info): + return segment_info['environ'].get('USER', None) + + +username = False +# os.geteuid is not available on windows +_geteuid = getattr(os, 'geteuid', lambda: 1) + + +def user(pl, segment_info=None, hide_user=None): + '''Return the current user. + + :param str hide_user: + Omit showing segment for users with names equal to this string. + + Highlights the user with the ``superuser`` if the effective user ID is 0. + + Highlight groups used: ``superuser`` or ``user``. It is recommended to define all highlight groups. + ''' + global username + if username is False: + username = _get_user(segment_info) + if username is None: + pl.warn('Failed to get username') + return None + if username == hide_user: + return None + euid = _geteuid() + return [{ + 'contents': username, + 'highlight_group': ['user'] if euid != 0 else ['superuser', 'user'], + }] +if 'psutil' not in globals(): + user = requires_segment_info(user) diff --git a/powerline/segments/common/mail.py b/powerline/segments/common/mail.py new file mode 100644 index 00000000..dc266f36 --- /dev/null +++ b/powerline/segments/common/mail.py @@ -0,0 +1,74 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import re + +from collections import namedtuple + +from powerline.lib.threaded import KwThreadedSegment +from powerline.segments import with_docstring + + +_IMAPKey = namedtuple('Key', 'username password server port folder') + + +class EmailIMAPSegment(KwThreadedSegment): + interval = 60 + + @staticmethod + def key(username, password, server='imap.gmail.com', port=993, folder='INBOX', **kwargs): + return _IMAPKey(username, password, server, port, folder) + + def compute_state(self, key): + if not key.username or not key.password: + self.warn('Username and password are not configured') + return None + try: + import imaplib + except imaplib.IMAP4.error as e: + unread_count = str(e) + else: + mail = imaplib.IMAP4_SSL(key.server, key.port) + mail.login(key.username, key.password) + rc, message = mail.status(key.folder, '(UNSEEN)') + unread_str = message[0].decode('utf-8') + unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1)) + return unread_count + + @staticmethod + def render_one(unread_count, max_msgs=None, **kwargs): + if not unread_count: + return None + elif type(unread_count) != int or not max_msgs: + return [{ + 'contents': str(unread_count), + 'highlight_group': ['email_alert'], + }] + else: + return [{ + 'contents': str(unread_count), + 'highlight_group': ['email_alert_gradient', 'email_alert'], + 'gradient_level': min(unread_count * 100.0 / max_msgs, 100), + }] + + +email_imap_alert = with_docstring(EmailIMAPSegment(), +'''Return unread e-mail count for IMAP servers. + +:param str username: + login username +:param str password: + login password +:param str server: + e-mail server +:param int port: + e-mail server port +:param str folder: + folder to check for e-mails +:param int max_msgs: + Maximum number of messages. If there are more messages then max_msgs then it + will use gradient level equal to 100, otherwise gradient level is equal to + ``100 * msgs_num / max_msgs``. If not present gradient is not computed. + +Highlight groups used: ``email_alert_gradient`` (gradient), ``email_alert``. +''') diff --git a/powerline/segments/common/net.py b/powerline/segments/common/net.py new file mode 100644 index 00000000..3e4ccb24 --- /dev/null +++ b/powerline/segments/common/net.py @@ -0,0 +1,287 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import re +import os +import socket + +from powerline.lib.url import urllib_read +from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment +from powerline.lib.monotonic import monotonic +from powerline.lib.humanize_bytes import humanize_bytes +from powerline.segments import with_docstring +from powerline.theme import requires_segment_info + + +@requires_segment_info +def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False): + '''Return the current hostname. + + :param bool only_if_ssh: + only return the hostname if currently in an SSH session + :param bool exclude_domain: + return the hostname without domain if there is one + ''' + if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'): + return None + if exclude_domain: + return socket.gethostname().split('.')[0] + return socket.gethostname() + + +def _external_ip(query_url='http://ipv4.icanhazip.com/'): + return urllib_read(query_url).strip() + + +class ExternalIpSegment(ThreadedSegment): + interval = 300 + + def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs): + self.query_url = query_url + super(ExternalIpSegment, self).set_state(**kwargs) + + def update(self, old_ip): + return _external_ip(query_url=self.query_url) + + def render(self, ip, **kwargs): + if not ip: + return None + return [{'contents': ip, 'divider_highlight_group': 'background:divider'}] + + +external_ip = with_docstring(ExternalIpSegment(), +'''Return external IP address. + +:param str query_url: + URI to query for IP address, should return only the IP address as a text string + + Suggested URIs: + + * http://ipv4.icanhazip.com/ + * http://ipv6.icanhazip.com/ + * http://icanhazip.com/ (returns IPv6 address if available, else IPv4) + +Divider highlight group used: ``background:divider``. +''') + + +try: + import netifaces +except ImportError: + def internal_ip(pl, interface='detect', ipv=4): + return None +else: + _interface_starts = { + 'eth': 10, # Regular ethernet adapters : eth1 + 'enp': 10, # Regular ethernet adapters, Gentoo : enp2s0 + 'ath': 9, # Atheros WiFi adapters : ath0 + 'wlan': 9, # Other WiFi adapters : wlan1 + 'wlp': 9, # Other WiFi adapters, Gentoo : wlp5s0 + 'teredo': 1, # miredo interface : teredo + 'lo': -10, # Loopback interface : lo + } + + _interface_start_re = re.compile(r'^([a-z]+?)(\d|$)') + + def _interface_key(interface): + match = _interface_start_re.match(interface) + if match: + try: + base = _interface_starts[match.group(1)] * 100 + except KeyError: + base = 500 + if match.group(2): + return base - int(match.group(2)) + else: + return base + else: + return 0 + + def internal_ip(pl, interface='detect', ipv=4): + if interface == 'detect': + try: + interface = next(iter(sorted(netifaces.interfaces(), key=_interface_key, reverse=True))) + except StopIteration: + pl.info('No network interfaces found') + return None + addrs = netifaces.ifaddresses(interface) + try: + return addrs[netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET][0]['addr'] + except (KeyError, IndexError): + return None + + +internal_ip = with_docstring(internal_ip, +'''Return internal IP address + +Requires ``netifaces`` module to work properly. + +:param str interface: + Interface on which IP will be checked. Use ``detect`` to automatically + detect interface. In this case interfaces with lower numbers will be + preferred over interfaces with similar names. Order of preference based on + names: + + #. ``eth`` and ``enp`` followed by number or the end of string. + #. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string. + #. ``teredo`` followed by number or the end of string. + #. Any other interface that is not ``lo*``. + #. ``lo`` followed by number or the end of string. +:param int ipv: + 4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you + need exactly. +''') + + +try: + import psutil + + def _get_bytes(interface): + try: + io_counters = psutil.net_io_counters(pernic=True) + except AttributeError: + io_counters = psutil.network_io_counters(pernic=True) + if_io = io_counters.get(interface) + if not if_io: + return None + return if_io.bytes_recv, if_io.bytes_sent + + def _get_interfaces(): + io_counters = psutil.network_io_counters(pernic=True) + for interface, data in io_counters.items(): + if data: + yield interface, data.bytes_recv, data.bytes_sent +except ImportError: + def _get_bytes(interface): + with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj: + rx = int(file_obj.read()) + with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj: + tx = int(file_obj.read()) + return (rx, tx) + + def _get_interfaces(): + for interface in os.listdir('/sys/class/net'): + x = _get_bytes(interface) + if x is not None: + yield interface, x[0], x[1] + + +class NetworkLoadSegment(KwThreadedSegment): + interfaces = {} + replace_num_pat = re.compile(r'[a-zA-Z]+') + + @staticmethod + def key(interface='detect', **kwargs): + return interface + + def compute_state(self, interface): + if interface == 'detect': + proc_exists = getattr(self, 'proc_exists', None) + if proc_exists is None: + proc_exists = self.proc_exists = os.path.exists('/proc/net/route') + if proc_exists: + # Look for default interface in routing table + with open('/proc/net/route', 'rb') as f: + for line in f.readlines(): + parts = line.split() + if len(parts) > 1: + iface, destination = parts[:2] + if not destination.replace(b'0', b''): + interface = iface.decode('utf-8') + break + if interface == 'detect': + # Choose interface with most total activity, excluding some + # well known interface names + interface, total = 'eth0', -1 + for name, rx, tx in _get_interfaces(): + base = self.replace_num_pat.match(name) + if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'): + continue + activity = rx + tx + if activity > total: + total = activity + interface = name + + try: + idata = self.interfaces[interface] + try: + idata['prev'] = idata['last'] + except KeyError: + pass + except KeyError: + idata = {} + if self.run_once: + idata['prev'] = (monotonic(), _get_bytes(interface)) + self.shutdown_event.wait(self.interval) + self.interfaces[interface] = idata + + idata['last'] = (monotonic(), _get_bytes(interface)) + return idata.copy() + + def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs): + if not idata or 'prev' not in idata: + return None + + t1, b1 = idata['prev'] + t2, b2 = idata['last'] + measure_interval = t2 - t1 + + if None in (b1, b2): + return None + + r = [] + for i, key in zip((0, 1), ('recv', 'sent')): + format = locals()[key + '_format'] + try: + value = (b2[i] - b1[i]) / measure_interval + except ZeroDivisionError: + self.warn('Measure interval zero.') + value = 0 + max_key = key + '_max' + is_gradient = max_key in kwargs + hl_groups = ['network_load_' + key, 'network_load'] + if is_gradient: + hl_groups[:0] = (group + '_gradient' for group in hl_groups) + r.append({ + 'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)), + 'divider_highlight_group': 'background:divider', + 'highlight_group': hl_groups, + }) + if is_gradient: + max = kwargs[max_key] + if value >= max: + r[-1]['gradient_level'] = 100 + else: + r[-1]['gradient_level'] = value * 100.0 / max + + return r + + +network_load = with_docstring(NetworkLoadSegment(), +'''Return the network load. + +Uses the ``psutil`` module if available for multi-platform compatibility, +falls back to reading +:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`. + +:param str interface: + network interface to measure (use the special value "detect" to have powerline try to auto-detect the network interface) +:param str suffix: + string appended to each load string +:param bool si_prefix: + use SI prefix, e.g. MB instead of MiB +:param str recv_format: + format string, receives ``value`` as argument +:param str sent_format: + format string, receives ``value`` as argument +:param float recv_max: + maximum number of received bytes per second. Is only used to compute + gradient level +:param float sent_max: + maximum number of sent bytes per second. Is only used to compute gradient + level + +Divider highlight group used: ``background:divider``. + +Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``. +''') diff --git a/powerline/segments/common/players.py b/powerline/segments/common/players.py new file mode 100644 index 00000000..589e419e --- /dev/null +++ b/powerline/segments/common/players.py @@ -0,0 +1,270 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys + +from powerline.lib.shell import asrun, run_cmd +from powerline.lib.unicode import out_u +from powerline.segments import Segment + + +STATE_SYMBOLS = { + 'fallback': '', + 'play': '>', + 'pause': '~', + 'stop': 'X', +} + + +class NowPlayingSegment(Segment): + def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', state_symbols=STATE_SYMBOLS, **kwargs): + player_func = getattr(self, 'player_{0}'.format(player)) + stats = { + 'state': 'fallback', + 'album': None, + 'artist': None, + 'title': None, + 'elapsed': None, + 'total': None, + } + func_stats = player_func(**kwargs) + if not func_stats: + return None + stats.update(func_stats) + stats['state_symbol'] = state_symbols.get(stats['state']) + return format.format(**stats) + + @staticmethod + def _convert_state(state): + state = state.lower() + if 'play' in state: + return 'play' + if 'pause' in state: + return 'pause' + if 'stop' in state: + return 'stop' + + @staticmethod + def _convert_seconds(seconds): + return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60)) + + def player_cmus(self, pl): + '''Return cmus player information. + + cmus-remote -Q returns data with multi-level information i.e. + status playing + file + tag artist + tag title + tag .. + tag n + set continue + set repeat + set .. + set n + + For the information we are looking for we don’t really care if we’re on + the tag level or the set level. The dictionary comprehension in this + method takes anything in ignore_levels and brings the key inside that + to the first level of the dictionary. + ''' + now_playing_str = run_cmd(pl, ['cmus-remote', '-Q']) + if not now_playing_str: + return + ignore_levels = ('tag', 'set',) + now_playing = dict(((token[0] if token[0] not in ignore_levels else token[1], + (' '.join(token[1:]) if token[0] not in ignore_levels else + ' '.join(token[2:]))) for token in [line.split(' ') for line in now_playing_str.split('\n')[:-1]])) + state = self._convert_state(now_playing.get('status')) + return { + 'state': state, + 'album': now_playing.get('album'), + 'artist': now_playing.get('artist'), + 'title': now_playing.get('title'), + 'elapsed': self._convert_seconds(now_playing.get('position', 0)), + 'total': self._convert_seconds(now_playing.get('duration', 0)), + } + + def player_mpd(self, pl, host='localhost', port=6600): + try: + import mpd + except ImportError: + now_playing = run_cmd(pl, ['mpc', 'current', '-f', '%album%\n%artist%\n%title%\n%time%', '-h', str(host), '-p', str(port)]) + if not now_playing: + return + now_playing = now_playing.split('\n') + return { + 'album': now_playing[0], + 'artist': now_playing[1], + 'title': now_playing[2], + 'total': now_playing[3], + } + else: + client = mpd.MPDClient() + client.connect(host, port) + now_playing = client.currentsong() + if not now_playing: + return + status = client.status() + client.close() + client.disconnect() + return { + 'state': status.get('state'), + 'album': now_playing.get('album'), + 'artist': now_playing.get('artist'), + 'title': now_playing.get('title'), + 'elapsed': self._convert_seconds(now_playing.get('elapsed', 0)), + 'total': self._convert_seconds(now_playing.get('time', 0)), + } + + def player_dbus(self, player_name, bus_name, player_path, iface_prop, iface_player): + try: + import dbus + except ImportError: + self.exception('Could not add {0} segment: requires dbus module', player_name) + return + bus = dbus.SessionBus() + try: + player = bus.get_object(bus_name, player_path) + iface = dbus.Interface(player, iface_prop) + info = iface.Get(iface_player, 'Metadata') + status = iface.Get(iface_player, 'PlaybackStatus') + except dbus.exceptions.DBusException: + return + if not info: + return + album = out_u(info.get('xesam:album')) + title = out_u(info.get('xesam:title')) + artist = info.get('xesam:artist') + state = self._convert_state(status) + if artist: + artist = out_u(artist[0]) + return { + 'state': state, + 'album': album, + 'artist': artist, + 'title': title, + 'total': self._convert_seconds(info.get('mpris:length') / 1e6), + } + + def player_spotify_dbus(self, pl): + return self.player_dbus( + player_name='Spotify', + bus_name='com.spotify.qt', + player_path='/', + iface_prop='org.freedesktop.DBus.Properties', + iface_player='org.freedesktop.MediaPlayer2', + ) + + def player_clementine(self, pl): + return self.player_dbus( + player_name='Clementine', + bus_name='org.mpris.MediaPlayer2.clementine', + player_path='/org/mpris/MediaPlayer2', + iface_prop='org.freedesktop.DBus.Properties', + iface_player='org.mpris.MediaPlayer2.Player', + ) + + def player_spotify_apple_script(self, pl): + status_delimiter = '-~`/=' + ascript = ''' + tell application "System Events" + set process_list to (name of every process) + end tell + + if process_list contains "Spotify" then + tell application "Spotify" + if player state is playing or player state is paused then + set track_name to name of current track + set artist_name to artist of current track + set album_name to album of current track + set track_length to duration of current track + set now_playing to "" & player state & "{0}" & album_name & "{0}" & artist_name & "{0}" & track_name & "{0}" & track_length + return now_playing + else + return player state + end if + + end tell + else + return "stopped" + end if + '''.format(status_delimiter) + + spotify = asrun(pl, ascript) + if not asrun: + return None + + spotify_status = spotify.split(status_delimiter) + state = self._convert_state(spotify_status[0]) + if state == 'stop': + return None + return { + 'state': state, + 'album': spotify_status[1], + 'artist': spotify_status[2], + 'title': spotify_status[3], + 'total': self._convert_seconds(int(spotify_status[4])) + } + + try: + __import__('dbus') + except ImportError: + if sys.platform.startswith('darwin'): + player_spotify = player_spotify_apple_script + else: + player_spotify = player_spotify_dbus + else: + player_spotify = player_spotify_dbus + + def player_rhythmbox(self, pl): + now_playing = run_cmd(pl, ['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td']) + if not now_playing: + return + now_playing = now_playing.split('\n') + return { + 'album': now_playing[0], + 'artist': now_playing[1], + 'title': now_playing[2], + 'elapsed': now_playing[3], + 'total': now_playing[4], + } + + def player_rdio(self, pl): + status_delimiter = '-~`/=' + ascript = ''' + tell application "System Events" + set rdio_active to the count(every process whose name is "Rdio") + if rdio_active is 0 then + return + end if + end tell + tell application "Rdio" + set rdio_name to the name of the current track + set rdio_artist to the artist of the current track + set rdio_album to the album of the current track + set rdio_duration to the duration of the current track + set rdio_state to the player state + set rdio_elapsed to the player position + return rdio_name & "{0}" & rdio_artist & "{0}" & rdio_album & "{0}" & rdio_elapsed & "{0}" & rdio_duration & "{0}" & rdio_state + end tell + '''.format(status_delimiter) + now_playing = asrun(pl, ascript) + if not now_playing: + return + now_playing = now_playing.split('\n') + if len(now_playing) != 6: + return + state = self._convert_state(now_playing[5]) + total = self._convert_seconds(now_playing[4]) + elapsed = self._convert_seconds(float(now_playing[3]) * float(now_playing[4]) / 100) + return { + 'title': now_playing[0], + 'artist': now_playing[1], + 'album': now_playing[2], + 'elapsed': elapsed, + 'total': total, + 'state': state, + 'state_symbol': self.STATE_SYMBOLS.get(state) + } +now_playing = NowPlayingSegment() diff --git a/powerline/segments/common/sys.py b/powerline/segments/common/sys.py new file mode 100644 index 00000000..d3a56307 --- /dev/null +++ b/powerline/segments/common/sys.py @@ -0,0 +1,174 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os + +from multiprocessing import cpu_count as _cpu_count + +from powerline.lib.threaded import ThreadedSegment +from powerline.lib import add_divider_highlight_group +from powerline.segments import with_docstring + + +cpu_count = None + + +def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2, track_cpu_count=False): + '''Return system load average. + + Highlights using ``system_load_good``, ``system_load_bad`` and + ``system_load_ugly`` highlighting groups, depending on the thresholds + passed to the function. + + :param str format: + format string, receives ``avg`` as an argument + :param float threshold_good: + threshold for gradient level 0: any normalized load average below this + value will have this gradient level. + :param float threshold_bad: + threshold for gradient level 100: any normalized load average above this + value will have this gradient level. Load averages between + ``threshold_good`` and ``threshold_bad`` receive gradient level that + indicates relative position in this interval: + (``100 * (cur-good) / (bad-good)``). + Note: both parameters are checked against normalized load averages. + :param bool track_cpu_count: + if True powerline will continuously poll the system to detect changes + in the number of CPUs. + + Divider highlight group used: ``background:divider``. + + Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``. + ''' + global cpu_count + try: + cpu_num = cpu_count = _cpu_count() if cpu_count is None or track_cpu_count else cpu_count + except NotImplementedError: + pl.warn('Unable to get CPU count: method is not implemented') + return None + ret = [] + for avg in os.getloadavg(): + normalized = avg / cpu_num + if normalized < threshold_good: + gradient_level = 0 + elif normalized < threshold_bad: + gradient_level = (normalized - threshold_good) * 100.0 / (threshold_bad - threshold_good) + else: + gradient_level = 100 + ret.append({ + 'contents': format.format(avg=avg), + 'highlight_group': ['system_load_gradient', 'system_load'], + 'divider_highlight_group': 'background:divider', + 'gradient_level': gradient_level, + }) + ret[0]['contents'] += ' ' + ret[1]['contents'] += ' ' + return ret + + +try: + import psutil + + class CPULoadPercentSegment(ThreadedSegment): + interval = 1 + + def update(self, old_cpu): + return psutil.cpu_percent(interval=None) + + def run(self): + while not self.shutdown_event.is_set(): + try: + self.update_value = psutil.cpu_percent(interval=self.interval) + except Exception as e: + self.exception('Exception while calculating cpu_percent: {0}', str(e)) + + def render(self, cpu_percent, format='{0:.0f}%', **kwargs): + if not cpu_percent: + return None + return [{ + 'contents': format.format(cpu_percent), + 'gradient_level': cpu_percent, + 'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'], + }] +except ImportError: + class CPULoadPercentSegment(ThreadedSegment): + interval = 1 + + @staticmethod + def startup(**kwargs): + pass + + @staticmethod + def start(): + pass + + @staticmethod + def shutdown(): + pass + + @staticmethod + def render(cpu_percent, pl, format='{0:.0f}%', **kwargs): + pl.warn('Module “psutil” is not installed, thus CPU load is not available') + return None + + +cpu_load_percent = with_docstring(CPULoadPercentSegment(), +'''Return the average CPU load as a percentage. + +Requires the ``psutil`` module. + +:param str format: + Output format. Accepts measured CPU load as the first argument. + +Highlight groups used: ``cpu_load_percent_gradient`` (gradient) or ``cpu_load_percent``. +''') + + +if os.path.exists('/proc/uptime'): + def _get_uptime(): + with open('/proc/uptime', 'r') as f: + return int(float(f.readline().split()[0])) +elif 'psutil' in globals(): + from time import time + + def _get_uptime(): + # psutil.BOOT_TIME is not subject to clock adjustments, but time() is. + # Thus it is a fallback to /proc/uptime reading and not the reverse. + return int(time() - psutil.BOOT_TIME) +else: + def _get_uptime(): + raise NotImplementedError + + +@add_divider_highlight_group('background:divider') +def uptime(pl, days_format='{days:d}d', hours_format=' {hours:d}h', minutes_format=' {minutes:d}m', seconds_format=' {seconds:d}s', shorten_len=3): + '''Return system uptime. + + :param str days_format: + day format string, will be passed ``days`` as the argument + :param str hours_format: + hour format string, will be passed ``hours`` as the argument + :param str minutes_format: + minute format string, will be passed ``minutes`` as the argument + :param str seconds_format: + second format string, will be passed ``seconds`` as the argument + :param int shorten_len: + shorten the amount of units (days, hours, etc.) displayed + + Divider highlight group used: ``background:divider``. + ''' + try: + seconds = _get_uptime() + except NotImplementedError: + pl.warn('Unable to get uptime. You should install psutil module') + return None + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + time_formatted = list(filter(None, [ + days_format.format(days=days) if days and days_format else None, + hours_format.format(hours=hours) if hours and hours_format else None, + minutes_format.format(minutes=minutes) if minutes and minutes_format else None, + seconds_format.format(seconds=seconds) if seconds and seconds_format else None, + ]))[0:shorten_len] + return ''.join(time_formatted).strip() diff --git a/powerline/segments/common/time.py b/powerline/segments/common/time.py new file mode 100644 index 00000000..f13e196e --- /dev/null +++ b/powerline/segments/common/time.py @@ -0,0 +1,89 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +from datetime import datetime + + +def date(pl, format='%Y-%m-%d', istime=False): + '''Return the current date. + + :param str format: + strftime-style date format string + :param bool istime: + If true then segment uses ``time`` highlight group. + + Divider highlight group used: ``time:divider``. + + Highlight groups used: ``time`` or ``date``. + ''' + return [{ + 'contents': datetime.now().strftime(format), + 'highlight_group': (['time'] if istime else []) + ['date'], + 'divider_highlight_group': 'time:divider' if istime else None, + }] + + +UNICODE_TEXT_TRANSLATION = { + ord('\''): '’', + ord('-'): '‐', +} + + +def fuzzy_time(pl, unicode_text=False): + '''Display the current time as fuzzy time, e.g. "quarter past six". + + :param bool unicode_text: + If true then hyphenminuses (regular ASCII ``-``) and single quotes are + replaced with unicode dashes and apostrophes. + ''' + hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven'] + minute_str = { + 5: 'five past', + 10: 'ten past', + 15: 'quarter past', + 20: 'twenty past', + 25: 'twenty-five past', + 30: 'half past', + 35: 'twenty-five to', + 40: 'twenty to', + 45: 'quarter to', + 50: 'ten to', + 55: 'five to', + } + special_case_str = { + (23, 58): 'round about midnight', + (23, 59): 'round about midnight', + (0, 0): 'midnight', + (0, 1): 'round about midnight', + (0, 2): 'round about midnight', + (12, 0): 'noon', + } + + now = datetime.now() + + try: + return special_case_str[(now.hour, now.minute)] + except KeyError: + pass + + hour = now.hour + if now.minute > 32: + if hour == 23: + hour = 0 + else: + hour += 1 + if hour > 11: + hour = hour - 12 + hour = hour_str[hour] + + minute = int(round(now.minute / 5.0) * 5) + if minute == 60 or minute == 0: + result = ' '.join([hour, 'o\'clock']) + else: + minute = minute_str[minute] + result = ' '.join([minute, hour]) + + if unicode_text: + result = result.translate(UNICODE_TEXT_TRANSLATION) + + return result diff --git a/powerline/segments/common/vcs.py b/powerline/segments/common/vcs.py new file mode 100644 index 00000000..a6311bc5 --- /dev/null +++ b/powerline/segments/common/vcs.py @@ -0,0 +1,33 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +from powerline.lib.vcs import guess, tree_status +from powerline.theme import requires_segment_info, requires_filesystem_watcher + + +@requires_filesystem_watcher +@requires_segment_info +def branch(pl, segment_info, create_watcher, status_colors=False): + '''Return the current VCS branch. + + :param bool status_colors: + determines whether repository status will be used to determine highlighting. Default: False. + + Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``. + ''' + name = segment_info['getcwd']() + repo = guess(path=name, create_watcher=create_watcher) + if repo is not None: + branch = repo.branch() + scol = ['branch'] + if status_colors: + try: + status = tree_status(repo, pl) + except Exception as e: + pl.exception('Failed to compute tree status: {0}', str(e)) + status = '?' + scol.insert(0, 'branch_dirty' if status and status.strip() else 'branch_clean') + return [{ + 'contents': branch, + 'highlight_group': scol, + }] diff --git a/powerline/segments/common/wthr.py b/powerline/segments/common/wthr.py new file mode 100644 index 00000000..8819806f --- /dev/null +++ b/powerline/segments/common/wthr.py @@ -0,0 +1,229 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import json + +from powerline.lib.url import urllib_read, urllib_urlencode +from powerline.lib.threaded import KwThreadedSegment +from powerline.segments import with_docstring + + +# XXX Warning: module name must not be equal to the segment name as long as this +# segment is imported into powerline.segments.common module. + + +# Weather condition code descriptions available at +# http://developer.yahoo.com/weather/#codes +weather_conditions_codes = ( + ('tornado', 'stormy'), # 0 + ('tropical_storm', 'stormy'), # 1 + ('hurricane', 'stormy'), # 2 + ('severe_thunderstorms', 'stormy'), # 3 + ('thunderstorms', 'stormy'), # 4 + ('mixed_rain_and_snow', 'rainy' ), # 5 + ('mixed_rain_and_sleet', 'rainy' ), # 6 + ('mixed_snow_and_sleet', 'snowy' ), # 7 + ('freezing_drizzle', 'rainy' ), # 8 + ('drizzle', 'rainy' ), # 9 + ('freezing_rain', 'rainy' ), # 10 + ('showers', 'rainy' ), # 11 + ('showers', 'rainy' ), # 12 + ('snow_flurries', 'snowy' ), # 13 + ('light_snow_showers', 'snowy' ), # 14 + ('blowing_snow', 'snowy' ), # 15 + ('snow', 'snowy' ), # 16 + ('hail', 'snowy' ), # 17 + ('sleet', 'snowy' ), # 18 + ('dust', 'foggy' ), # 19 + ('fog', 'foggy' ), # 20 + ('haze', 'foggy' ), # 21 + ('smoky', 'foggy' ), # 22 + ('blustery', 'foggy' ), # 23 + ('windy', ), # 24 + ('cold', 'day' ), # 25 + ('clouds', 'cloudy'), # 26 + ('mostly_cloudy_night', 'cloudy'), # 27 + ('mostly_cloudy_day', 'cloudy'), # 28 + ('partly_cloudy_night', 'cloudy'), # 29 + ('partly_cloudy_day', 'cloudy'), # 30 + ('clear_night', 'night' ), # 31 + ('sun', 'sunny' ), # 32 + ('fair_night', 'night' ), # 33 + ('fair_day', 'day' ), # 34 + ('mixed_rain_and_hail', 'rainy' ), # 35 + ('hot', 'sunny' ), # 36 + ('isolated_thunderstorms', 'stormy'), # 37 + ('scattered_thunderstorms', 'stormy'), # 38 + ('scattered_thunderstorms', 'stormy'), # 39 + ('scattered_showers', 'rainy' ), # 40 + ('heavy_snow', 'snowy' ), # 41 + ('scattered_snow_showers', 'snowy' ), # 42 + ('heavy_snow', 'snowy' ), # 43 + ('partly_cloudy', 'cloudy'), # 44 + ('thundershowers', 'rainy' ), # 45 + ('snow_showers', 'snowy' ), # 46 + ('isolated_thundershowers', 'rainy' ), # 47 +) +# ('day', (25, 34)), +# ('rainy', (5, 6, 8, 9, 10, 11, 12, 35, 40, 45, 47)), +# ('cloudy', (26, 27, 28, 29, 30, 44)), +# ('snowy', (7, 13, 14, 15, 16, 17, 18, 41, 42, 43, 46)), +# ('stormy', (0, 1, 2, 3, 4, 37, 38, 39)), +# ('foggy', (19, 20, 21, 22, 23)), +# ('sunny', (32, 36)), +# ('night', (31, 33))): +weather_conditions_icons = { + 'day': 'DAY', + 'blustery': 'WIND', + 'rainy': 'RAIN', + 'cloudy': 'CLOUDS', + 'snowy': 'SNOW', + 'stormy': 'STORM', + 'foggy': 'FOG', + 'sunny': 'SUN', + 'night': 'NIGHT', + 'windy': 'WINDY', + 'not_available': 'NA', + 'unknown': 'UKN', +} + +temp_conversions = { + 'C': lambda temp: temp, + 'F': lambda temp: (temp * 9 / 5) + 32, + 'K': lambda temp: temp + 273.15, +} + +# Note: there are also unicode characters for units: ℃, ℉ and K +temp_units = { + 'C': '°C', + 'F': '°F', + 'K': 'K', +} + + +class WeatherSegment(KwThreadedSegment): + interval = 600 + default_location = None + location_urls = {} + + @staticmethod + def key(location_query=None, **kwargs): + return location_query + + def get_request_url(self, location_query): + try: + return self.location_urls[location_query] + except KeyError: + if location_query is None: + location_data = json.loads(urllib_read('http://freegeoip.net/json/')) + location = ','.join(( + location_data['city'], + location_data['region_name'], + location_data['country_code'] + )) + self.info('Location returned by freegeoip is {0}', location) + else: + location = location_query + query_data = { + 'q': + 'use "https://raw.githubusercontent.com/yql/yql-tables/master/weather/weather.bylocation.xml" as we;' + 'select * from we where location="{0}" and unit="c"'.format(location).encode('utf-8'), + 'format': 'json', + } + self.location_urls[location_query] = url = ( + 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)) + return url + + def compute_state(self, location_query): + url = self.get_request_url(location_query) + raw_response = urllib_read(url) + if not raw_response: + self.error('Failed to get response') + return + response = json.loads(raw_response) + condition = response['query']['results']['weather']['rss']['channel']['item']['condition'] + condition_code = int(condition['code']) + temp = float(condition['temp']) + + try: + icon_names = weather_conditions_codes[condition_code] + except IndexError: + if condition_code == 3200: + icon_names = ('not_available',) + self.warn('Weather is not available for location {0}', self.location) + else: + icon_names = ('unknown',) + self.error('Unknown condition code: {0}', condition_code) + + return (temp, icon_names) + + def render_one(self, weather, icons=None, unit='C', temp_format=None, temp_coldest=-30, temp_hottest=40, **kwargs): + if not weather: + return None + + temp, icon_names = weather + + for icon_name in icon_names: + if icons: + if icon_name in icons: + icon = icons[icon_name] + break + else: + icon = weather_conditions_icons[icon_names[-1]] + + temp_format = temp_format or ('{temp:.0f}' + temp_units[unit]) + converted_temp = temp_conversions[unit](temp) + if temp <= temp_coldest: + gradient_level = 0 + elif temp >= temp_hottest: + gradient_level = 100 + else: + gradient_level = (temp - temp_coldest) * 100.0 / (temp_hottest - temp_coldest) + groups = ['weather_condition_' + icon_name for icon_name in icon_names] + ['weather_conditions', 'weather'] + return [ + { + 'contents': icon + ' ', + 'highlight_group': groups, + 'divider_highlight_group': 'background:divider', + }, + { + 'contents': temp_format.format(temp=converted_temp), + 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], + 'divider_highlight_group': 'background:divider', + 'gradient_level': gradient_level, + }, + ] + + +weather = with_docstring(WeatherSegment(), +'''Return weather from Yahoo! Weather. + +Uses GeoIP lookup from http://freegeoip.net/ to automatically determine +your current location. This should be changed if you’re in a VPN or if your +IP address is registered at another location. + +Returns a list of colorized icon and temperature segments depending on +weather conditions. + +:param str unit: + temperature unit, can be one of ``F``, ``C`` or ``K`` +:param str location_query: + location query for your current location, e.g. ``oslo, norway`` +:param dict icons: + dict for overriding default icons, e.g. ``{'heavy_snow' : u'❆'}`` +:param str temp_format: + format string, receives ``temp`` as an argument. Should also hold unit. +:param float temp_coldest: + coldest temperature. Any temperature below it will have gradient level equal + to zero. +:param float temp_hottest: + hottest temperature. Any temperature above it will have gradient level equal + to 100. Temperatures between ``temp_coldest`` and ``temp_hottest`` receive + gradient level that indicates relative position in this interval + (``100 * (cur-coldest) / (hottest-coldest)``). + +Divider highlight group used: ``background:divider``. + +Highlight groups used: ``weather_conditions`` or ``weather``, ``weather_temp_gradient`` (gradient) or ``weather``. +Also uses ``weather_conditions_{condition}`` for all weather conditions supported by Yahoo. +''') diff --git a/powerline/segments/shell.py b/powerline/segments/shell.py index 6572483f..1b763723 100644 --- a/powerline/segments/shell.py +++ b/powerline/segments/shell.py @@ -3,7 +3,7 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct from powerline.theme import requires_segment_info from powerline.segments import with_docstring -from powerline.segments.common import CwdSegment +from powerline.segments.common.env import CwdSegment from powerline.lib.unicode import out_u diff --git a/tests/test_segments.py b/tests/test_segments.py index 7de3b611..4f82e810 100644 --- a/tests/test_segments.py +++ b/tests/test_segments.py @@ -273,12 +273,6 @@ class TestShell(TestCase): cwd[0] = ValueError() self.assertRaises(ValueError, shell.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2) - def test_date(self): - pl = Pl() - with replace_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): - self.assertEqual(common.date(pl=pl), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) - self.assertEqual(common.date(pl=pl, format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) - class TestTmux(TestCase): def test_attached_clients(self): @@ -295,26 +289,155 @@ class TestTmux(TestCase): class TestCommon(TestCase): + @classmethod + def setUpClass(cls): + module = __import__(str('powerline.segments.common.{0}'.format(cls.module_name))) + cls.module = getattr(module.segments.common, str(cls.module_name)) + + +class TestNet(TestCommon): + module_name = 'net' + def test_hostname(self): pl = Pl() with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as segment_info: - with replace_module_module(common, 'socket', gethostname=lambda: 'abc'): + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc'): self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), 'abc') - with replace_module_module(common, 'socket', gethostname=lambda: 'abc.mydomain'): + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc.mydomain'): self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc.mydomain') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, exclude_domain=True), 'abc') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), 'abc.mydomain') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True, exclude_domain=True), 'abc') segment_info['environ'].pop('SSH_CLIENT') - with replace_module_module(common, 'socket', gethostname=lambda: 'abc'): + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc'): self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), None) - with replace_module_module(common, 'socket', gethostname=lambda: 'abc.mydomain'): + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc.mydomain'): self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc.mydomain') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, exclude_domain=True), 'abc') self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True, exclude_domain=True), None) + def test_external_ip(self): + pl = Pl() + with replace_attr(self.module, 'urllib_read', urllib_read): + self.assertEqual(common.external_ip(pl=pl), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) + + def test_internal_ip(self): + try: + import netifaces + except ImportError: + raise SkipTest('netifaces module is not available') + pl = Pl() + addr = { + 'enp2s0': { + netifaces.AF_INET: [{'addr': '192.168.100.200'}], + netifaces.AF_INET6: [{'addr': 'feff::5446:5eff:fe5a:7777%enp2s0'}] + }, + 'lo': { + netifaces.AF_INET: [{'addr': '127.0.0.1'}], + netifaces.AF_INET6: [{'addr': '::1'}] + }, + 'teredo': { + netifaces.AF_INET6: [{'addr': 'feff::5446:5eff:fe5a:7777'}] + }, + } + interfaces = ['lo', 'enp2s0', 'teredo'] + with replace_module_module( + self.module, 'netifaces', + interfaces=(lambda: interfaces), + ifaddresses=(lambda interface: addr[interface]), + AF_INET=netifaces.AF_INET, + AF_INET6=netifaces.AF_INET6, + ): + self.assertEqual(common.internal_ip(pl=pl), '192.168.100.200') + self.assertEqual(common.internal_ip(pl=pl, interface='detect'), '192.168.100.200') + self.assertEqual(common.internal_ip(pl=pl, interface='lo'), '127.0.0.1') + self.assertEqual(common.internal_ip(pl=pl, interface='teredo'), None) + self.assertEqual(common.internal_ip(pl=pl, ipv=4), '192.168.100.200') + self.assertEqual(common.internal_ip(pl=pl, interface='detect', ipv=4), '192.168.100.200') + self.assertEqual(common.internal_ip(pl=pl, interface='lo', ipv=4), '127.0.0.1') + self.assertEqual(common.internal_ip(pl=pl, interface='teredo', ipv=4), None) + self.assertEqual(common.internal_ip(pl=pl, ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') + self.assertEqual(common.internal_ip(pl=pl, interface='detect', ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') + self.assertEqual(common.internal_ip(pl=pl, interface='lo', ipv=6), '::1') + self.assertEqual(common.internal_ip(pl=pl, interface='teredo', ipv=6), 'feff::5446:5eff:fe5a:7777') + interfaces[1:2] = () + self.assertEqual(common.internal_ip(pl=pl, ipv=6), 'feff::5446:5eff:fe5a:7777') + interfaces[1:2] = () + self.assertEqual(common.internal_ip(pl=pl, ipv=6), '::1') + interfaces[:] = () + self.assertEqual(common.internal_ip(pl=pl, ipv=6), None) + + def test_network_load(self): + from time import sleep + + def gb(interface): + return None + + f = [gb] + + def _get_bytes(interface): + return f[0](interface) + + pl = Pl() + + with replace_attr(self.module, '_get_bytes', _get_bytes): + common.network_load.startup(pl=pl) + try: + self.assertEqual(common.network_load(pl=pl, interface='eth0'), None) + sleep(common.network_load.interval) + self.assertEqual(common.network_load(pl=pl, interface='eth0'), None) + while 'prev' not in common.network_load.interfaces.get('eth0', {}): + sleep(0.1) + self.assertEqual(common.network_load(pl=pl, interface='eth0'), None) + + l = [0, 0] + + def gb2(interface): + l[0] += 1200 + l[1] += 2400 + return tuple(l) + f[0] = gb2 + + while not common.network_load.interfaces.get('eth0', {}).get('prev', (None, None))[1]: + sleep(0.1) + self.assertEqual(common.network_load(pl=pl, interface='eth0'), [ + {'divider_highlight_group': 'background:divider', 'contents': 'DL 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'background:divider', 'contents': 'UL 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(common.network_load(pl=pl, interface='eth0', recv_format='r {value}', sent_format='s {value}'), [ + {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps', interface='eth0'), [ + {'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True, interface='eth0'), [ + {'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0, interface='eth0'), [ + {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100}, + {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, + ]) + + class ApproxEqual(object): + def __eq__(self, i): + return abs(i - 50.0) < 1 + + self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800, interface='eth0'), [ + {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()}, + ]) + finally: + common.network_load.shutdown() + + +class TestEnv(TestCommon): + module_name = 'env' + def test_user(self): new_os = new_module('os', getpid=lambda: 1) @@ -332,9 +455,9 @@ class TestCommon(TestCase): pl = Pl() with replace_env('USER', 'def') as segment_info: common.username = False - with replace_attr(common, 'os', new_os): - with replace_attr(common, 'psutil', new_psutil): - with replace_attr(common, '_geteuid', lambda: 5): + with replace_attr(self.module, 'os', new_os): + with replace_attr(self.module, 'psutil', new_psutil): + with replace_attr(self.module, '_geteuid', lambda: 5): self.assertEqual(common.user(pl=pl, segment_info=segment_info), [ {'contents': 'def', 'highlight_group': ['user']} ]) @@ -342,38 +465,11 @@ class TestCommon(TestCase): {'contents': 'def', 'highlight_group': ['user']} ]) self.assertEqual(common.user(pl=pl, segment_info=segment_info, hide_user='def'), None) - with replace_attr(common, '_geteuid', lambda: 0): + with replace_attr(self.module, '_geteuid', lambda: 0): self.assertEqual(common.user(pl=pl, segment_info=segment_info), [ {'contents': 'def', 'highlight_group': ['superuser', 'user']} ]) - def test_branch(self): - pl = Pl() - create_watcher = get_fallback_create_watcher() - segment_info = {'getcwd': os.getcwd} - branch = partial(common.branch, pl=pl, create_watcher=create_watcher) - with replace_attr(common, 'guess', get_dummy_guess(status=lambda: None, directory='/tmp/tests')): - with replace_attr(common, 'tree_status', lambda repo, pl: None): - self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ - {'highlight_group': ['branch'], 'contents': 'tests'} - ]) - self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ - {'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']} - ]) - with replace_attr(common, 'guess', get_dummy_guess(status=lambda: 'D ', directory='/tmp/tests')): - with replace_attr(common, 'tree_status', lambda repo, pl: 'D '): - self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ - {'highlight_group': ['branch'], 'contents': 'tests'} - ]) - self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ - {'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']} - ]) - self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ - {'highlight_group': ['branch'], 'contents': 'tests'} - ]) - with replace_attr(common, 'guess', lambda path, create_watcher: None): - self.assertEqual(branch(segment_info=segment_info, status_colors=False), None) - def test_cwd(self): new_os = new_module('os', path=os.path, sep='/') pl = Pl() @@ -387,7 +483,7 @@ class TestCommon(TestCase): return wd segment_info = {'getcwd': getcwd, 'home': None} - with replace_attr(common, 'os', new_os): + with replace_attr(self.module, 'os', new_os): cwd[0] = '/abc/def/ghi/foo/bar' self.assertEqual(common.cwd(pl=pl, segment_info=segment_info), [ {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, @@ -473,16 +569,67 @@ class TestCommon(TestCase): cwd[0] = ValueError() self.assertRaises(ValueError, common.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2) + def test_virtualenv(self): + pl = Pl() + with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as segment_info: + self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), 'ghi') + segment_info['environ'].pop('VIRTUAL_ENV') + self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), None) + + def test_environment(self): + pl = Pl() + variable = 'FOO' + value = 'bar' + with replace_env(variable, value) as segment_info: + self.assertEqual(common.environment(pl=pl, segment_info=segment_info, variable=variable), value) + segment_info['environ'].pop(variable) + self.assertEqual(common.environment(pl=pl, segment_info=segment_info, variable=variable), None) + + +class TestVcs(TestCommon): + module_name = 'vcs' + + def test_branch(self): + pl = Pl() + create_watcher = get_fallback_create_watcher() + segment_info = {'getcwd': os.getcwd} + branch = partial(common.branch, pl=pl, create_watcher=create_watcher) + with replace_attr(self.module, 'guess', get_dummy_guess(status=lambda: None, directory='/tmp/tests')): + with replace_attr(self.module, 'tree_status', lambda repo, pl: None): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ + {'highlight_group': ['branch'], 'contents': 'tests'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ + {'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']} + ]) + with replace_attr(self.module, 'guess', get_dummy_guess(status=lambda: 'D ', directory='/tmp/tests')): + with replace_attr(self.module, 'tree_status', lambda repo, pl: 'D '): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ + {'highlight_group': ['branch'], 'contents': 'tests'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ + {'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ + {'highlight_group': ['branch'], 'contents': 'tests'} + ]) + with replace_attr(self.module, 'guess', lambda path, create_watcher: None): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), None) + + +class TestTime(TestCommon): + module_name = 'time' + def test_date(self): pl = Pl() - with replace_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): + with replace_attr(self.module, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): self.assertEqual(common.date(pl=pl), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) self.assertEqual(common.date(pl=pl, format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) def test_fuzzy_time(self): time = Args(hour=0, minute=45) pl = Pl() - with replace_attr(common, 'datetime', Args(now=lambda: time)): + with replace_attr(self.module, 'datetime', Args(now=lambda: time)): self.assertEqual(common.fuzzy_time(pl=pl), 'quarter to one') time.hour = 23 time.minute = 59 @@ -500,19 +647,18 @@ class TestCommon(TestCase): time.minute = 60 self.assertEqual(common.fuzzy_time(pl=pl, unicode_text=True), 'twelve o’clock') - def test_external_ip(self): - pl = Pl() - with replace_attr(common, 'urllib_read', urllib_read): - self.assertEqual(common.external_ip(pl=pl), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) + +class TestSys(TestCommon): + module_name = 'sys' def test_uptime(self): pl = Pl() - with replace_attr(common, '_get_uptime', lambda: 259200): + with replace_attr(self.module, '_get_uptime', lambda: 259200): self.assertEqual(common.uptime(pl=pl), [{'contents': '3d', 'divider_highlight_group': 'background:divider'}]) - with replace_attr(common, '_get_uptime', lambda: 93784): + with replace_attr(self.module, '_get_uptime', lambda: 93784): self.assertEqual(common.uptime(pl=pl), [{'contents': '1d 2h 3m', 'divider_highlight_group': 'background:divider'}]) self.assertEqual(common.uptime(pl=pl, shorten_len=4), [{'contents': '1d 2h 3m 4s', 'divider_highlight_group': 'background:divider'}]) - with replace_attr(common, '_get_uptime', lambda: 65536): + with replace_attr(self.module, '_get_uptime', lambda: 65536): self.assertEqual(common.uptime(pl=pl), [{'contents': '18h 12m 16s', 'divider_highlight_group': 'background:divider'}]) self.assertEqual(common.uptime(pl=pl, shorten_len=2), [{'contents': '18h 12m', 'divider_highlight_group': 'background:divider'}]) self.assertEqual(common.uptime(pl=pl, shorten_len=1), [{'contents': '18h', 'divider_highlight_group': 'background:divider'}]) @@ -520,12 +666,45 @@ class TestCommon(TestCase): def _get_uptime(): raise NotImplementedError - with replace_attr(common, '_get_uptime', _get_uptime): + with replace_attr(self.module, '_get_uptime', _get_uptime): self.assertEqual(common.uptime(pl=pl), None) + def test_system_load(self): + pl = Pl() + with replace_module_module(self.module, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): + with replace_attr(self.module, '_cpu_count', lambda: 2): + self.assertEqual(common.system_load(pl=pl), [ + {'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + {'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}, + {'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 0} + ]) + self.assertEqual(common.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1), [ + {'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + {'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + {'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0} + ]) + + def test_cpu_load_percent(self): + pl = Pl() + with replace_module_module(self.module, 'psutil', cpu_percent=lambda **kwargs: 52.3): + self.assertEqual(common.cpu_load_percent(pl=pl), [{ + 'contents': '52%', + 'gradient_level': 52.3, + 'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'], + }]) + self.assertEqual(common.cpu_load_percent(pl=pl, format='{0:.1f}%'), [{ + 'contents': '52.3%', + 'gradient_level': 52.3, + 'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'], + }]) + + +class TestWthr(TestCommon): + module_name = 'wthr' + def test_weather(self): pl = Pl() - with replace_attr(common, 'urllib_read', urllib_read): + with replace_attr(self.module, 'urllib_read', urllib_read): self.assertEqual(common.weather(pl=pl), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'CLOUDS '}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} @@ -558,7 +737,7 @@ class TestCommon(TestCase): {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'CLOUDS '}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0} ]) - with replace_attr(common, 'urllib_read', urllib_read): + with replace_attr(self.module, 'urllib_read', urllib_read): common.weather.startup(pl=pl, location_query='Meppen,06,DE') self.assertEqual(common.weather(pl=pl), [ {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'CLOUDS '}, @@ -570,131 +749,33 @@ class TestCommon(TestCase): ]) common.weather.shutdown() - def test_system_load(self): - pl = Pl() - with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): - with replace_attr(common, '_cpu_count', lambda: 2): - self.assertEqual(common.system_load(pl=pl), [ - {'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, - {'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}, - {'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 0} - ]) - self.assertEqual(common.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1), [ - {'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, - {'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, - {'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0} - ]) - def test_cpu_load_percent(self): - pl = Pl() - with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3): - self.assertEqual(common.cpu_load_percent(pl=pl), [{ - 'contents': '52%', - 'gradient_level': 52.3, - 'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'], - }]) - self.assertEqual(common.cpu_load_percent(pl=pl, format='{0:.1f}%'), [{ - 'contents': '52.3%', - 'gradient_level': 52.3, - 'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'], - }]) - - def test_network_load(self): - from time import sleep - - def gb(interface): - return None - - f = [gb] - - def _get_bytes(interface): - return f[0](interface) - - pl = Pl() - - with replace_attr(common, '_get_bytes', _get_bytes): - common.network_load.startup(pl=pl) - try: - self.assertEqual(common.network_load(pl=pl, interface='eth0'), None) - sleep(common.network_load.interval) - self.assertEqual(common.network_load(pl=pl, interface='eth0'), None) - while 'prev' not in common.network_load.interfaces.get('eth0', {}): - sleep(0.1) - self.assertEqual(common.network_load(pl=pl, interface='eth0'), None) - - l = [0, 0] - - def gb2(interface): - l[0] += 1200 - l[1] += 2400 - return tuple(l) - f[0] = gb2 - - while not common.network_load.interfaces.get('eth0', {}).get('prev', (None, None))[1]: - sleep(0.1) - self.assertEqual(common.network_load(pl=pl, interface='eth0'), [ - {'divider_highlight_group': 'background:divider', 'contents': 'DL 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, - {'divider_highlight_group': 'background:divider', 'contents': 'UL 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, - ]) - self.assertEqual(common.network_load(pl=pl, interface='eth0', recv_format='r {value}', sent_format='s {value}'), [ - {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, - {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, - ]) - self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps', interface='eth0'), [ - {'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']}, - {'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']}, - ]) - self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True, interface='eth0'), [ - {'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']}, - {'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']}, - ]) - self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0, interface='eth0'), [ - {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100}, - {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, - ]) - - class ApproxEqual(object): - def __eq__(self, i): - return abs(i - 50.0) < 1 - - self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800, interface='eth0'), [ - {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, - {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()}, - ]) - finally: - common.network_load.shutdown() - - def test_virtualenv(self): - pl = Pl() - with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as segment_info: - self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), 'ghi') - segment_info['environ'].pop('VIRTUAL_ENV') - self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), None) - - def test_environment(self): - pl = Pl() - variable = 'FOO' - value = 'bar' - with replace_env(variable, value) as segment_info: - self.assertEqual(common.environment(pl=pl, segment_info=segment_info, variable=variable), value) - segment_info['environ'].pop(variable) - self.assertEqual(common.environment(pl=pl, segment_info=segment_info, variable=variable), None) +class TestMail(TestCommon): + module_name = 'mail' def test_email_imap_alert(self): # TODO pass + +class TestPlayers(TestCommon): + module_name = 'players' + def test_now_playing(self): # TODO pass + +class TestBat(TestCommon): + module_name = 'bat' + def test_battery(self): pl = Pl() def _get_capacity(pl): return 86 - with replace_attr(common, '_get_capacity', _get_capacity): + with replace_attr(self.module, '_get_capacity', _get_capacity): self.assertEqual(common.battery(pl=pl), [{ 'contents': '86%', 'highlight_group': ['battery_gradient', 'battery'], @@ -739,52 +820,6 @@ class TestCommon(TestCase): } ]) - def test_internal_ip(self): - try: - import netifaces - except ImportError: - raise SkipTest() - pl = Pl() - addr = { - 'enp2s0': { - netifaces.AF_INET: [{'addr': '192.168.100.200'}], - netifaces.AF_INET6: [{'addr': 'feff::5446:5eff:fe5a:7777%enp2s0'}] - }, - 'lo': { - netifaces.AF_INET: [{'addr': '127.0.0.1'}], - netifaces.AF_INET6: [{'addr': '::1'}] - }, - 'teredo': { - netifaces.AF_INET6: [{'addr': 'feff::5446:5eff:fe5a:7777'}] - }, - } - interfaces = ['lo', 'enp2s0', 'teredo'] - with replace_module_module( - common, 'netifaces', - interfaces=(lambda: interfaces), - ifaddresses=(lambda interface: addr[interface]), - AF_INET=netifaces.AF_INET, - AF_INET6=netifaces.AF_INET6, - ): - self.assertEqual(common.internal_ip(pl=pl), '192.168.100.200') - self.assertEqual(common.internal_ip(pl=pl, interface='detect'), '192.168.100.200') - self.assertEqual(common.internal_ip(pl=pl, interface='lo'), '127.0.0.1') - self.assertEqual(common.internal_ip(pl=pl, interface='teredo'), None) - self.assertEqual(common.internal_ip(pl=pl, ipv=4), '192.168.100.200') - self.assertEqual(common.internal_ip(pl=pl, interface='detect', ipv=4), '192.168.100.200') - self.assertEqual(common.internal_ip(pl=pl, interface='lo', ipv=4), '127.0.0.1') - self.assertEqual(common.internal_ip(pl=pl, interface='teredo', ipv=4), None) - self.assertEqual(common.internal_ip(pl=pl, ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') - self.assertEqual(common.internal_ip(pl=pl, interface='detect', ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') - self.assertEqual(common.internal_ip(pl=pl, interface='lo', ipv=6), '::1') - self.assertEqual(common.internal_ip(pl=pl, interface='teredo', ipv=6), 'feff::5446:5eff:fe5a:7777') - interfaces[1:2] = () - self.assertEqual(common.internal_ip(pl=pl, ipv=6), 'feff::5446:5eff:fe5a:7777') - interfaces[1:2] = () - self.assertEqual(common.internal_ip(pl=pl, ipv=6), '::1') - interfaces[:] = () - self.assertEqual(common.internal_ip(pl=pl, ipv=6), None) - class TestVim(TestCase): def test_mode(self): From 3c1d5c615ce4c364a9b922fd46e573a312b107a8 Mon Sep 17 00:00:00 2001 From: ZyX Date: Sun, 21 Sep 2014 02:10:13 +0400 Subject: [PATCH 3/4] Deprecate powerline.segments.common Makes linter emit a warning --- powerline/config_files/themes/ascii.json | 22 +++++++++---------- powerline/config_files/themes/ipython/in.json | 3 +-- .../config_files/themes/ipython/in2.json | 1 - powerline/config_files/themes/powerline.json | 22 +++++++++---------- .../config_files/themes/shell/default.json | 9 ++++---- .../themes/shell/default_leftonly.json | 9 ++++---- .../config_files/themes/tmux/default.json | 11 +++++----- powerline/config_files/themes/unicode.json | 22 +++++++++---------- .../config_files/themes/unicode_terminus.json | 22 +++++++++---------- .../themes/unicode_terminus_condensed.json | 22 +++++++++---------- powerline/config_files/themes/wm/default.json | 9 ++++---- powerline/lint/imp.py | 6 +++++ 12 files changed, 79 insertions(+), 79 deletions(-) diff --git a/powerline/config_files/themes/ascii.json b/powerline/config_files/themes/ascii.json index 7875f219..7b49da21 100644 --- a/powerline/config_files/themes/ascii.json +++ b/powerline/config_files/themes/ascii.json @@ -29,13 +29,16 @@ "before": "" }, - "powerline.segments.common.network_load": { + "powerline.segments.common.net.network_load": { "args": { "recv_format": "DL {value:>8}", "sent_format": "UL {value:>8}" } }, - "powerline.segments.common.now_playing": { + "powerline.segments.common.net.hostname": { + "before": "H " + }, + "powerline.segments.common.players.now_playing": { "args": { "state_symbols": { "fallback": "", @@ -45,25 +48,22 @@ } } }, - "powerline.segments.common.battery": { + "powerline.segments.common.bat.battery": { "args": { "full_heart": "O", "empty_heart": "O" } }, - "powerline.segments.common.uptime": { + "powerline.segments.common.sys.uptime": { "before": "UP " }, - "powerline.segments.common.email_imap_alert": { + "powerline.segments.common.mail.email_imap_alert": { "before": "MAIL " }, - "powerline.segments.common.virtualenv": { + "powerline.segments.common.env.virtualenv": { "before": "(e) " }, - "powerline.segments.common.hostname": { - "before": "H " - }, - "powerline.segments.common.weather": { + "powerline.segments.common.wthr.weather": { "args": { "icons": { "day": "DAY", @@ -82,7 +82,7 @@ "temp_format": "{temp:.0f} C" } }, - "powerline.segments.common.fuzzy_time": { + "powerline.segments.common.time.fuzzy_time": { "args": { "unicode_text": false } diff --git a/powerline/config_files/themes/ipython/in.json b/powerline/config_files/themes/ipython/in.json index 6218b3ab..b8957753 100644 --- a/powerline/config_files/themes/ipython/in.json +++ b/powerline/config_files/themes/ipython/in.json @@ -1,9 +1,8 @@ { - "default_module": "powerline.segments.common", "segments": { "left": [ { - "function": "virtualenv", + "function": "powerline.segments.common.env.virtualenv", "priority": 10 }, { diff --git a/powerline/config_files/themes/ipython/in2.json b/powerline/config_files/themes/ipython/in2.json index 601fc9e5..ce54e1bb 100644 --- a/powerline/config_files/themes/ipython/in2.json +++ b/powerline/config_files/themes/ipython/in2.json @@ -1,5 +1,4 @@ { - "default_module": "powerline.segments.common", "segments": { "left": [ { diff --git a/powerline/config_files/themes/powerline.json b/powerline/config_files/themes/powerline.json index 859d8341..bba3b541 100644 --- a/powerline/config_files/themes/powerline.json +++ b/powerline/config_files/themes/powerline.json @@ -28,13 +28,16 @@ "before": "⌚ " }, - "powerline.segments.common.network_load": { + "powerline.segments.common.net.network_load": { "args": { "recv_format": "⬇ {value:>8}", "sent_format": "⬆ {value:>8}" } }, - "powerline.segments.common.now_playing": { + "powerline.segments.common.net.hostname": { + "before": " " + }, + "powerline.segments.common.players.now_playing": { "args": { "state_symbols": { "fallback": "♫", @@ -44,25 +47,22 @@ } } }, - "powerline.segments.common.battery": { + "powerline.segments.common.bat.battery": { "args": { "full_heart": "♥", "empty_heart": "♥" } }, - "powerline.segments.common.uptime": { + "powerline.segments.common.sys.uptime": { "before": "⇑ " }, - "powerline.segments.common.email_imap_alert": { + "powerline.segments.common.mail.email_imap_alert": { "before": "✉ " }, - "powerline.segments.common.virtualenv": { + "powerline.segments.common.env.virtualenv": { "before": "ⓔ " }, - "powerline.segments.common.hostname": { - "before": " " - }, - "powerline.segments.common.weather": { + "powerline.segments.common.wthr.weather": { "args": { "icons": { "day": "〇", @@ -80,7 +80,7 @@ } } }, - "powerline.segments.common.fuzzy_time": { + "powerline.segments.common.time.fuzzy_time": { "args": { "unicode_text": true } diff --git a/powerline/config_files/themes/shell/default.json b/powerline/config_files/themes/shell/default.json index 6ba1ba6d..480da5c9 100644 --- a/powerline/config_files/themes/shell/default.json +++ b/powerline/config_files/themes/shell/default.json @@ -1,20 +1,19 @@ { - "default_module": "powerline.segments.common", "segments": { "left": [ { "function": "powerline.segments.shell.mode" }, { - "function": "hostname", + "function": "powerline.segments.common.net.hostname", "priority": 10 }, { - "function": "user", + "function": "powerline.segments.common.env.user", "priority": 30 }, { - "function": "virtualenv", + "function": "powerline.segments.common.env.virtualenv", "priority": 50 }, { @@ -32,7 +31,7 @@ "priority": 10 }, { - "function": "branch", + "function": "powerline.segments.common.vcs.branch", "priority": 40 } ] diff --git a/powerline/config_files/themes/shell/default_leftonly.json b/powerline/config_files/themes/shell/default_leftonly.json index 018847ba..61e59f81 100644 --- a/powerline/config_files/themes/shell/default_leftonly.json +++ b/powerline/config_files/themes/shell/default_leftonly.json @@ -1,21 +1,20 @@ { - "default_module": "powerline.segments.common", "segments": { "left": [ { - "function": "hostname", + "function": "powerline.segments.common.net.hostname", "priority": 10 }, { - "function": "user", + "function": "powerline.segments.common.env.user", "priority": 30 }, { - "function": "virtualenv", + "function": "powerline.segments.common.env.virtualenv", "priority": 50 }, { - "function": "branch", + "function": "powerline.segments.common.vcs.branch", "priority": 40 }, { diff --git a/powerline/config_files/themes/tmux/default.json b/powerline/config_files/themes/tmux/default.json index 780f34f2..4532cedc 100644 --- a/powerline/config_files/themes/tmux/default.json +++ b/powerline/config_files/themes/tmux/default.json @@ -1,20 +1,19 @@ { - "default_module": "powerline.segments.common", "segments": { "right": [ { - "function": "uptime", + "function": "powerline.segments.common.sys.uptime", "priority": 50 }, { - "function": "system_load", + "function": "powerline.segments.common.sys.system_load", "priority": 50 }, { - "function": "date" + "function": "powerline.segments.common.time.date" }, { - "function": "date", + "function": "powerline.segments.common.time.date", "name": "time", "args": { "format": "%H:%M", @@ -22,7 +21,7 @@ } }, { - "function": "hostname" + "function": "powerline.segments.common.net.hostname" } ] } diff --git a/powerline/config_files/themes/unicode.json b/powerline/config_files/themes/unicode.json index a3b5a365..de74c23b 100644 --- a/powerline/config_files/themes/unicode.json +++ b/powerline/config_files/themes/unicode.json @@ -28,13 +28,16 @@ "before": "⌚ " }, - "powerline.segments.common.network_load": { + "powerline.segments.common.net.network_load": { "args": { "recv_format": "⬇ {value:>8}", "sent_format": "⬆ {value:>8}" } }, - "powerline.segments.common.now_playing": { + "powerline.segments.common.net.hostname": { + "before": "⌂ " + }, + "powerline.segments.common.players.now_playing": { "args": { "state_symbols": { "fallback": "♫", @@ -44,25 +47,22 @@ } } }, - "powerline.segments.common.battery": { + "powerline.segments.common.bat.battery": { "args": { "full_heart": "♥", "empty_heart": "♥" } }, - "powerline.segments.common.uptime": { + "powerline.segments.common.sys.uptime": { "before": "⇑ " }, - "powerline.segments.common.email_imap_alert": { + "powerline.segments.common.mail.email_imap_alert": { "before": "✉ " }, - "powerline.segments.common.virtualenv": { + "powerline.segments.common.env.virtualenv": { "before": "ⓔ " }, - "powerline.segments.common.hostname": { - "before": "⌂ " - }, - "powerline.segments.common.weather": { + "powerline.segments.common.wthr.weather": { "args": { "icons": { "day": "〇", @@ -80,7 +80,7 @@ } } }, - "powerline.segments.common.fuzzy_time": { + "powerline.segments.common.time.fuzzy_time": { "args": { "unicode_text": true } diff --git a/powerline/config_files/themes/unicode_terminus.json b/powerline/config_files/themes/unicode_terminus.json index e435ea93..33a06395 100644 --- a/powerline/config_files/themes/unicode_terminus.json +++ b/powerline/config_files/themes/unicode_terminus.json @@ -28,13 +28,16 @@ "before": "" }, - "powerline.segments.common.network_load": { + "powerline.segments.common.net.network_load": { "args": { "recv_format": "⇓ {value:>8}", "sent_format": "⇑ {value:>8}" } }, - "powerline.segments.common.now_playing": { + "powerline.segments.common.net.hostname": { + "before": "⌂ " + }, + "powerline.segments.common.players.now_playing": { "args": { "state_symbols": { "fallback": "♫", @@ -44,25 +47,22 @@ } } }, - "powerline.segments.common.battery": { + "powerline.segments.common.bat.battery": { "args": { "full_heart": "♥", "empty_heart": "♥" } }, - "powerline.segments.common.uptime": { + "powerline.segments.common.sys.uptime": { "before": "↑ " }, - "powerline.segments.common.email_imap_alert": { + "powerline.segments.common.mail.email_imap_alert": { "before": "MAIL " }, - "powerline.segments.common.virtualenv": { + "powerline.segments.common.env.virtualenv": { "before": "(e) " }, - "powerline.segments.common.hostname": { - "before": "⌂ " - }, - "powerline.segments.common.weather": { + "powerline.segments.common.wthr.weather": { "args": { "icons": { "day": "DAY", @@ -80,7 +80,7 @@ } } }, - "powerline.segments.common.fuzzy_time": { + "powerline.segments.common.time.fuzzy_time": { "args": { "unicode_text": true } diff --git a/powerline/config_files/themes/unicode_terminus_condensed.json b/powerline/config_files/themes/unicode_terminus_condensed.json index c4266ee6..a386c7cd 100644 --- a/powerline/config_files/themes/unicode_terminus_condensed.json +++ b/powerline/config_files/themes/unicode_terminus_condensed.json @@ -29,13 +29,16 @@ "before": "" }, - "powerline.segments.common.network_load": { + "powerline.segments.common.net.network_load": { "args": { "recv_format": "⇓{value:>8}", "sent_format": "⇑{value:>8}" } }, - "powerline.segments.common.now_playing": { + "powerline.segments.common.net.hostname": { + "before": "⌂" + }, + "powerline.segments.common.players.now_playing": { "args": { "state_symbols": { "fallback": "♫", @@ -45,25 +48,22 @@ } } }, - "powerline.segments.common.battery": { + "powerline.segments.common.bat.battery": { "args": { "full_heart": "♥", "empty_heart": "♥" } }, - "powerline.segments.common.uptime": { + "powerline.segments.common.sys.uptime": { "before": "↑" }, - "powerline.segments.common.email_imap_alert": { + "powerline.segments.common.mail.email_imap_alert": { "before": "M " }, - "powerline.segments.common.virtualenv": { + "powerline.segments.common.env.virtualenv": { "before": "E " }, - "powerline.segments.common.hostname": { - "before": "⌂" - }, - "powerline.segments.common.weather": { + "powerline.segments.common.wthr.weather": { "args": { "icons": { "day": "D", @@ -81,7 +81,7 @@ } } }, - "powerline.segments.common.fuzzy_time": { + "powerline.segments.common.time.fuzzy_time": { "args": { "unicode_text": true } diff --git a/powerline/config_files/themes/wm/default.json b/powerline/config_files/themes/wm/default.json index 009c4924..579080d1 100644 --- a/powerline/config_files/themes/wm/default.json +++ b/powerline/config_files/themes/wm/default.json @@ -1,16 +1,15 @@ { - "default_module": "powerline.segments.common", "segments": { "right": [ { - "function": "weather", + "function": "powerline.segments.common.wthr.weather", "priority": 50 }, { - "function": "date" + "function": "powerline.segments.common.time.date" }, { - "function": "date", + "function": "powerline.segments.common.time.date", "name": "time", "args": { "format": "%H:%M", @@ -18,7 +17,7 @@ } }, { - "function": "email_imap_alert", + "function": "powerline.segments.common.mail.email_imap_alert", "priority": 10, "args": { "username": "", diff --git a/powerline/lint/imp.py b/powerline/lint/imp.py index 6e402132..98b7591b 100644 --- a/powerline/lint/imp.py +++ b/powerline/lint/imp.py @@ -21,6 +21,12 @@ class WithPath(object): def import_function(function_type, name, data, context, echoerr, module): havemarks(name, module) + if module == 'powerline.segments.common': + echoerr(context='Warning while checking segments (key {key})'.format(key=context.key), + context_mark=name.mark, + problem='module {0} is deprecated'.format(module), + problem_mark=module.mark) + with WithPath(data['import_paths']): try: func = getattr(__import__(str(module), fromlist=[str(name)]), str(name)) From 080e3e54a3d48cff6569a097206b0d8c7b563ff8 Mon Sep 17 00:00:00 2001 From: ZyX Date: Sun, 21 Sep 2014 02:12:36 +0400 Subject: [PATCH 4/4] Add deprecation warning to powerline.segments.common --- powerline/segments/common/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/powerline/segments/common/__init__.py b/powerline/segments/common/__init__.py index 3a67901e..f5ec2fa9 100644 --- a/powerline/segments/common/__init__.py +++ b/powerline/segments/common/__init__.py @@ -2,6 +2,10 @@ from __future__ import (unicode_literals, division, absolute_import, print_function) +# DEPRECATED MODULE. Do not add any segments below. Do not remove existing +# segments as well until next major release. + + from powerline.segments.common.vcs import branch # NOQA from powerline.segments.common.sys import cpu_load_percent # NOQA from powerline.segments.common.sys import uptime # NOQA