diff --git a/tests/install.sh b/tests/install.sh index f1910fc4..de74e21f 100755 --- a/tests/install.sh +++ b/tests/install.sh @@ -5,7 +5,7 @@ git clone --depth=1 git://github.com/powerline/deps tests/bot-ci/deps . tests/bot-ci/scripts/common/main.sh sudo apt-get install -qq libssl1.0.0 -sudo apt-get install -qq screen zsh tcsh mksh busybox socat realpath bc rc +sudo apt-get install -qq screen zsh tcsh mksh busybox socat realpath bc rc tmux if test -n "$USE_UCS2_PYTHON" ; then pip install virtualenvwrapper diff --git a/tests/lib/terminal.py b/tests/lib/terminal.py new file mode 100644 index 00000000..f026ec57 --- /dev/null +++ b/tests/lib/terminal.py @@ -0,0 +1,59 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import threading + +from time import sleep + +import pexpect + +from tests.lib.vterm import VTerm + + +class ExpectProcess(threading.Thread): + def __init__(self, lib, rows, cols, cmd, args, cwd=None, env=None): + super(ExpectProcess, self).__init__() + self.vterm = VTerm(lib, rows, cols) + self.lock = threading.Lock() + self.rows = rows + self.cols = cols + self.cmd = cmd + self.args = args + self.cwd = cwd + self.env = env + self.buffer = [] + self.child_lock = threading.Lock() + + def run(self): + child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd, env=self.env) + sleep(0.5) + child.setwinsize(self.rows, self.cols) + self.child = child + status = None + while status is None: + try: + with self.child_lock: + s = child.read_nonblocking(size=1024, timeout=0) + status = child.status + except pexpect.TIMEOUT: + pass + except pexpect.EOF: + break + else: + with self.lock: + self.vterm.push(s) + self.buffer.append(s) + + def __getitem__(self, position): + with self.lock: + return self.vterm.vtscreen[position] + + def read(self): + with self.lock: + ret = b''.join(self.buffer) + del self.buffer[:] + return ret + + def send(self, data): + with self.child_lock: + self.child.send(data) diff --git a/tests/lib/vterm.py b/tests/lib/vterm.py new file mode 100644 index 00000000..c83235ec --- /dev/null +++ b/tests/lib/vterm.py @@ -0,0 +1,178 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import ctypes + +from powerline.lib.unicode import unicode, unichr, tointiter + + +class CTypesFunction(object): + def __init__(self, library, name, rettype, args): + self.name = name + self.prototype = ctypes.CFUNCTYPE(rettype, *[ + arg[1] for arg in args + ]) + self.args = args + self.func = self.prototype((name, library), tuple(( + (1, arg[0]) for arg in args + ))) + + def __call__(self, *args, **kwargs): + return self.func(*args, **kwargs) + + def __repr__(self): + return '{cls}(, {name!r}, {rettype!r}, {args!r})'.format( + cls=self.__class__.__name__, + **self.__dict__ + ) + + +class CTypesLibraryFuncsCollection(object): + def __init__(self, lib, **kwargs): + self.lib = lib + library_loader = ctypes.LibraryLoader(ctypes.CDLL) + library = library_loader.LoadLibrary(lib) + self.library = library + for name, args in kwargs.items(): + self.__dict__[name] = CTypesFunction(library, name, *args) + + +class VTermPos_s(ctypes.Structure): + _fields_ = ( + ('row', ctypes.c_int), + ('col', ctypes.c_int), + ) + + +class VTermColor_s(ctypes.Structure): + _fields_ = ( + ('red', ctypes.c_uint8), + ('green', ctypes.c_uint8), + ('blue', ctypes.c_uint8), + ) + + +class VTermScreenCellAttrs_s(ctypes.Structure): + _fields_ = ( + ('bold', ctypes.c_uint, 1), + ('underline', ctypes.c_uint, 2), + ('italic', ctypes.c_uint, 1), + ('blink', ctypes.c_uint, 1), + ('reverse', ctypes.c_uint, 1), + ('strike', ctypes.c_uint, 1), + ('font', ctypes.c_uint, 4), + ('dwl', ctypes.c_uint, 1), + ('dhl', ctypes.c_uint, 2), + ) + + +VTERM_MAX_CHARS_PER_CELL = 6 + + +class VTermScreenCell_s(ctypes.Structure): + _fields_ = ( + ('chars', ctypes.ARRAY(ctypes.c_uint32, VTERM_MAX_CHARS_PER_CELL)), + ('width', ctypes.c_char), + ('attrs', VTermScreenCellAttrs_s), + ('fg', VTermColor_s), + ('bg', VTermColor_s), + ) + + +VTerm_p = ctypes.c_void_p +VTermScreen_p = ctypes.c_void_p + + +def get_functions(lib): + return CTypesLibraryFuncsCollection( + lib, + vterm_new=(VTerm_p, ( + ('rows', ctypes.c_int), + ('cols', ctypes.c_int) + )), + vterm_obtain_screen=(VTermScreen_p, (('vt', VTerm_p),)), + vterm_screen_reset=(None, ( + ('screen', VTermScreen_p), + ('hard', ctypes.c_int) + )), + vterm_input_write=(ctypes.c_size_t, ( + ('vt', VTerm_p), + ('bytes', ctypes.POINTER(ctypes.c_char)), + ('size', ctypes.c_size_t), + )), + vterm_screen_get_cell=(ctypes.c_int, ( + ('screen', VTermScreen_p), + ('pos', VTermPos_s), + ('cell', ctypes.POINTER(VTermScreenCell_s)) + )), + vterm_free=(None, (('vt', VTerm_p),)), + ) + + +class VTermColor(object): + __slots__ = ('red', 'green', 'blue') + + def __init__(self, color): + self.red = color.red + self.green = color.green + self.blue = color.blue + + @property + def color_key(self): + return (self.red, self.green, self.blue) + + +class VTermScreenCell(object): + def __init__(self, vtsc): + for field in VTermScreenCellAttrs_s._fields_: + field_name = field[0] + setattr(self, field_name, getattr(vtsc.attrs, field_name)) + self.text = ''.join(( + unichr(vtsc.chars[i]) for i in range(VTERM_MAX_CHARS_PER_CELL) + )).rstrip('\x00') + self.width = next(tointiter(vtsc.width)) + self.fg = VTermColor(vtsc.fg) + self.bg = VTermColor(vtsc.bg) + self.cell_properties_key = ( + self.fg.color_key, + self.bg.color_key, + self.bold, + self.underline, + self.italic, + ) + + +class VTermScreen(object): + def __init__(self, functions, screen): + self.functions = functions + self.screen = screen + + def __getitem__(self, position): + pos = VTermPos_s(*position) + cell = VTermScreenCell_s() + ret = self.functions.vterm_screen_get_cell(self.screen, pos, cell) + if ret != 1: + raise ValueError('vterm_screen_get_cell returned {0}'.format(ret)) + return VTermScreenCell(cell) + + def reset(self, hard): + self.functions.vterm_screen_reset(self.screen, int(bool(hard))) + + +class VTerm(object): + def __init__(self, lib, rows, cols): + self.functions = get_functions(lib) + self.vt = self.functions.vterm_new(rows, cols) + self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt)) + self.vtscreen.reset(True) + + def push(self, data): + if isinstance(data, unicode): + data = data.encode('utf-8') + return self.functions.vterm_input_write(self.vt, data, len(data)) + + def __del__(self): + try: + self.functions.vterm_free(self.vt) + except AttributeError: + pass diff --git a/tests/run_vterm_tests.sh b/tests/run_vterm_tests.sh new file mode 100755 index 00000000..312b817d --- /dev/null +++ b/tests/run_vterm_tests.sh @@ -0,0 +1,8 @@ +#!/bin/sh +set -e +FAILED=0 +if ! sh tests/test_in_vterm/test.sh ; then + echo "Failed vterm" + FAILED=1 +fi +exit $FAILED diff --git a/tests/test_in_vterm/test.sh b/tests/test_in_vterm/test.sh new file mode 100755 index 00000000..156493f7 --- /dev/null +++ b/tests/test_in_vterm/test.sh @@ -0,0 +1,53 @@ +#!/bin/sh +: ${PYTHON:=python} +: ${POWERLINE_TMUX_EXE:=tmux} + +set -e + +# HACK: get newline for use in strings given that "\n" and $'' do not work. +NL="$(printf '\nE')" +NL="${NL%E}" + +FAILED=0 + +rm -rf tests/vterm +mkdir tests/vterm +mkdir tests/vterm/path + +ln -s "$(which "${PYTHON}")" tests/vterm/path/python +ln -s "$(which bash)" tests/vterm/path +ln -s "$(which env)" tests/vterm/path +ln -s "$PWD/scripts/powerline-render" tests/vterm/path +ln -s "$PWD/scripts/powerline-config" tests/vterm/path + +FAIL_SUMMARY="" + +test_tmux() { + if ! which "${POWERLINE_TMUX_EXE}" ; then + return 0 + fi + ln -s "$(which "${POWERLINE_TMUX_EXE}")" tests/vterm/path + if ! "${PYTHON}" tests/test_in_vterm/test_tmux.py; then + echo "Failed vterm test $f" + FAILED=1 + FAIL_SUMMARY="$FAIL_SUMMARY${NL}F $f" + for file in tests/vterm/*.log ; do + if ! test -e "$file" ; then + break + fi + echo '____________________________________________________________' + echo "$file:" + echo '============================================================' + cat -v $file + done + fi +} + +test_tmux || true + +if test $FAILED -eq 0 ; then + echo "$FAIL_SUMMARY" + rm -rf tests/vterm +fi + +exit $FAILED diff --git a/tests/test_in_vterm/test_tmux.py b/tests/test_in_vterm/test_tmux.py new file mode 100755 index 00000000..e3e9fc26 --- /dev/null +++ b/tests/test_in_vterm/test_tmux.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os + +from time import sleep +from subprocess import check_call +from itertools import groupby +from difflib import ndiff + +from powerline.lib.unicode import u + +from tests.lib.terminal import ExpectProcess + + +def cell_properties_key_to_shell_escape(cell_properties_key): + fg, bg, bold, underline, italic = cell_properties_key + return('\x1b[38;2;{0};48;2;{1}{bold}{underline}{italic}m'.format( + ';'.join((str(i) for i in fg)), + ';'.join((str(i) for i in bg)), + bold=(';1' if bold else ''), + underline=(';4' if underline else ''), + italic=(';3' if italic else ''), + )) + + +def main(): + VTERM_TEST_DIR = os.path.abspath('tests/vterm') + vterm_path = os.path.join(VTERM_TEST_DIR, 'path') + socket_path = os.path.join(VTERM_TEST_DIR, 'tmux-socket') + rows = 50 + cols = 200 + + try: + p = ExpectProcess( + lib='tests/bot-ci/deps/libvterm/libvterm.so', + rows=rows, + cols=cols, + cmd='tmux', + args=[ + # Specify full path to tmux socket (testing tmux instance must + # not interfere with user one) + '-S', socket_path, + # Force 256-color mode + '-2', + # Request verbose logging just in case + '-v', + # Specify configuration file + '-f', os.path.abspath('powerline/bindings/tmux/powerline.conf'), + # Run bash three times + 'new-session', 'bash --norc --noprofile -i', ';', + 'new-window', 'bash --norc --noprofile -i', ';', + 'new-window', 'bash --norc --noprofile -i', ';', + ], + cwd=VTERM_TEST_DIR, + env={ + 'TERM': 'vt100', + 'PATH': vterm_path, + 'SHELL': os.path.join(''), + 'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'), + 'POWERLINE_COMMAND': 'powerline-render', + 'POWERLINE_THEME_OVERRIDES': ( + 'default.segments.right=[{"type":"string","name":"s1","highlight_groups":["cwd"]}];' + 'default.segments.left=[{"type":"string","name":"s2","highlight_groups":["background"]}];' + 'default.segment_data.s1.contents=S1 string here;' + 'default.segment_data.s2.contents=S2 string here;' + ), + }, + ) + p.start() + sleep(1) + last_line = [] + for col in range(cols): + last_line.append(p[rows - 1, col]) + result = tuple(( + (key, ''.join((i.text for i in subline))) + for key, subline in groupby(last_line, lambda i: i.cell_properties_key) + )) + expected_result = ( + (((0, 0, 0), (243, 243, 243), 1, 0, 0), ' 0 '), + (((243, 243, 243), (11, 11, 11), 0, 0, 0), ' '), + (((255, 255, 255), (11, 11, 11), 0, 0, 0), ' S2 string here '), + (((133, 133, 133), (11, 11, 11), 0, 0, 0), ' 0 '), + (((88, 88, 88), (11, 11, 11), 0, 0, 0), '| '), + (((188, 188, 188), (11, 11, 11), 0, 0, 0), 'bash '), + (((255, 255, 255), (11, 11, 11), 0, 0, 0), ' '), + (((133, 133, 133), (11, 11, 11), 0, 0, 0), ' 1 '), + (((88, 88, 88), (11, 11, 11), 0, 0, 0), '| '), + (((0, 102, 153), (11, 11, 11), 0, 0, 0), 'bash '), + (((255, 255, 255), (11, 11, 11), 0, 0, 0), ' '), + (((11, 11, 11), (0, 102, 153), 0, 0, 0), ' '), + (((102, 204, 255), (0, 102, 153), 0, 0, 0), '2 | '), + (((255, 255, 255), (0, 102, 153), 1, 0, 0), 'bash '), + (((0, 102, 153), (11, 11, 11), 0, 0, 0), ' '), + (((255, 255, 255), (11, 11, 11), 0, 0, 0), ' '), + (((88, 88, 88), (11, 11, 11), 0, 0, 0), ' '), + (((199, 199, 199), (88, 88, 88), 0, 0, 0), ' S1 string here '), + ) + print('Result:') + shesc_result = ''.join(( + '{0}{1}\x1b[m'.format(cell_properties_key_to_shell_escape(key), text) + for key, text in result + )) + print(shesc_result) + print('Expected:') + shesc_expected_result = ''.join(( + '{0}{1}\x1b[m'.format(cell_properties_key_to_shell_escape(key), text) + for key, text in expected_result + )) + print(shesc_expected_result) + print('Screen:') + screen = [] + for i in range(rows): + screen.append([]) + for j in range(cols): + screen[-1].append(p[i, j]) + print('\n'.join( + ''.join(( + '{0}{1}\x1b[m'.format( + cell_properties_key_to_shell_escape(i.cell_properties_key), + i.text + ) for i in line + )) + for line in screen + )) + if result == expected_result: + return True + else: + a = shesc_result.replace('\x1b', '\\e') + '\n' + b = shesc_expected_result.replace('\x1b', '\\e') + '\n' + print('_' * 80) + print('Diff:') + print('=' * 80) + print(''.join((u(line) for line in ndiff([a], [b])))) + return False + finally: + check_call(['tmux', '-S', socket_path, 'kill-server'], env={ + 'PATH': vterm_path, + }) + + +if __name__ == '__main__': + if main(): + raise SystemExit(0) + else: + raise SystemExit(1)