Make Popen.communicate receive only bytes

Fixes #1547
Closes #1548
This commit is contained in:
Foo 2016-03-19 23:07:01 +03:00
parent 8e5bc2538b
commit e9b70b9edf
3 changed files with 28 additions and 3 deletions

View File

@ -7,7 +7,7 @@ import os
from subprocess import Popen, PIPE
from functools import partial
from powerline.lib.encoding import get_preferred_input_encoding
from powerline.lib.encoding import get_preferred_input_encoding, get_preferred_output_encoding
if sys.platform.startswith('win32'):
@ -36,7 +36,8 @@ def run_cmd(pl, cmd, stdin=None, strip=True):
pl.exception('Could not execute command ({0}): {1}', e, cmd)
return None
else:
stdout, err = p.communicate(stdin)
stdout, err = p.communicate(
stdin if stdin is None else stdin.encode(get_preferred_output_encoding()))
stdout = stdout.decode(get_preferred_input_encoding())
return stdout.strip() if strip else stdout

View File

@ -16,11 +16,16 @@ class Pl(object):
self.use_daemon_threads = True
for meth in ('error', 'warn', 'debug', 'exception', 'info'):
exec ((
exec((
'def {0}(self, msg, *args, **kwargs):\n'
' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n'
).format(meth))
def __nonzero__(self):
return bool(self.exceptions or self.errors or self.warns)
__bool__ = __nonzero__
class Args(object):
theme_override = {}

View File

@ -17,6 +17,7 @@ from powerline.lib.vcs import guess, get_fallback_create_watcher
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
from powerline.lib.vcs.git import git_directory
from powerline.lib.shell import run_cmd
import powerline.lib.unicode as plu
@ -48,6 +49,24 @@ def thread_number():
return len(threading.enumerate())
class TestShell(TestCase):
def test_run_cmd(self):
pl = Pl()
self.assertEqual(run_cmd(pl, ['xxx_nonexistent_command_xxx']), None)
self.assertEqual(len(pl.exceptions), 1)
pl = Pl()
self.assertEqual(run_cmd(pl, ['echo', ' test ']), 'test')
self.assertFalse(pl)
self.assertEqual(run_cmd(pl, ['echo', ' test '], strip=True), 'test')
self.assertFalse(pl)
self.assertEqual(run_cmd(pl, ['echo', ' test '], strip=False), ' test \n')
self.assertFalse(pl)
self.assertEqual(run_cmd(pl, ['cat'], stdin='test'), 'test')
self.assertFalse(pl)
self.assertEqual(run_cmd(pl, ['sh', '-c', 'cat >&2'], stdin='test'), '')
self.assertFalse(pl)
class TestThreaded(TestCase):
def test_threaded_segment(self):
log = []