mtest: fix test output issues (in console)

This change set aims to fix various "issues" seen with the current
implementation. The changes can be summarized with the following list:

* Replace emojis and spinners with multiline status displaying the name
  and running time of each currently running test.
* The test output (especially in verbose mode or when multiple failing
  tests' output gets printed out) can get confusing. Try to make the
  output easier to read and grasp. Most notable change here is the
  addition of the test number to the beginning of each printed line.
* Print exit details (i.e. exit code) of the test in verbose mode.
* Try to make the verbose "live" output from tests to match the look and
  feel of otherwise produced (verbose) test output.
pull/9297/head
Hemmo Nieminen 4 years ago committed by Jussi Pakkanen
parent 64c267c49c
commit 5fcb0e6525
  1. 3
      mesonbuild/mlog.py
  2. 410
      mesonbuild/mtest.py

@ -158,6 +158,9 @@ class AnsiText:
def bold(text: str, quoted: bool = False) -> AnsiDecorator:
return AnsiDecorator(text, "\033[1m", quoted=quoted)
def italic(text: str, quoted: bool = False) -> AnsiDecorator:
return AnsiDecorator(text, "\033[3m", quoted=quoted)
def plain(text: str) -> AnsiDecorator:
return AnsiDecorator(text, "")

@ -17,6 +17,7 @@
from pathlib import Path
from collections import deque
from copy import deepcopy
from itertools import islice
import argparse
import asyncio
import datetime
@ -263,9 +264,6 @@ class TestResult(enum.Enum):
result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen())
return self.colorize(result_str).get_text(colorize)
def get_command_marker(self) -> str:
return str(self.colorize('>>> '))
TYPE_TAPResult = T.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout']
@ -319,6 +317,8 @@ class TAPParser:
def parse_test(self, ok: bool, num: int, name: str, directive: T.Optional[str], explanation: T.Optional[str]) -> \
T.Generator[T.Union['TAPParser.Test', 'TAPParser.Error'], None, None]:
name = name.strip()
if name[0:2] == '- ':
name = name[2:]
explanation = explanation.strip() if explanation else None
if directive is not None:
directive = directive.upper()
@ -452,8 +452,8 @@ class TestLogger:
def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None:
pass
def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, res: TestResult) -> None:
pass
def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, res: TestResult) -> str:
return ''
def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
pass
@ -477,25 +477,15 @@ class TestFileLogger(TestLogger):
class ConsoleLogger(TestLogger):
SPINNER = "\U0001f311\U0001f312\U0001f313\U0001f314" + \
"\U0001f315\U0001f316\U0001f317\U0001f318"
SCISSORS = "\u2700 "
HLINE = "\u2015"
RTRI = "\u25B6 "
def __init__(self) -> None:
self.update = asyncio.Event()
self.running_tests = OrderedSet() # type: OrderedSet['TestRun']
self.progress_test = None # type: T.Optional['TestRun']
self.progress_task = None # type: T.Optional[asyncio.Future]
self.max_left_width = 0 # type: int
self.stop = False
self.update = asyncio.Event()
self.should_erase_line = ''
self.test_count = 0
self.started_tests = 0
self.spinner_index = 0
try:
self.cols, _ = os.get_terminal_size(1)
self.is_tty = True
@ -503,59 +493,46 @@ class ConsoleLogger(TestLogger):
self.cols = 80
self.is_tty = False
self.output_start = dashes(self.SCISSORS, self.HLINE, self.cols - 2)
self.output_end = dashes('', self.HLINE, self.cols - 2)
self.sub = self.RTRI
try:
self.output_start.encode(sys.stdout.encoding or 'ascii')
except UnicodeEncodeError:
self.output_start = dashes('8<', '-', self.cols - 2)
self.output_end = dashes('', '-', self.cols - 2)
self.sub = '| '
def flush(self) -> None:
if self.should_erase_line:
print(self.should_erase_line, end='')
self.should_erase_line = ''
def print_progress(self, line: str) -> None:
print(self.should_erase_line, line, sep='', end='\r')
self.should_erase_line = '\x1b[K'
def print_progress(self, lines: T.List[str]) -> None:
line_count = len(lines)
if line_count > 0:
self.flush()
for line in lines:
print(line)
print(f'\x1b[{line_count}A', end='')
self.should_erase_line = '\x1b[K' + '\x1b[1B\x1b[K' * (line_count - 1)
if line_count > 1:
self.should_erase_line += f'\x1b[{line_count - 1}A'
def request_update(self) -> None:
self.update.set()
def emit_progress(self, harness: 'TestHarness') -> None:
if self.progress_test is None:
self.flush()
return
if len(self.running_tests) == 1:
count = f'{self.started_tests}/{self.test_count}'
else:
count = '{}-{}/{}'.format(self.started_tests - len(self.running_tests) + 1,
self.started_tests, self.test_count)
left = '[{}] {} '.format(count, self.SPINNER[self.spinner_index])
self.spinner_index = (self.spinner_index + 1) % len(self.SPINNER)
right = '{spaces} {dur:{durlen}}'.format(
spaces=' ' * TestResult.maxlen(),
dur=int(time.time() - self.progress_test.starttime),
durlen=harness.duration_max_len)
if self.progress_test.timeout:
right += '/{timeout:{durlen}}'.format(
timeout=self.progress_test.timeout,
lines: T.List[str] = []
for test in islice(reversed(self.running_tests), 10):
left = ' ' * (len(str(self.test_count)) * 2 + 2)
right = '{spaces} {dur:{durlen}}'.format(
spaces=' ' * TestResult.maxlen(),
dur=int(time.time() - test.starttime),
durlen=harness.duration_max_len)
right += 's'
detail = self.progress_test.detail
if detail:
right += ' ' + detail
line = harness.format(self.progress_test, colorize=True,
max_left_width=self.max_left_width,
left=left, right=right)
self.print_progress(line)
if test.timeout:
right += '/{timeout:{durlen}}'.format(
timeout=test.timeout,
durlen=harness.duration_max_len)
right += 's'
lines = [harness.format(test, colorize=True,
max_left_width=self.max_left_width,
left=left,
right=right)] + lines
if len(self.running_tests) > 10:
lines += [' ' * len(harness.get_test_num_prefix(0))
+ f'[{len(self.running_tests) - 10} more tests running]']
self.print_progress(lines)
def start(self, harness: 'TestHarness') -> None:
async def report_progress() -> None:
@ -565,26 +542,12 @@ class ConsoleLogger(TestLogger):
while not self.stop:
await self.update.wait()
self.update.clear()
# We may get here simply because the progress line has been
# overwritten, so do not always switch. Only do so every
# second, or if the printed test has finished
if loop.time() >= next_update:
self.progress_test = None
next_update = loop.time() + 1
loop.call_at(next_update, self.request_update)
if (self.progress_test and
self.progress_test.res is not TestResult.RUNNING):
self.progress_test = None
if not self.progress_test:
if not self.running_tests:
continue
# Pick a test in round robin order
self.progress_test = self.running_tests.pop(last=False)
self.running_tests.add(self.progress_test)
self.emit_progress(harness)
self.flush()
@ -602,77 +565,92 @@ class ConsoleLogger(TestLogger):
print(harness.format(test, mlog.colorize_console(),
max_left_width=self.max_left_width,
right=test.res.get_text(mlog.colorize_console())))
print(test.res.get_command_marker() + test.cmdline)
if test.needs_parsing:
pass
elif not test.is_parallel:
print(self.output_start, flush=True)
else:
print(flush=True)
self.started_tests += 1
self.running_tests.add(test)
self.running_tests.move_to_end(test, last=False)
self.request_update()
def shorten_log(self, harness: 'TestHarness', result: 'TestRun') -> str:
if not harness.options.verbose and not harness.options.print_errorlogs:
return ''
log = result.get_log(mlog.colorize_console(),
stderr_only=result.needs_parsing)
if harness.options.verbose:
return log
lines = log.splitlines()
if len(lines) < 100:
return log
else:
return str(mlog.bold('Listing only the last 100 lines from a long log.\n')) + '\n'.join(lines[-100:])
def print_log(self, harness: 'TestHarness', result: 'TestRun') -> None:
if not harness.options.verbose:
cmdline = result.cmdline
if not cmdline:
print(result.res.get_command_marker() + result.stdo)
return
print(result.res.get_command_marker() + cmdline)
log = self.shorten_log(harness, result)
if log:
print(self.output_start)
print_safe(log)
print(self.output_end)
@staticmethod
def print_test_details_header(prefix: str, header: str) -> None:
header += ':'
print(prefix + mlog.italic(f'{header:<9}').get_text(mlog.colorize_console()))
def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, result: TestResult) -> None:
if harness.options.verbose or (harness.options.print_errorlogs and result.is_bad()):
self.flush()
print(harness.format(test, mlog.colorize_console(), max_left_width=self.max_left_width,
prefix=self.sub,
middle=s,
right=result.get_text(mlog.colorize_console())), flush=True)
@staticmethod
def print_test_details_line(prefix: str,
line: str,
end: str = '\n',
flush: bool = False) -> None:
print(prefix + ' ' + line, flush=flush, end=end)
self.request_update()
@staticmethod
def print_test_details(prefix: str,
header: str,
lines: T.Union[T.List[str], str],
clip: T.Optional[bool] = False) -> None:
offset = 0
if not isinstance(lines, list):
lines = [lines]
if clip and len(lines) > 100:
offset = -100
header += ' (only the last 100 lines from a long output included)'
ConsoleLogger.print_test_details_header(prefix, header)
for line in lines[offset:]:
ConsoleLogger.print_test_details_line(prefix, line)
def print_log(self,
harness: 'TestHarness',
result: 'TestRun',
no_output: bool = False) -> None:
assert result.cmdline
prefix = harness.get_test_num_prefix(result.num)
self.print_test_details(prefix, "command", result.cmdline)
self.print_test_details(prefix,
"exit details",
returncode_to_status(result.returncode))
if not no_output:
if result.stdo:
if harness.options.split or result.stde:
name = 'stdout'
else:
name = 'output'
self.print_test_details(prefix,
name,
result.stdo.splitlines(),
not harness.options.verbose)
if result.stde:
self.print_test_details(prefix,
"stderr",
result.stde.splitlines(),
not harness.options.verbose)
if result.additional_out:
self.print_test_details(prefix,
"additional output",
result.additional_out.splitlines(),
not harness.options.verbose)
if result.additional_err:
self.print_test_details(prefix,
"additional error",
result.additional_err.splitlines(),
not harness.options.verbose)
def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, result: TestResult) -> str:
return 'subtest %s %s' % (s, result.get_text(mlog.colorize_console()))
def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
self.running_tests.remove(result)
if result.res is TestResult.TIMEOUT and harness.options.verbose:
self.flush()
print(f'{result.name} time out (After {result.timeout} seconds)')
if result.res is TestResult.TIMEOUT and (harness.options.verbose or
harness.options.print_errorlogs):
result.additional_err += f'timed out (after {result.timeout} seconds)\n'
if not harness.options.quiet or not result.res.is_ok():
self.flush()
if harness.options.verbose and not result.is_parallel and result.cmdline:
if not result.needs_parsing:
print(self.output_end)
print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width))
else:
print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width),
flush=True)
if harness.options.verbose or result.res.is_bad():
self.print_log(harness, result)
if harness.options.verbose or result.res.is_bad():
print(flush=True)
print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width))
if harness.options.verbose and not result.is_parallel and result.cmdline and not result.needs_parsing:
# output already printed during execution
self.print_log(harness, result, no_output=True)
elif harness.options.verbose or (result.res.is_bad() and harness.options.print_errorlogs):
# verbose or fail + print_errorlogs -> print
self.print_log(harness, result)
self.request_update()
@ -703,9 +681,14 @@ class TextLogfileBuilder(TestFileLogger):
if cmdline:
starttime_str = time.strftime("%H:%M:%S", time.gmtime(result.starttime))
self.file.write(starttime_str + ' ' + cmdline + '\n')
self.file.write(dashes('output', '-', 78) + '\n')
self.file.write(result.get_log())
self.file.write(dashes('', '-', 78) + '\n\n')
if result.stdo:
self.file.write(dashes('stdout', '-', 78) + '\n')
self.file.write(result.stdo + '\n')
self.file.write(dashes('', '-', 78) + '\n\n')
if result.stde:
self.file.write(dashes('stderr', '-', 78) + '\n')
self.file.write(result.stde + '\n')
self.file.write(dashes('', '-', 78) + '\n\n')
async def finish(self, harness: 'TestHarness') -> None:
if harness.collected_failures:
@ -895,7 +878,6 @@ class TestRun:
self._num = TestRun.TEST_NUM
return self._num
@property
def detail(self) -> str:
if self.res is TestResult.PENDING:
return ''
@ -912,7 +894,8 @@ class TestRun:
return ''
def _complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
stdo: T.Optional[str], stde: T.Optional[str],
additional_out: T.Optional[str], additional_err: T.Optional[str]) -> None:
assert isinstance(res, TestResult)
if self.should_fail and res in (TestResult.OK, TestResult.FAIL):
res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL
@ -922,6 +905,8 @@ class TestRun:
self.duration = time.time() - self.starttime
self.stdo = stdo
self.stde = stde
self.additional_out = additional_out
self.additional_err = additional_err
@property
def cmdline(self) -> T.Optional[str]:
@ -933,43 +918,28 @@ class TestRun:
def complete_skip(self, message: str) -> None:
self.starttime = time.time()
self._complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None)
self._complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None, None, None)
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
self._complete(returncode, res, stdo, stde)
def get_log(self, colorize: bool = False, stderr_only: bool = False) -> str:
stdo = '' if stderr_only else self.stdo
if self.stde:
res = ''
if stdo:
res += mlog.cyan('stdout:').get_text(colorize) + '\n'
res += stdo
if res[-1:] != '\n':
res += '\n'
res += mlog.cyan('stderr:').get_text(colorize) + '\n'
res += self.stde
else:
res = stdo
if res and res[-1:] != '\n':
res += '\n'
return res
stdo: T.Optional[str], stde: T.Optional[str],
additional_out: T.Optional[str], additional_err: T.Optional[str]) -> None:
self._complete(returncode, res, stdo, stde, additional_out, additional_err)
@property
def needs_parsing(self) -> bool:
return False
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str, str]:
async for l in lines:
pass
return TestResult.OK, ''
return TestResult.OK, '', ''
class TestRunExitCode(TestRun):
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
stdo: T.Optional[str], stde: T.Optional[str],
additional_out: T.Optional[str], additional_err: T.Optional[str]) -> None:
if res:
pass
elif returncode == GNU_SKIP_RETURNCODE:
@ -978,14 +948,15 @@ class TestRunExitCode(TestRun):
res = TestResult.ERROR
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
super().complete(returncode, res, stdo, stde)
super().complete(returncode, res, stdo, stde, additional_out, additional_err)
TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode
class TestRunGTest(TestRunExitCode):
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
stdo: T.Optional[str], stde: T.Optional[str],
additional_out: T.Optional[str], additional_err: T.Optional[str]) -> None:
filename = f'{self.test.name}.xml'
if self.test.workdir:
filename = os.path.join(self.test.workdir, filename)
@ -998,7 +969,7 @@ class TestRunGTest(TestRunExitCode):
# will handle the failure, don't generate a stacktrace.
pass
super().complete(returncode, res, stdo, stde)
super().complete(returncode, res, stdo, stde, additional_out, additional_err)
TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest
@ -1009,35 +980,39 @@ class TestRunTAP(TestRun):
return True
def complete(self, returncode: int, res: TestResult,
stdo: str, stde: str) -> None:
stdo: T.Optional[str], stde: T.Optional[str],
additional_out: T.Optional[str], additional_err: T.Optional[str]) -> None:
if returncode != 0 and not res.was_killed():
res = TestResult.ERROR
stde = stde or ''
stde += f'\n(test program exited with status code {returncode})'
super().complete(returncode, res, stdo, stde)
super().complete(returncode, res, stdo, stde, additional_out, additional_err)
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
async def parse(self,
harness: 'TestHarness',
lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str, str]:
res = TestResult.OK
output = ''
error = ''
async for i in TAPParser().parse_async(lines):
if isinstance(i, TAPParser.Bailout):
res = TestResult.ERROR
harness.log_subtest(self, i.message, res)
output += '\n' + harness.log_subtest(self, i.message, res)
elif isinstance(i, TAPParser.Test):
self.results.append(i)
if i.result.is_bad():
res = TestResult.FAIL
harness.log_subtest(self, i.name or f'subtest {i.number}', i.result)
output += '\n' + harness.log_subtest(self, i.name or f'subtest {i.number}', i.result)
elif isinstance(i, TAPParser.Error):
error = '\nTAP parsing error: ' + i.message
error += '\nTAP parsing error: ' + i.message
res = TestResult.ERROR
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
res = TestResult.SKIP
return res, error
return res, output, error
TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP
@ -1047,7 +1022,9 @@ class TestRunRust(TestRun):
def needs_parsing(self) -> bool:
return True
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
async def parse(self,
harness: 'TestHarness',
lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str, str]:
def parse_res(n: int, name: str, result: str) -> TAPParser.Test:
if result == 'ok':
return TAPParser.Test(n, name, TestResult.OK, None)
@ -1058,6 +1035,7 @@ class TestRunRust(TestRun):
return TAPParser.Test(n, name, TestResult.ERROR,
f'Unsupported output from rust test: {result}')
output = ''
n = 1
async for line in lines:
if line.startswith('test ') and not line.startswith('test result'):
@ -1065,17 +1043,17 @@ class TestRunRust(TestRun):
name = name.replace('::', '.')
t = parse_res(n, name, result)
self.results.append(t)
harness.log_subtest(self, name, t.result)
output += '\n' + harness.log_subtest(self, name, t.result)
n += 1
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
return TestResult.SKIP, ''
return TestResult.SKIP, output, ''
elif any(t.result is TestResult.ERROR for t in self.results):
return TestResult.ERROR, ''
return TestResult.ERROR, output, ''
elif any(t.result is TestResult.FAIL for t in self.results):
return TestResult.FAIL, ''
return TestResult.OK, ''
return TestResult.FAIL, output, ''
return TestResult.OK, output, ''
TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust
@ -1088,14 +1066,17 @@ def decode(stream: T.Union[None, bytes]) -> str:
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
async def read_decode(reader: asyncio.StreamReader, console_mode: ConsoleUser) -> str:
async def read_decode(reader: asyncio.StreamReader,
line_handler: T.Callable[[str], None]) -> str:
stdo_lines = []
try:
while not reader.at_eof():
line = decode(await reader.readline())
if len(line) == 0:
continue
stdo_lines.append(line)
if console_mode is ConsoleUser.STDOUT:
print(line, end='', flush=True)
if line_handler:
line_handler(line)
return ''.join(stdo_lines)
except asyncio.CancelledError:
return ''.join(stdo_lines)
@ -1206,16 +1187,17 @@ class TestSubprocess:
self.stdo_task = asyncio.ensure_future(decode_coro)
return queue_iter(q)
def communicate(self, console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]],
T.Optional[T.Awaitable[str]]]:
def communicate(self,
console_mode: ConsoleUser,
line_handler: T.Callable[[str], None] = None) -> T.Tuple[T.Optional[T.Awaitable[str]], T.Optional[T.Awaitable[str]]]:
# asyncio.ensure_future ensures that printing can
# run in the background, even before it is awaited
if self.stdo_task is None and self.stdout is not None:
decode_coro = read_decode(self._process.stdout, console_mode)
decode_coro = read_decode(self._process.stdout, line_handler)
self.stdo_task = asyncio.ensure_future(decode_coro)
self.all_futures.append(self.stdo_task)
if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT:
decode_coro = read_decode(self._process.stderr, console_mode)
decode_coro = read_decode(self._process.stderr, line_handler)
self.stde_task = asyncio.ensure_future(decode_coro)
self.all_futures.append(self.stde_task)
@ -1285,7 +1267,9 @@ class TestSubprocess:
if self.postwait_fn:
self.postwait_fn()
return p.returncode or 0, result, additional_error
return p.returncode or 0, \
result, \
additional_error + '\n' if additional_error else ''
class SingleTestRunner:
@ -1443,22 +1427,39 @@ class SingleTestRunner:
parse_task = None
if self.runobj.needs_parsing:
parse_coro = self.runobj.parse(harness, p.stdout_lines(self.console_mode))
parse_coro = self.runobj.parse(harness,
p.stdout_lines(self.console_mode))
parse_task = asyncio.ensure_future(parse_coro)
stdo_task, stde_task = p.communicate(self.console_mode)
if self.console_mode == ConsoleUser.STDOUT:
prefix = harness.get_test_num_prefix(self.runobj.num)
def printer(line: str) -> None:
ConsoleLogger.print_test_details_line(prefix,
line,
flush=True,
end='')
ConsoleLogger.print_test_details_header(prefix, 'output')
stdo_task, stde_task = p.communicate(self.console_mode, printer)
else:
stdo_task, stde_task = p.communicate(self.console_mode)
additional_output = ''
returncode, result, additional_error = await p.wait(self.runobj.timeout)
if parse_task is not None:
res, error = await parse_task
res, additional_output, error = await parse_task
if error:
additional_error = join_lines(additional_error, error)
result = result or res
stdo = await stdo_task if stdo_task else ''
stde = await stde_task if stde_task else ''
stde = join_lines(stde, additional_error)
self.runobj.complete(returncode, result, stdo, stde)
self.runobj.complete(returncode,
result,
stdo.strip(),
stde.strip(),
additional_output.strip(),
additional_error.strip())
class TestHarness:
@ -1598,18 +1599,18 @@ class TestHarness:
def max_left_width(self) -> int:
return 2 * self.numlen + 2
def get_test_num_prefix(self, num: int) -> str:
return '{num:{numlen}}/{testcount} '.format(numlen=self.numlen,
num=num,
testcount=self.test_count)
def format(self, result: TestRun, colorize: bool,
max_left_width: int = 0,
prefix: str = '',
left: T.Optional[str] = None,
middle: T.Optional[str] = None,
right: T.Optional[str] = None) -> str:
if left is None:
left = '{num:{numlen}}/{testcount} '.format(
numlen=self.numlen,
num=result.num,
testcount=self.test_count)
left = self.get_test_num_prefix(result.num)
# A non-default max_left_width lets the logger print more stuff before the
# name, while ensuring that the rightmost columns remain aligned.
@ -1617,7 +1618,7 @@ class TestHarness:
if middle is None:
middle = result.name
extra_mid_width = max_left_width + self.name_max_len + 1 - uniwidth(middle) - uniwidth(left) - uniwidth(prefix)
extra_mid_width = max_left_width + self.name_max_len + 1 - uniwidth(middle) - uniwidth(left)
middle += ' ' * max(1, extra_mid_width)
if right is None:
@ -1625,13 +1626,16 @@ class TestHarness:
res=result.res.get_text(colorize),
dur=result.duration,
durlen=self.duration_max_len + 3)
detail = result.detail
if detail:
right += ' ' + detail
return prefix + left + middle + right
if not (result.res.is_bad() and self.options.print_errorlogs) \
and not self.options.verbose \
and (result.res.is_bad() or result.needs_parsing):
detail = result.detail()
if detail:
right += ' ' + detail
return left + middle + right
def summary(self) -> str:
return textwrap.dedent('''
return textwrap.dedent('''\
Ok: {:<4}
Expected Fail: {:<4}
@ -1672,7 +1676,7 @@ class TestHarness:
for runner in runners])
# Disable the progress report if it gets in the way
self.need_console = any(runner.console_mode is not ConsoleUser.LOGGER
for runner in runners)
for runner in runners)
self.test_count = len(runners)
self.run_tests(runners)
@ -1818,9 +1822,13 @@ class TestHarness:
finally:
self.close_logfiles()
def log_subtest(self, test: TestRun, s: str, res: TestResult) -> None:
def log_subtest(self, test: TestRun, s: str, res: TestResult) -> str:
rv = ''
for l in self.loggers:
l.log_subtest(self, test, s, res)
tmp = l.log_subtest(self, test, s, res)
if tmp:
rv += tmp
return rv
def log_start_test(self, test: TestRun) -> None:
for l in self.loggers:

Loading…
Cancel
Save