Some fixes for zsh, remove some exceptions handling

Those exceptions are now handled and logged at top level, thus no need to handle
them in segment.

Ref #330, it is now fixed for zsh/zpython

Note: do not use zsh/zpython + python3, it does not work properly, even though
this changeset contains fixes for it as well

Fixes #360
This commit is contained in:
ZyX 2013-03-23 18:55:43 +04:00
parent ed435f8063
commit 8a51d99389
5 changed files with 45 additions and 45 deletions

View File

@ -45,24 +45,31 @@ class Args(object):
return None
def string(s):
if type(s) is bytes:
return s.decode('utf-8', errors='replace')
else:
return str(s)
class Environment(object):
@staticmethod
def __getitem__(key):
try:
return zsh.getvalue(key)
return string(zsh.getvalue(key))
except IndexError as e:
raise KeyError(*e.args)
@staticmethod
def get(key, default=None):
try:
return zsh.getvalue(key)
return string(zsh.getvalue(key))
except IndexError:
return default
class Prompt(object):
__slots__ = ('render', 'side', 'savedpsvar', 'savedps')
__slots__ = ('render', 'side', 'savedpsvar', 'savedps', 'args')
def __init__(self, powerline, side, savedpsvar=None, savedps=None):
self.render = powerline.renderer.render
@ -72,7 +79,13 @@ class Prompt(object):
self.args = powerline.args
def __str__(self):
return self.render(width=zsh.columns(), side=self.side, segment_info=args).encode('utf-8')
r = self.render(width=zsh.columns(), side=self.side, segment_info=self.args)
if type(r) is not str:
if type(r) is bytes:
return r.decode('utf-8')
else:
return r.encode('utf-8')
return r
def __del__(self):
if self.savedps:
@ -88,6 +101,7 @@ def set_prompt(powerline, psvar, side):
def setup():
powerline = ShellPowerline(Args(), environ=Environment(), getcwd=lambda: zsh.getvalue('PWD'))
environ = Environment()
powerline = ShellPowerline(Args(), environ=environ, getcwd=lambda: environ['PWD'])
set_prompt(powerline, 'PS1', 'left')
set_prompt(powerline, 'RPS1', 'right')

View File

@ -140,7 +140,7 @@ class KwThreadedSegment(ThreadedSegment):
try:
updates[key] = (last_query_time, self.compute_state(key))
except Exception as e:
self.error('Exception while computing state for {0}: {1}', repr(key), str(e))
self.exception('Exception while computing state for {0}: {1}', repr(key), str(e))
else:
removes.append(key)
with self.write_lock:

View File

@ -366,14 +366,10 @@ class WeatherSegment(ThreadedSegment):
# Do not lock attribute assignments in this branch: they are used
# only in .update()
if not self.location:
try:
location_data = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip()))
self.location = ','.join([location_data['city'],
location_data['region_name'],
location_data['country_name']])
except (TypeError, ValueError) as e:
self.error('Failed to get location: {0}', str(e))
return
location_data = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip()))
self.location = ','.join([location_data['city'],
location_data['region_name'],
location_data['country_name']])
query_data = {
'q':
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
@ -382,15 +378,14 @@ class WeatherSegment(ThreadedSegment):
}
self.url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)
try:
raw_response = urllib_read(self.url)
response = json.loads(raw_response)
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
temp = float(condition['temp'])
except (KeyError, TypeError, ValueError) as e:
self.error('Failed to get weather conditions: {0}', str(e))
raw_response = urllib_read(self.url)
if not raw_response:
self.error('Failed to get response')
return
response = json.loads(raw_response)
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
temp = float(condition['temp'])
try:
icon_names = weather_conditions_codes[condition_code]
@ -560,14 +555,11 @@ try:
return '{0}%'.format(cpu_percent)
except ImportError:
def _get_bytes(interface): # NOQA
try:
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
except IOError:
return None
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
def _get_interfaces():
for interface in os.listdir('/sys/class/net'):
@ -591,6 +583,8 @@ except ImportError:
username = False
# os.geteuid is not available on windows
_geteuid = getattr(os, 'geteuid', lambda: 1)
def user(pl):
@ -606,11 +600,7 @@ def user(pl):
if username is None:
pl.warn('Failed to get username')
return None
try:
euid = os.geteuid()
except AttributeError:
# os.geteuid is not available on windows
euid = 1
euid = _geteuid()
return [{
'contents': username,
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
@ -644,9 +634,6 @@ def uptime(pl, format='{days}d {hours:02d}h {minutes:02d}m'):
'''
try:
seconds = _get_uptime()
except IOError as e:
pl.error('Failed to get uptime: {0}', e)
return None
except NotImplementedError:
pl.warn('Unable to get uptime. You should install psutil package')
return None

View File

@ -14,10 +14,10 @@ def mergeargs(argvalue):
class ShellPowerline(Powerline):
def __init__(self, args, run_once=False):
def __init__(self, args, run_once=False, **kwargs):
self.args = args
self.theme_option = mergeargs(args.theme_option) or {}
super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, run_once=run_once)
super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, run_once=run_once, **kwargs)
def load_main_config(self):
r = super(ShellPowerline, self).load_main_config()

View File

@ -44,11 +44,10 @@ class TestCommon(TestCase):
with replace_env('USER', 'def') as pl:
with replace_attr(common, 'os', new_os):
with replace_attr(common, 'psutil', new_psutil):
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 1
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 0
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
with replace_attr(common, '_geteuid', lambda: 5):
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
with replace_attr(common, '_geteuid', lambda: 0):
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
def test_branch(self):
pl = Pl()