From 93acec238e8071593af3caf6227eac6093c131d7 Mon Sep 17 00:00:00 2001
From: ZyX <kp-pav@yandex.ru>
Date: Sun, 11 Jan 2015 03:09:14 +0300
Subject: [PATCH] Create vterm-based tests that will test tmux support

It is possible that they eventually will be used also for shells: at least this
makes using postproc.py with all its hacks not needed.
---
 tests/install.sh                 |   2 +-
 tests/lib/terminal.py            |  59 ++++++++++
 tests/lib/vterm.py               | 178 +++++++++++++++++++++++++++++++
 tests/run_vterm_tests.sh         |   8 ++
 tests/test_in_vterm/test.sh      |  53 +++++++++
 tests/test_in_vterm/test_tmux.py | 147 +++++++++++++++++++++++++
 6 files changed, 446 insertions(+), 1 deletion(-)
 create mode 100644 tests/lib/terminal.py
 create mode 100644 tests/lib/vterm.py
 create mode 100755 tests/run_vterm_tests.sh
 create mode 100755 tests/test_in_vterm/test.sh
 create mode 100755 tests/test_in_vterm/test_tmux.py

diff --git a/tests/install.sh b/tests/install.sh
index f1910fc4..de74e21f 100755
--- a/tests/install.sh
+++ b/tests/install.sh
@@ -5,7 +5,7 @@ git clone --depth=1 git://github.com/powerline/deps tests/bot-ci/deps
 . tests/bot-ci/scripts/common/main.sh
 
 sudo apt-get install -qq libssl1.0.0
-sudo apt-get install -qq screen zsh tcsh mksh busybox socat realpath bc rc
+sudo apt-get install -qq screen zsh tcsh mksh busybox socat realpath bc rc tmux
 
 if test -n "$USE_UCS2_PYTHON" ; then
 	pip install virtualenvwrapper
