Use psutil module for `user` segment

This commit is contained in:
ZyX 2013-03-17 13:00:01 +04:00
parent 93e41bf2c4
commit 29eccf409b
4 changed files with 115 additions and 60 deletions

View File

@ -22,25 +22,6 @@ def hostname(only_if_ssh=False):
return socket.gethostname()
def user():
'''Return the current user.
Highlights the user with the ``superuser`` if the effective user ID is 0.
Highlight groups used: ``superuser`` or ``user``. It is recommended to define all highlight groups.
'''
user = os.environ.get('USER')
try:
euid = os.geteuid()
except AttributeError:
# os.geteuid is not available on windows
euid = 1
return [{
'contents': user,
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
}]
def branch(status_colors=True):
'''Return the current VCS branch.@
@ -396,20 +377,78 @@ def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2):
return ret
def cpu_load_percent(measure_interval=.5):
'''Return the average CPU load as a percentage.
try:
import psutil
Requires the ``psutil`` module.
def _get_bytes(interface):
io_counters = psutil.network_io_counters(pernic=True)
if_io = io_counters.get(interface)
if not if_io:
return None
return if_io.bytes_recv, if_io.bytes_sent
:param float measure_interval:
interval used to measure CPU load (in seconds)
'''
try:
import psutil
except ImportError:
def _get_user():
return psutil.Process(os.getpid()).username
def cpu_load_percent(measure_interval=.5):
'''Return the average CPU load as a percentage.
Requires the ``psutil`` module.
:param float measure_interval:
interval used to measure CPU load (in seconds)
'''
cpu_percent = int(psutil.cpu_percent(interval=measure_interval))
return '{0}%'.format(cpu_percent)
except ImportError:
def _get_bytes(interface): # NOQA
try:
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
except IOError:
return None
def _get_user():
return os.environ.get('USER', None)
def cpu_load_percent(**kwargs): # NOQA
'''Return the average CPU load as a percentage.
Requires the ``psutil`` module.
:param float measure_interval:
interval used to measure CPU load (in seconds)
'''
return None
cpu_percent = int(psutil.cpu_percent(interval=measure_interval))
return '{0}%'.format(cpu_percent)
username = False
def user():
'''Return the current user.
Highlights the user with the ``superuser`` if the effective user ID is 0.
Highlight groups used: ``superuser`` or ``user``. It is recommended to define all highlight groups.
'''
global username
if username is False:
username = _get_user()
if username is None:
return None
try:
euid = os.geteuid()
except AttributeError:
# os.geteuid is not available on windows
euid = 1
return [{
'contents': username,
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
}]
if os.path.exists('/proc/uptime'):
@ -438,7 +477,7 @@ def uptime(format='{days:02d}d {hours:02d}h {minutes:02d}m'):
'''
try:
seconds = _get_uptime()
except IOError, NotImplementedError:
except (IOError, NotImplementedError):
return None
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
@ -466,33 +505,47 @@ def network_load(interface='eth0', measure_interval=1, suffix='B/s', si_prefix=F
import time
from powerline.lib import humanize_bytes
def get_bytes():
try:
import psutil
io_counters = psutil.network_io_counters(pernic=True)
if_io = io_counters.get(interface)
if not if_io:
return None
return (if_io.bytes_recv, if_io.bytes_sent)
except ImportError:
try:
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
except IOError:
return None
interfaces = {}
b1 = get_bytes()
if b1 is None:
return None
time.sleep(measure_interval)
b2 = get_bytes()
return '{rx_diff}{tx_diff}'.format(
rx_diff=humanize_bytes((b2[0] - b1[0]) / measure_interval, suffix, si_prefix).rjust(8),
tx_diff=humanize_bytes((b2[1] - b1[1]) / measure_interval, suffix, si_prefix).rjust(8),
)
@staticmethod
def key(interface='eth0', **kwargs):
return interface
def compute_state(self, interface):
if interface in self.interfaces:
idata = self.interfaces[interface]
idata['prev'] = idata['last']
else:
idata = {}
if self.run_once:
idata['prev'] = (monotonic(), _get_bytes(interface))
self.sleep(0)
self.interfaces[interface] = idata
idata['last'] = (monotonic(), _get_bytes(interface))
return idata
def render_one(self, idata, suffix='B/s', si_prefix=False, **kwargs):
if 'prev' not in idata:
return None
t1, b1 = idata['prev']
t2, b2 = idata['last']
measure_interval = t2 - t1
if None in (b1, b2):
return None
return [{
'contents': '{rx_diff}{tx_diff}'.format(
rx_diff=humanize_bytes((b2[0] - b1[0]) / measure_interval, suffix, si_prefix).rjust(8),
tx_diff=humanize_bytes((b2[1] - b1[1]) / measure_interval, suffix, si_prefix).rjust(8),
),
'divider_highlight_group': 'background:divider',
}]
network_load = NetworkLoadSegment()
def virtualenv():

View File

@ -1,5 +1,6 @@
#!/bin/sh
pip install .
pip install psutil
if python -c 'import sys; sys.exit(1 * (sys.version_info[0] != 2))' ; then
# Python 2
pip install mercurial bzr

View File

@ -6,7 +6,7 @@ if ${PYTHON} -c 'import sys; sys.exit(1 * (sys.version_info >= (2, 7)))' ; then
export PYTHONPATH="${PYTHONPATH}:`realpath .`"
for file in tests/test_*.py ; do
if ! ${PYTHON} $file ; then
exit 1
FAILED=1
fi
done
else

View File

@ -37,8 +37,9 @@ class TestCommon(TestCase):
self.assertEqual(common.hostname(only_if_ssh=True), None)
def test_user(self):
new_os = new_module('os', environ={'USER': 'def'})
with replace_module_attr(common, 'os', new_os):
new_os = new_module('os', environ={'USER': 'def'}, getpid=lambda: 1)
new_psutil = new_module('psutil', Process=lambda pid: Args(username='def'))
with replace_module_attr(common, 'os', new_os), replace_module_attr(common, 'psutil', new_psutil):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 1
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}])