mirror of
https://github.com/powerline/powerline.git
synced 2025-07-28 16:24:57 +02:00
Refactor try block in main and refactor specifying dimensions
This commit is contained in:
parent
3b769d2b98
commit
3d0f168806
@ -9,16 +9,39 @@ from signal import SIGKILL
|
|||||||
|
|
||||||
import pexpect
|
import pexpect
|
||||||
|
|
||||||
from tests.lib.vterm import VTerm
|
from tests.lib.vterm import VTerm, Dimensions
|
||||||
|
|
||||||
|
|
||||||
|
class MutableDimensions(Dimensions):
|
||||||
|
def __new__(cls, rows, cols):
|
||||||
|
return Dimensions.__new__(cls, rows, cols)
|
||||||
|
|
||||||
|
def __init__(self, rows, cols):
|
||||||
|
Dimensions.__init__(self, rows, cols)
|
||||||
|
self._list = [rows, cols]
|
||||||
|
|
||||||
|
def __getitem__(self, idx):
|
||||||
|
return self._list[idx]
|
||||||
|
|
||||||
|
def __setitem__(self, idx, val):
|
||||||
|
self._list[idx] = val
|
||||||
|
|
||||||
|
rows = property(
|
||||||
|
fget = lambda self: self._list[0],
|
||||||
|
fset = lambda self, val: self._list.__setitem__(0, val),
|
||||||
|
)
|
||||||
|
cols = property(
|
||||||
|
fget = lambda self: self._list[1],
|
||||||
|
fset = lambda self, val: self._list.__setitem__(1, val),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ExpectProcess(threading.Thread):
|
class ExpectProcess(threading.Thread):
|
||||||
def __init__(self, lib, rows, cols, cmd, args, cwd=None, env=None):
|
def __init__(self, lib, dim, cmd, args, cwd=None, env=None):
|
||||||
super(ExpectProcess, self).__init__()
|
super(ExpectProcess, self).__init__()
|
||||||
self.vterm = VTerm(lib, rows, cols)
|
self.vterm = VTerm(lib, dim)
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
self.rows = rows
|
self.dim = dim
|
||||||
self.cols = cols
|
|
||||||
self.cmd = cmd
|
self.cmd = cmd
|
||||||
self.args = args
|
self.args = args
|
||||||
self.cwd = cwd
|
self.cwd = cwd
|
||||||
@ -30,7 +53,7 @@ class ExpectProcess(threading.Thread):
|
|||||||
def run(self):
|
def run(self):
|
||||||
child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd, env=self.env)
|
child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd, env=self.env)
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
child.setwinsize(self.rows, self.cols)
|
child.setwinsize(self.dim.rows, self.dim.cols)
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
self.child = child
|
self.child = child
|
||||||
status = None
|
status = None
|
||||||
@ -54,12 +77,11 @@ class ExpectProcess(threading.Thread):
|
|||||||
def kill(self):
|
def kill(self):
|
||||||
self.shutdown_event.set()
|
self.shutdown_event.set()
|
||||||
|
|
||||||
def resize(self, rows, cols):
|
def resize(self, dim):
|
||||||
with self.child_lock:
|
with self.child_lock:
|
||||||
self.rows = rows
|
self.dim = dim
|
||||||
self.cols = cols
|
self.child.setwinsize(dim.rows, dim.cols)
|
||||||
self.child.setwinsize(rows, cols)
|
self.vterm.resize(dim)
|
||||||
self.vterm.resize(rows, cols)
|
|
||||||
|
|
||||||
def __getitem__(self, position):
|
def __getitem__(self, position):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
@ -94,13 +116,14 @@ class ExpectProcess(threading.Thread):
|
|||||||
return self.get_highlighted_text((
|
return self.get_highlighted_text((
|
||||||
(key, ''.join((cell.text for cell in subline)))
|
(key, ''.join((cell.text for cell in subline)))
|
||||||
for key, subline in groupby((
|
for key, subline in groupby((
|
||||||
self.vterm.vtscreen[row, col] for col in range(self.cols)
|
self.vterm.vtscreen[row, col]
|
||||||
|
for col in range(self.dim.cols)
|
||||||
), lambda cell: cell.cell_properties_key)
|
), lambda cell: cell.cell_properties_key)
|
||||||
), attrs, default_props)
|
), attrs, default_props)
|
||||||
|
|
||||||
def get_screen(self, attrs, default_props=()):
|
def get_screen(self, attrs, default_props=()):
|
||||||
lines = []
|
lines = []
|
||||||
for row in range(self.rows):
|
for row in range(self.dim.rows):
|
||||||
line, attrs = self.get_row(row, attrs, default_props)
|
line, attrs = self.get_row(row, attrs, default_props)
|
||||||
lines.append(line)
|
lines.append(line)
|
||||||
return '\n'.join(lines), attrs
|
return '\n'.join(lines), attrs
|
||||||
|
@ -3,9 +3,14 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
|
|||||||
|
|
||||||
import ctypes
|
import ctypes
|
||||||
|
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
from powerline.lib.unicode import unicode, unichr, tointiter
|
from powerline.lib.unicode import unicode, unichr, tointiter
|
||||||
|
|
||||||
|
|
||||||
|
Dimensions = namedtuple('Dimensions', ('rows', 'cols'))
|
||||||
|
|
||||||
|
|
||||||
class CTypesFunction(object):
|
class CTypesFunction(object):
|
||||||
def __init__(self, library, name, rettype, args):
|
def __init__(self, library, name, rettype, args):
|
||||||
self.name = name
|
self.name = name
|
||||||
@ -165,9 +170,9 @@ class VTermScreen(object):
|
|||||||
|
|
||||||
|
|
||||||
class VTerm(object):
|
class VTerm(object):
|
||||||
def __init__(self, lib, rows, cols):
|
def __init__(self, lib, dim):
|
||||||
self.functions = get_functions(lib)
|
self.functions = get_functions(lib)
|
||||||
self.vt = self.functions.vterm_new(rows, cols)
|
self.vt = self.functions.vterm_new(dim.rows, dim.cols)
|
||||||
self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt))
|
self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt))
|
||||||
self.vtscreen.reset(True)
|
self.vtscreen.reset(True)
|
||||||
|
|
||||||
@ -176,8 +181,8 @@ class VTerm(object):
|
|||||||
data = data.encode('utf-8')
|
data = data.encode('utf-8')
|
||||||
return self.functions.vterm_input_write(self.vt, data, len(data))
|
return self.functions.vterm_input_write(self.vt, data, len(data))
|
||||||
|
|
||||||
def resize(self, rows, cols):
|
def resize(self, dim):
|
||||||
self.functions.vterm_set_size(self.vt, rows, cols)
|
self.functions.vterm_set_size(self.vt, dim.rows, dim.cols)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
try:
|
try:
|
||||||
|
@ -17,7 +17,7 @@ from powerline.lib.dict import updated
|
|||||||
from powerline.bindings.tmux import get_tmux_version
|
from powerline.bindings.tmux import get_tmux_version
|
||||||
from powerline import get_fallback_logger
|
from powerline import get_fallback_logger
|
||||||
|
|
||||||
from tests.lib.terminal import ExpectProcess
|
from tests.lib.terminal import ExpectProcess, MutableDimensions
|
||||||
|
|
||||||
|
|
||||||
VTERM_TEST_DIR = os.path.abspath('tests/vterm_tmux')
|
VTERM_TEST_DIR = os.path.abspath('tests/vterm_tmux')
|
||||||
@ -46,12 +46,12 @@ def cell_properties_key_to_shell_escape(cell_properties_key):
|
|||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
def test_expected_result(p, expected_result, cols, rows, print_logs):
|
def test_expected_result(p, expected_result, dim, print_logs):
|
||||||
expected_text, attrs = expected_result
|
expected_text, attrs = expected_result
|
||||||
attempts = 3
|
attempts = 3
|
||||||
result = None
|
result = None
|
||||||
while attempts:
|
while attempts:
|
||||||
actual_text, all_attrs = p.get_row(rows - 1, attrs)
|
actual_text, all_attrs = p.get_row(dim.rows - 1, attrs)
|
||||||
if actual_text == expected_text:
|
if actual_text == expected_text:
|
||||||
return True
|
return True
|
||||||
attempts -= 1
|
attempts -= 1
|
||||||
@ -103,8 +103,7 @@ def main(attempts=3):
|
|||||||
socket_path = os.path.abspath('tmux-socket-{0}'.format(attempts))
|
socket_path = os.path.abspath('tmux-socket-{0}'.format(attempts))
|
||||||
if os.path.exists(socket_path):
|
if os.path.exists(socket_path):
|
||||||
os.unlink(socket_path)
|
os.unlink(socket_path)
|
||||||
rows = 50
|
dim = MutableDimensions(rows=50, cols=200)
|
||||||
cols = 200
|
|
||||||
|
|
||||||
tmux_exe = os.path.join(vterm_path, 'tmux')
|
tmux_exe = os.path.join(vterm_path, 'tmux')
|
||||||
|
|
||||||
@ -177,6 +176,8 @@ def main(attempts=3):
|
|||||||
with open(conf_file, 'w') as cf_fd:
|
with open(conf_file, 'w') as cf_fd:
|
||||||
cf_fd.write(conf_line)
|
cf_fd.write(conf_line)
|
||||||
|
|
||||||
|
tmux_version = get_tmux_version(get_fallback_logger())
|
||||||
|
|
||||||
base_attrs = {
|
base_attrs = {
|
||||||
((0, 0, 0), (243, 243, 243), 1, 0, 0): 'lead',
|
((0, 0, 0), (243, 243, 243), 1, 0, 0): 'lead',
|
||||||
((243, 243, 243), (11, 11, 11), 0, 0, 0): 'leadsep',
|
((243, 243, 243), (11, 11, 11), 0, 0, 0): 'leadsep',
|
||||||
@ -185,35 +186,8 @@ def main(attempts=3):
|
|||||||
((88, 88, 88), (11, 11, 11), 0, 0, 0): 'cwdhsep',
|
((88, 88, 88), (11, 11, 11), 0, 0, 0): 'cwdhsep',
|
||||||
((0, 0, 0), (0, 224, 0), 0, 0, 0): 'defstl',
|
((0, 0, 0), (0, 224, 0), 0, 0, 0): 'defstl',
|
||||||
}
|
}
|
||||||
|
expected_results = (
|
||||||
try:
|
get_expected_result(
|
||||||
p = ExpectProcess(
|
|
||||||
lib=lib,
|
|
||||||
rows=rows,
|
|
||||||
cols=cols,
|
|
||||||
cmd=tmux_exe,
|
|
||||||
args=[
|
|
||||||
# Specify full path to tmux socket (testing tmux instance must
|
|
||||||
# not interfere with user one)
|
|
||||||
'-S', socket_path,
|
|
||||||
# Force 256-color mode
|
|
||||||
'-2',
|
|
||||||
# Request verbose logging just in case
|
|
||||||
'-v',
|
|
||||||
# Specify configuration file
|
|
||||||
'-f', conf_file,
|
|
||||||
# Run bash three times
|
|
||||||
'new-session', 'bash --norc --noprofile -i', ';',
|
|
||||||
'new-window', 'bash --norc --noprofile -i', ';',
|
|
||||||
'new-window', 'bash --norc --noprofile -i', ';',
|
|
||||||
],
|
|
||||||
cwd=VTERM_TEST_DIR,
|
|
||||||
env=env,
|
|
||||||
)
|
|
||||||
p.start()
|
|
||||||
sleep(5)
|
|
||||||
tmux_version = get_tmux_version(get_fallback_logger())
|
|
||||||
expected_result = get_expected_result(
|
|
||||||
tmux_version,
|
tmux_version,
|
||||||
expected_result_old=(
|
expected_result_old=(
|
||||||
'{lead: 0 }{leadsep: }{bg: S2 string here }'
|
'{lead: 0 }{leadsep: }{bg: S2 string here }'
|
||||||
@ -257,21 +231,10 @@ def main(attempts=3):
|
|||||||
((102, 204, 255), (0, 102, 153), 0, 0, 0): 9,
|
((102, 204, 255), (0, 102, 153), 0, 0, 0): 9,
|
||||||
((255, 255, 255), (0, 102, 153), 1, 0, 0): 10,
|
((255, 255, 255), (0, 102, 153), 1, 0, 0): 10,
|
||||||
})),
|
})),
|
||||||
)
|
),
|
||||||
ret = RetValues.not_run
|
get_expected_result(
|
||||||
if not test_expected_result(p, expected_result, cols, rows, not attempts):
|
|
||||||
if attempts:
|
|
||||||
ret = RetValues.do_restart
|
|
||||||
else:
|
|
||||||
ret = RetValues.failed
|
|
||||||
elif ret is RetValues.not_run:
|
|
||||||
ret = RetValues.partial_success
|
|
||||||
cols = 40
|
|
||||||
p.resize(rows, cols)
|
|
||||||
sleep(5)
|
|
||||||
expected_result = get_expected_result(
|
|
||||||
tmux_version,
|
tmux_version,
|
||||||
expected_result_old=('{bg:' + (' ' * cols) + '}', base_attrs),
|
expected_result_old=('{bg:' + (' ' * dim.cols) + '}', base_attrs),
|
||||||
expected_result_1_7=(
|
expected_result_1_7=(
|
||||||
'{lead: 0 }'
|
'{lead: 0 }'
|
||||||
'{leadsep: }{bg: <}{4:h }{bg: }{5: }'
|
'{leadsep: }{bg: <}{4:h }{bg: }{5: }'
|
||||||
@ -303,8 +266,46 @@ def main(attempts=3):
|
|||||||
((102, 204, 255), (0, 102, 153), 0, 0, 0): 6,
|
((102, 204, 255), (0, 102, 153), 0, 0, 0): 6,
|
||||||
((255, 255, 255), (0, 102, 153), 1, 0, 0): 7,
|
((255, 255, 255), (0, 102, 153), 1, 0, 0): 7,
|
||||||
})),
|
})),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
p = ExpectProcess(
|
||||||
|
lib=lib,
|
||||||
|
dim=dim,
|
||||||
|
cmd=tmux_exe,
|
||||||
|
args=[
|
||||||
|
# Specify full path to tmux socket (testing tmux instance must
|
||||||
|
# not interfere with user one)
|
||||||
|
'-S', socket_path,
|
||||||
|
# Force 256-color mode
|
||||||
|
'-2',
|
||||||
|
# Request verbose logging just in case
|
||||||
|
'-v',
|
||||||
|
# Specify configuration file
|
||||||
|
'-f', conf_file,
|
||||||
|
# Run bash three times
|
||||||
|
'new-session', 'bash --norc --noprofile -i', ';',
|
||||||
|
'new-window', 'bash --norc --noprofile -i', ';',
|
||||||
|
'new-window', 'bash --norc --noprofile -i', ';',
|
||||||
|
],
|
||||||
|
cwd=VTERM_TEST_DIR,
|
||||||
|
env=env,
|
||||||
)
|
)
|
||||||
if not test_expected_result(p, expected_result, cols, rows, not attempts):
|
p.start()
|
||||||
|
sleep(5)
|
||||||
|
ret = RetValues.not_run
|
||||||
|
if not test_expected_result(p, expected_results[0], dim, not attempts):
|
||||||
|
if attempts:
|
||||||
|
ret = RetValues.do_restart
|
||||||
|
else:
|
||||||
|
ret = RetValues.failed
|
||||||
|
elif ret is RetValues.not_run:
|
||||||
|
ret = RetValues.partial_success
|
||||||
|
dim.cols = 40
|
||||||
|
p.resize(dim)
|
||||||
|
sleep(5)
|
||||||
|
if not test_expected_result(p, expected_results[1], dim, not attempts):
|
||||||
if attempts:
|
if attempts:
|
||||||
ret = RetValues.do_restart
|
ret = RetValues.do_restart
|
||||||
else:
|
else:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user