diff --git a/tests/lib/terminal.py b/tests/lib/terminal.py
new file mode 100644
index 00000000..f026ec57
--- /dev/null
+++ b/tests/lib/terminal.py
@@ -0,0 +1,59 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import threading
+
+from time import sleep
+
+import pexpect
+
+from tests.lib.vterm import VTerm
+
+
+class ExpectProcess(threading.Thread):
+	def __init__(self, lib, rows, cols, cmd, args, cwd=None, env=None):
+		super(ExpectProcess, self).__init__()
+		self.vterm = VTerm(lib, rows, cols)
+		self.lock = threading.Lock()
+		self.rows = rows
+		self.cols = cols
+		self.cmd = cmd
+		self.args = args
+		self.cwd = cwd
+		self.env = env
+		self.buffer = []
+		self.child_lock = threading.Lock()
+
+	def run(self):
+		child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd, env=self.env)
+		sleep(0.5)
+		child.setwinsize(self.rows, self.cols)
+		self.child = child
+		status = None
+		while status is None:
+			try:
+				with self.child_lock:
+					s = child.read_nonblocking(size=1024, timeout=0)
+					status = child.status
+			except pexpect.TIMEOUT:
+				pass
+			except pexpect.EOF:
+				break
+			else:
+				with self.lock:
+					self.vterm.push(s)
+					self.buffer.append(s)
+
+	def __getitem__(self, position):
+		with self.lock:
+			return self.vterm.vtscreen[position]
+
+	def read(self):
+		with self.lock:
+			ret = b''.join(self.buffer)
+			del self.buffer[:]
+			return ret
+
+	def send(self, data):
+		with self.child_lock:
+			self.child.send(data)
diff --git a/tests/lib/vterm.py b/tests/lib/vterm.py
new file mode 100644
index 00000000..c83235ec
--- /dev/null
+++ b/tests/lib/vterm.py
@@ -0,0 +1,178 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import ctypes
+
+from powerline.lib.unicode import unicode, unichr, tointiter
+
+
+class CTypesFunction(object):
+	def __init__(self, library, name, rettype, args):
+		self.name = name
+		self.prototype = ctypes.CFUNCTYPE(rettype, *[
+			arg[1] for arg in args
+		])
+		self.args = args
+		self.func = self.prototype((name, library), tuple((
+			(1, arg[0]) for arg in args
+		)))
+
+	def __call__(self, *args, **kwargs):
+		return self.func(*args, **kwargs)
+
+	def __repr__(self):
+		return '{cls}(<library>, {name!r}, {rettype!r}, {args!r})'.format(
+			cls=self.__class__.__name__,
+			**self.__dict__
+		)
+
+
+class CTypesLibraryFuncsCollection(object):
+	def __init__(self, lib, **kwargs):
+		self.lib = lib
+		library_loader = ctypes.LibraryLoader(ctypes.CDLL)
+		library = library_loader.LoadLibrary(lib)
+		self.library = library
+		for name, args in kwargs.items():
+			self.__dict__[name] = CTypesFunction(library, name, *args)
+
+
+class VTermPos_s(ctypes.Structure):
+	_fields_ = (
+		('row', ctypes.c_int),
+		('col', ctypes.c_int),
+	)
+
+
+class VTermColor_s(ctypes.Structure):
+	_fields_ = (
+		('red', ctypes.c_uint8),
+		('green', ctypes.c_uint8),
+		('blue', ctypes.c_uint8),
+	)
+
+
+class VTermScreenCellAttrs_s(ctypes.Structure):
+	_fields_ = (
+		('bold', ctypes.c_uint, 1),
+		('underline', ctypes.c_uint, 2),
+		('italic', ctypes.c_uint, 1),
+		('blink', ctypes.c_uint, 1),
+		('reverse', ctypes.c_uint, 1),
+		('strike', ctypes.c_uint, 1),
+		('font', ctypes.c_uint, 4),
+		('dwl', ctypes.c_uint, 1),
+		('dhl', ctypes.c_uint, 2),
+	)
+
+
+VTERM_MAX_CHARS_PER_CELL = 6
+
+
+class VTermScreenCell_s(ctypes.Structure):
+	_fields_ = (
+		('chars', ctypes.ARRAY(ctypes.c_uint32, VTERM_MAX_CHARS_PER_CELL)),
+		('width', ctypes.c_char),
+		('attrs', VTermScreenCellAttrs_s),
+		('fg', VTermColor_s),
+		('bg', VTermColor_s),
+	)
+
+
+VTerm_p = ctypes.c_void_p
+VTermScreen_p = ctypes.c_void_p
+
+
+def get_functions(lib):
+	return CTypesLibraryFuncsCollection(
+		lib,
+		vterm_new=(VTerm_p, (
+			('rows', ctypes.c_int),
+			('cols', ctypes.c_int)
+		)),
+		vterm_obtain_screen=(VTermScreen_p, (('vt', VTerm_p),)),
+		vterm_screen_reset=(None, (
+			('screen', VTermScreen_p),
+			('hard', ctypes.c_int)
+		)),
+		vterm_input_write=(ctypes.c_size_t, (
+			('vt', VTerm_p),
+			('bytes', ctypes.POINTER(ctypes.c_char)),
+			('size', ctypes.c_size_t),
+		)),
+		vterm_screen_get_cell=(ctypes.c_int, (
+			('screen', VTermScreen_p),
+			('pos', VTermPos_s),
+			('cell', ctypes.POINTER(VTermScreenCell_s))
+		)),
+		vterm_free=(None, (('vt', VTerm_p),)),
+	)
+
+
+class VTermColor(object):
+	__slots__ = ('red', 'green', 'blue')
+
+	def __init__(self, color):
+		self.red = color.red
+		self.green = color.green
+		self.blue = color.blue
+
+	@property
+	def color_key(self):
+		return (self.red, self.green, self.blue)
+
+
+class VTermScreenCell(object):
+	def __init__(self, vtsc):
+		for field in VTermScreenCellAttrs_s._fields_:
+			field_name = field[0]
+			setattr(self, field_name, getattr(vtsc.attrs, field_name))
+		self.text = ''.join((
+			unichr(vtsc.chars[i]) for i in range(VTERM_MAX_CHARS_PER_CELL)
+		)).rstrip('\x00')
+		self.width = next(tointiter(vtsc.width))
+		self.fg = VTermColor(vtsc.fg)
+		self.bg = VTermColor(vtsc.bg)
+		self.cell_properties_key = (
+			self.fg.color_key,
+			self.bg.color_key,
+			self.bold,
+			self.underline,
+			self.italic,
+		)
+
+
+class VTermScreen(object):
+	def __init__(self, functions, screen):
+		self.functions = functions
+		self.screen = screen
+
+	def __getitem__(self, position):
+		pos = VTermPos_s(*position)
+		cell = VTermScreenCell_s()
+		ret = self.functions.vterm_screen_get_cell(self.screen, pos, cell)
+		if ret != 1:
+			raise ValueError('vterm_screen_get_cell returned {0}'.format(ret))
+		return VTermScreenCell(cell)
+
+	def reset(self, hard):
+		self.functions.vterm_screen_reset(self.screen, int(bool(hard)))
+
+
+class VTerm(object):
+	def __init__(self, lib, rows, cols):
+		self.functions = get_functions(lib)
+		self.vt = self.functions.vterm_new(rows, cols)
+		self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt))
+		self.vtscreen.reset(True)
+
+	def push(self, data):
+		if isinstance(data, unicode):
+			data = data.encode('utf-8')
+		return self.functions.vterm_input_write(self.vt, data, len(data))
+
+	def __del__(self):
+		try:
+			self.functions.vterm_free(self.vt)
+		except AttributeError:
+			pass
diff --git a/tests/run_vterm_tests.sh b/tests/run_vterm_tests.sh
new file mode 100755
index 00000000..312b817d
--- /dev/null
+++ b/tests/run_vterm_tests.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+set -e
+FAILED=0
+if ! sh tests/test_in_vterm/test.sh ; then
+	echo "Failed vterm"
+	FAILED=1
+fi
+exit $FAILED
diff --git a/tests/test_in_vterm/test.sh b/tests/test_in_vterm/test.sh
new file mode 100755
index 00000000..156493f7
--- /dev/null
+++ b/tests/test_in_vterm/test.sh
@@ -0,0 +1,53 @@
+#!/bin/sh
+: ${PYTHON:=python}
+: ${POWERLINE_TMUX_EXE:=tmux}
+
+set -e
+
+# HACK: get newline for use in strings given that "\n" and $'' do not work.
+NL="$(printf '\nE')"
+NL="${NL%E}"
+
+FAILED=0
+
+rm -rf tests/vterm
+mkdir tests/vterm
+mkdir tests/vterm/path
+
+ln -s "$(which "${PYTHON}")" tests/vterm/path/python
+ln -s "$(which bash)" tests/vterm/path
+ln -s "$(which env)" tests/vterm/path
+ln -s "$PWD/scripts/powerline-render" tests/vterm/path
+ln -s "$PWD/scripts/powerline-config" tests/vterm/path
+
+FAIL_SUMMARY=""
+
+test_tmux() {
+	if ! which "${POWERLINE_TMUX_EXE}" ; then
+		return 0
+	fi
+	ln -s "$(which "${POWERLINE_TMUX_EXE}")" tests/vterm/path
+	if ! "${PYTHON}" tests/test_in_vterm/test_tmux.py; then
+		echo "Failed vterm test $f"
+		FAILED=1
+		FAIL_SUMMARY="$FAIL_SUMMARY${NL}F $f"
+		for file in tests/vterm/*.log ; do
+			if ! test -e "$file" ; then
+				break
+			fi
+			echo '____________________________________________________________'
+			echo "$file:"
+			echo '============================================================'
+			cat -v $file
+		done
+	fi
+}
+
+test_tmux || true
+
+if test $FAILED -eq 0 ; then
+	echo "$FAIL_SUMMARY"
+	rm -rf tests/vterm
+fi
+
+exit $FAILED
diff --git a/tests/test_in_vterm/test_tmux.py b/tests/test_in_vterm/test_tmux.py
new file mode 100755
index 00000000..e3e9fc26
--- /dev/null
+++ b/tests/test_in_vterm/test_tmux.py
@@ -0,0 +1,147 @@
+#!/usr/bin/env python
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+
+from time import sleep
+from subprocess import check_call
+from itertools import groupby
+from difflib import ndiff
+
+from powerline.lib.unicode import u
+
+from tests.lib.terminal import ExpectProcess
+
+
+def cell_properties_key_to_shell_escape(cell_properties_key):
+	fg, bg, bold, underline, italic = cell_properties_key
+	return('\x1b[38;2;{0};48;2;{1}{bold}{underline}{italic}m'.format(
+		';'.join((str(i) for i in fg)),
+		';'.join((str(i) for i in bg)),
+		bold=(';1' if bold else ''),
+		underline=(';4' if underline else ''),
+		italic=(';3' if italic else ''),
+	))
+
+
+def main():
+	VTERM_TEST_DIR = os.path.abspath('tests/vterm')
+	vterm_path = os.path.join(VTERM_TEST_DIR, 'path')
+	socket_path = os.path.join(VTERM_TEST_DIR, 'tmux-socket')
+	rows = 50
+	cols = 200
+
+	try:
+		p = ExpectProcess(
+			lib='tests/bot-ci/deps/libvterm/libvterm.so',
+			rows=rows,
+			cols=cols,
+			cmd='tmux',
+			args=[
+				# Specify full path to tmux socket (testing tmux instance must 
+				# not interfere with user one)
+				'-S', socket_path,
+				# Force 256-color mode
+				'-2',
+				# Request verbose logging just in case
+				'-v',
+				# Specify configuration file
+				'-f', os.path.abspath('powerline/bindings/tmux/powerline.conf'),
+				# Run bash three times
+				'new-session', 'bash --norc --noprofile -i', ';',
+				'new-window', 'bash --norc --noprofile -i', ';',
+				'new-window', 'bash --norc --noprofile -i', ';',
+			],
+			cwd=VTERM_TEST_DIR,
+			env={
+				'TERM': 'vt100',
+				'PATH': vterm_path,
+				'SHELL': os.path.join(''),
+				'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'),
+				'POWERLINE_COMMAND': 'powerline-render',
+				'POWERLINE_THEME_OVERRIDES': (
+					'default.segments.right=[{"type":"string","name":"s1","highlight_groups":["cwd"]}];'
+					'default.segments.left=[{"type":"string","name":"s2","highlight_groups":["background"]}];'
+					'default.segment_data.s1.contents=S1 string here;'
+					'default.segment_data.s2.contents=S2 string here;'
+				),
+			},
+		)
+		p.start()
+		sleep(1)
+		last_line = []
+		for col in range(cols):
+			last_line.append(p[rows - 1, col])
+		result = tuple((
+			(key, ''.join((i.text for i in subline)))
+			for key, subline in groupby(last_line, lambda i: i.cell_properties_key)
+		))
+		expected_result = (
+			(((0, 0, 0), (243, 243, 243), 1, 0, 0), ' 0 '),
+			(((243, 243, 243), (11, 11, 11), 0, 0, 0), ' '),
+			(((255, 255, 255), (11, 11, 11), 0, 0, 0), ' S2 string here  '),
+			(((133, 133, 133), (11, 11, 11), 0, 0, 0), ' 0 '),
+			(((88, 88, 88), (11, 11, 11), 0, 0, 0), '| '),
+			(((188, 188, 188), (11, 11, 11), 0, 0, 0), 'bash  '),
+			(((255, 255, 255), (11, 11, 11), 0, 0, 0), ' '),
+			(((133, 133, 133), (11, 11, 11), 0, 0, 0), ' 1 '),
+			(((88, 88, 88), (11, 11, 11), 0, 0, 0), '| '),
+			(((0, 102, 153), (11, 11, 11), 0, 0, 0), 'bash  '),
+			(((255, 255, 255), (11, 11, 11), 0, 0, 0), ' '),
+			(((11, 11, 11), (0, 102, 153), 0, 0, 0), ' '),
+			(((102, 204, 255), (0, 102, 153), 0, 0, 0), '2 | '),
+			(((255, 255, 255), (0, 102, 153), 1, 0, 0), 'bash '),
+			(((0, 102, 153), (11, 11, 11), 0, 0, 0), ' '),
+			(((255, 255, 255), (11, 11, 11), 0, 0, 0), '                                                                                                                               '),
+			(((88, 88, 88), (11, 11, 11), 0, 0, 0), ' '),
+			(((199, 199, 199), (88, 88, 88), 0, 0, 0), ' S1 string here '),
+		)
+		print('Result:')
+		shesc_result = ''.join((
+			'{0}{1}\x1b[m'.format(cell_properties_key_to_shell_escape(key), text)
+			for key, text in result
+		))
+		print(shesc_result)
+		print('Expected:')
+		shesc_expected_result = ''.join((
+			'{0}{1}\x1b[m'.format(cell_properties_key_to_shell_escape(key), text)
+			for key, text in expected_result
+		))
+		print(shesc_expected_result)
+		print('Screen:')
+		screen = []
+		for i in range(rows):
+			screen.append([])
+			for j in range(cols):
+				screen[-1].append(p[i, j])
+		print('\n'.join(
+			''.join((
+				'{0}{1}\x1b[m'.format(
+					cell_properties_key_to_shell_escape(i.cell_properties_key),
+					i.text
+				) for i in line
+			))
+			for line in screen
+		))
+		if result == expected_result:
+			return True
+		else:
+			a = shesc_result.replace('\x1b', '\\e') + '\n'
+			b = shesc_expected_result.replace('\x1b', '\\e') + '\n'
+			print('_' * 80)
+			print('Diff:')
+			print('=' * 80)
+			print(''.join((u(line) for line in ndiff([a], [b]))))
+			return False
+	finally:
+		check_call(['tmux', '-S', socket_path, 'kill-server'], env={
+			'PATH': vterm_path,
+		})
+
+
+if __name__ == '__main__':
+	if main():
+		raise SystemExit(0)
+	else:
+		raise SystemExit(1)