Merge pull request #8225 from bonzini/mtest-asyncio-cleanups

mtest: cleanups and bugfixes
pull/8456/head
Jussi Pakkanen 4 years ago committed by GitHub
commit 4d5f6876f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 383
      mesonbuild/mtest.py

@ -147,6 +147,13 @@ def print_safe(s: str) -> None:
s = s.encode('ascii', errors='backslashreplace').decode('ascii')
print(s)
def join_lines(a: str, b: str) -> str:
if not a:
return b
if not b:
return a
return a + '\n' + b
def returncode_to_status(retcode: int) -> str:
# Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related
# functions here because the status returned by subprocess is munged. It
@ -180,6 +187,19 @@ class TestException(MesonException):
pass
@enum.unique
class ConsoleUser(enum.Enum):
# the logger can use the console
LOGGER = 0
# the console is used by gdb
GDB = 1
# the console is used to write stdout/stderr
STDOUT = 2
@enum.unique
class TestResult(enum.Enum):
@ -522,9 +542,7 @@ class ConsoleLogger(TestLogger):
self.test_count = harness.test_count
# In verbose mode, the progress report gets in the way of the tests'
# stdout and stderr.
if self.is_tty() and not harness.options.verbose:
if self.is_tty() and not harness.need_console:
# Account for "[aa-bb/cc] OO " in the progress report
self.max_left_width = 3 * len(str(self.test_count)) + 8
self.progress_task = asyncio.ensure_future(report_progress())
@ -731,6 +749,10 @@ class JunitBuilder(TestLogger):
class TestRun:
TEST_NUM = 0
PROTOCOL_TO_CLASS: T.Dict[TestProtocol, T.Type['TestRun']] = {}
def __new__(cls, test: TestSerialisation, *args: T.Any, **kwargs: T.Any) -> T.Any:
return super().__new__(TestRun.PROTOCOL_TO_CLASS[test.protocol])
def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str],
name: str, timeout: T.Optional[int]):
@ -746,92 +768,15 @@ class TestRun:
self.stdo = None # type: T.Optional[str]
self.stde = None # type: T.Optional[str]
self.cmd = None # type: T.Optional[T.List[str]]
self.env = dict() # type: T.Dict[str, str]
self.env = test_env # type: T.Dict[str, str]
self.should_fail = test.should_fail
self.project = test.project_name
self.junit = None # type: T.Optional[et.ElementTree]
def start(self) -> None:
def start(self, cmd: T.List[str]) -> None:
self.res = TestResult.RUNNING
self.starttime = time.time()
def complete_gtest(self, returncode: int,
stdo: T.Optional[str], stde: T.Optional[str],
cmd: T.List[str]) -> None:
filename = '{}.xml'.format(self.test.name)
if self.test.workdir:
filename = os.path.join(self.test.workdir, filename)
tree = et.parse(filename)
self.complete_exitcode(returncode, stdo, stde, cmd, junit=tree)
def complete_exitcode(self, returncode: int,
stdo: T.Optional[str], stde: T.Optional[str],
cmd: T.List[str],
**kwargs: T.Any) -> None:
if returncode == GNU_SKIP_RETURNCODE:
res = TestResult.SKIP
elif returncode == GNU_ERROR_RETURNCODE:
res = TestResult.ERROR
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
self.complete(returncode, res, stdo, stde, cmd, **kwargs)
async def parse_tap(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
res = TestResult.OK
error = ''
async for i in TAPParser().parse_async(lines):
if isinstance(i, TAPParser.Bailout):
res = TestResult.ERROR
elif isinstance(i, TAPParser.Test):
self.results.append(i)
if i.result.is_bad():
res = TestResult.FAIL
elif isinstance(i, TAPParser.Error):
error = '\nTAP parsing error: ' + i.message
res = TestResult.ERROR
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
res = TestResult.SKIP
return res, error
def complete_tap(self, returncode: int, res: TestResult,
stdo: str, stde: str, cmd: T.List[str]) -> None:
if returncode != 0 and not res.was_killed():
res = TestResult.ERROR
stde += '\n(test program exited with status code {})'.format(returncode,)
self.complete(returncode, res, stdo, stde, cmd)
async def parse_rust(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
def parse_res(n: int, name: str, result: str) -> TAPParser.Test:
if result == 'ok':
return TAPParser.Test(n, name, TestResult.OK, None)
elif result == 'ignored':
return TAPParser.Test(n, name, TestResult.SKIP, None)
elif result == 'FAILED':
return TAPParser.Test(n, name, TestResult.FAIL, None)
return TAPParser.Test(n, name, TestResult.ERROR,
'Unsupported output from rust test: {}'.format(result))
n = 1
async for line in lines:
if line.startswith('test ') and not line.startswith('test result'):
_, name, _, result = line.rstrip().split(' ')
name = name.replace('::', '.')
self.results.append(parse_res(n, name, result))
n += 1
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
return TestResult.SKIP, ''
elif any(t.result is TestResult.ERROR for t in self.results):
return TestResult.ERROR, ''
elif any(t.result is TestResult.FAIL for t in self.results):
return TestResult.FAIL, ''
return TestResult.OK, ''
self.cmd = cmd
@property
def num(self) -> int:
@ -856,9 +801,8 @@ class TestRun:
return '{}/{} subtests passed'.format(passed, ran)
return ''
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str],
cmd: T.List[str], *, junit: T.Optional[et.ElementTree] = None) -> None:
def _complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
assert isinstance(res, TestResult)
if self.should_fail and res in (TestResult.OK, TestResult.FAIL):
res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL
@ -868,8 +812,14 @@ class TestRun:
self.duration = time.time() - self.starttime
self.stdo = stdo
self.stde = stde
self.cmd = cmd
self.junit = junit
def complete_skip(self, message: str) -> None:
self.starttime = time.time()
self._complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None)
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
self._complete(returncode, res, stdo, stde)
def get_log(self) -> str:
res = '--- command ---\n'
@ -906,6 +856,118 @@ class TestRun:
log += '\n'.join(lines[-100:])
return log
@property
def needs_parsing(self) -> bool:
return False
async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
async for l in lines:
pass
return TestResult.OK, ''
class TestRunExitCode(TestRun):
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
if res:
pass
elif returncode == GNU_SKIP_RETURNCODE:
res = TestResult.SKIP
elif returncode == GNU_ERROR_RETURNCODE:
res = TestResult.ERROR
else:
res = TestResult.FAIL if bool(returncode) else TestResult.OK
super().complete(returncode, res, stdo, stde)
TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode
class TestRunGTest(TestRunExitCode):
def complete(self, returncode: int, res: TestResult,
stdo: T.Optional[str], stde: T.Optional[str]) -> None:
filename = '{}.xml'.format(self.test.name)
if self.test.workdir:
filename = os.path.join(self.test.workdir, filename)
self.junit = et.parse(filename)
super().complete(returncode, res, stdo, stde)
TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest
class TestRunTAP(TestRun):
@property
def needs_parsing(self) -> bool:
return True
def complete(self, returncode: int, res: TestResult,
stdo: str, stde: str) -> None:
if returncode != 0 and not res.was_killed():
res = TestResult.ERROR
stde += '\n(test program exited with status code {})'.format(returncode,)
super().complete(returncode, res, stdo, stde)
async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
res = TestResult.OK
error = ''
async for i in TAPParser().parse_async(lines):
if isinstance(i, TAPParser.Bailout):
res = TestResult.ERROR
elif isinstance(i, TAPParser.Test):
self.results.append(i)
if i.result.is_bad():
res = TestResult.FAIL
elif isinstance(i, TAPParser.Error):
error = '\nTAP parsing error: ' + i.message
res = TestResult.ERROR
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
res = TestResult.SKIP
return res, error
TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP
class TestRunRust(TestRun):
@property
def needs_parsing(self) -> bool:
return True
async def parse(self, lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]:
def parse_res(n: int, name: str, result: str) -> TAPParser.Test:
if result == 'ok':
return TAPParser.Test(n, name, TestResult.OK, None)
elif result == 'ignored':
return TAPParser.Test(n, name, TestResult.SKIP, None)
elif result == 'FAILED':
return TAPParser.Test(n, name, TestResult.FAIL, None)
return TAPParser.Test(n, name, TestResult.ERROR,
'Unsupported output from rust test: {}'.format(result))
n = 1
async for line in lines:
if line.startswith('test ') and not line.startswith('test result'):
_, name, _, result = line.rstrip().split(' ')
name = name.replace('::', '.')
self.results.append(parse_res(n, name, result))
n += 1
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
return TestResult.SKIP, ''
elif any(t.result is TestResult.ERROR for t in self.results):
return TestResult.ERROR, ''
elif any(t.result is TestResult.FAIL for t in self.results):
return TestResult.FAIL, ''
return TestResult.OK, ''
TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust
def decode(stream: T.Union[None, bytes]) -> str:
if stream is None:
return ''
@ -914,6 +976,35 @@ def decode(stream: T.Union[None, bytes]) -> str:
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
async def read_decode(reader: asyncio.StreamReader, console_mode: ConsoleUser) -> str:
if console_mode is not ConsoleUser.STDOUT:
return decode(await reader.read(-1))
stdo_lines = []
while not reader.at_eof():
line = decode(await reader.readline())
stdo_lines.append(line)
print(line, end='', flush=True)
return ''.join(stdo_lines)
# Extract lines out of the StreamReader. Print them
# along the way if requested, and at the end collect
# them all into a future.
async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]',
console_mode: ConsoleUser) -> T.AsyncIterator[str]:
stdo_lines = []
try:
while not reader.at_eof():
line = decode(await reader.readline())
stdo_lines.append(line)
if console_mode is ConsoleUser.STDOUT:
print(line, end='', flush=True)
yield line
except Exception as e:
f.set_exception(e)
finally:
f.set_result(''.join(stdo_lines))
def run_with_mono(fname: str) -> bool:
return fname.endswith('.exe') and not (is_windows() or is_cygwin())
@ -971,17 +1062,32 @@ async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
f.result()
class TestSubprocess:
def __init__(self, p: asyncio.subprocess.Process, postwait_fn: T.Callable[[], None] = None):
def __init__(self, p: asyncio.subprocess.Process,
stdout: T.Optional[int], stderr: T.Optional[int],
postwait_fn: T.Callable[[], None] = None):
self._process = p
self.stdout = stdout
self.stderr = stderr
self.stdo_task = None # type: T.Optional[T.Awaitable[str]]
self.stde_task = None # type: T.Optional[T.Awaitable[str]]
self.postwait_fn = postwait_fn # type: T.Callable[[], None]
@property
def stdout(self) -> T.Optional[asyncio.StreamReader]:
return self._process.stdout
def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]:
self.stdo_task = asyncio.get_event_loop().create_future()
return read_decode_lines(self._process.stdout, self.stdo_task, console_mode)
@property
def stderr(self) -> T.Optional[asyncio.StreamReader]:
return self._process.stderr
def communicate(self, console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]],
T.Optional[T.Awaitable[str]]]:
# asyncio.ensure_future ensures that printing can
# run in the background, even before it is awaited
if self.stdo_task is None and self.stdout is not None:
decode_task = read_decode(self._process.stdout, console_mode)
self.stdo_task = asyncio.ensure_future(decode_task)
if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT:
decode_task = read_decode(self._process.stderr, console_mode)
self.stde_task = asyncio.ensure_future(decode_task)
return self.stdo_task, self.stde_task
async def _kill(self) -> T.Optional[str]:
# Python does not provide multiplatform support for
@ -1061,6 +1167,13 @@ class SingleTestRunner:
self.runobj = TestRun(test, test_env, name, timeout)
if self.options.gdb:
self.console_mode = ConsoleUser.GDB
elif self.options.verbose and not self.runobj.needs_parsing:
self.console_mode = ConsoleUser.STDOUT
else:
self.console_mode = ConsoleUser.LOGGER
def _get_cmd(self) -> T.Optional[T.List[str]]:
if self.test.fname[0].endswith('.jar'):
return ['java', '-jar'] + self.test.fname
@ -1091,10 +1204,9 @@ class SingleTestRunner:
async def run(self) -> TestRun:
cmd = self._get_cmd()
self.runobj.start()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
self.runobj.complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, skip_stdout, None, None)
self.runobj.complete_skip(skip_stdout)
else:
wrap = TestHarness.get_wrapper(self.options)
await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
@ -1131,7 +1243,8 @@ class SingleTestRunner:
env=env,
cwd=cwd,
preexec_fn=preexec_fn if not is_windows() else None)
return TestSubprocess(p, postwait_fn=postwait_fn if not is_windows() else None)
return TestSubprocess(p, stdout=stdout, stderr=stderr,
postwait_fn=postwait_fn if not is_windows() else None)
async def _run_cmd(self, cmd: T.List[str]) -> None:
if self.test.extra_paths:
@ -1154,14 +1267,15 @@ class SingleTestRunner:
if ('MALLOC_PERTURB_' not in self.env or not self.env['MALLOC_PERTURB_']) and not self.options.benchmark:
self.env['MALLOC_PERTURB_'] = str(random.randint(1, 255))
stdout = None
stderr = None
if self.test.protocol is TestProtocol.TAP:
stdout = asyncio.subprocess.PIPE
stderr = None if self.options.verbose else asyncio.subprocess.PIPE
elif not self.options.verbose:
self.runobj.start(cmd)
if self.console_mode is ConsoleUser.GDB:
stdout = None
stderr = None
else:
stdout = asyncio.subprocess.PIPE
stderr = asyncio.subprocess.PIPE if self.options.split else asyncio.subprocess.STDOUT
stderr = asyncio.subprocess.STDOUT \
if not self.options.split and not self.runobj.needs_parsing \
else asyncio.subprocess.PIPE
extra_cmd = [] # type: T.List[str]
if self.test.protocol is TestProtocol.GTEST:
@ -1176,60 +1290,23 @@ class SingleTestRunner:
env=self.env,
cwd=self.test.workdir)
stdo = stde = ''
stdo_task = stde_task = parse_task = None
# Extract lines out of the StreamReader and print them
# along the way if requested
async def lines() -> T.AsyncIterator[str]:
stdo_lines = []
reader = p.stdout
while not reader.at_eof():
line = decode(await reader.readline())
stdo_lines.append(line)
if self.options.verbose:
print(line, end='')
yield line
nonlocal stdo
stdo = ''.join(stdo_lines)
if self.test.protocol is TestProtocol.TAP:
parse_task = self.runobj.parse_tap(lines())
elif self.test.protocol is TestProtocol.RUST:
parse_task = self.runobj.parse_rust(lines())
elif stdout is not None:
stdo_task = p.stdout.read(-1)
if stderr is not None and stderr != asyncio.subprocess.STDOUT:
stde_task = p.stderr.read(-1)
parse_task = None
if self.runobj.needs_parsing:
parse_task = self.runobj.parse(p.stdout_lines(self.console_mode))
stdo_task, stde_task = p.communicate(self.console_mode)
returncode, result, additional_error = await p.wait(self.runobj.timeout)
if result is TestResult.TIMEOUT and self.options.verbose:
print('{} time out (After {} seconds)'.format(self.test.name, self.runobj.timeout))
if stdo_task is not None:
stdo = decode(await stdo_task)
if stde_task is not None:
stde = decode(await stde_task)
if additional_error is not None:
stde += '\n' + additional_error
if parse_task is not None:
res, error = await parse_task
if error:
stde += '\n' + error
additional_error = join_lines(additional_error, error)
result = result or res
if self.test.protocol is TestProtocol.TAP:
self.runobj.complete_tap(returncode, result, stdo, stde, cmd)
return
if result:
self.runobj.complete(returncode, result, stdo, stde, cmd)
elif self.test.protocol is TestProtocol.EXITCODE:
self.runobj.complete_exitcode(returncode, stdo, stde, cmd)
elif self.test.protocol is TestProtocol.GTEST:
self.runobj.complete_gtest(returncode, stdo, stde, cmd)
stdo = await stdo_task if stdo_task else ''
stde = await stde_task if stde_task else ''
stde = join_lines(stde, additional_error)
self.runobj.complete(returncode, result, stdo, stde)
class TestHarness:
@ -1247,6 +1324,7 @@ class TestHarness:
self.is_run = False
self.loggers = [] # type: T.List[TestLogger]
self.loggers.append(ConsoleLogger())
self.need_console = False
if self.options.benchmark:
self.tests = load_benchmarks(options.wd)
@ -1397,6 +1475,9 @@ class TestHarness:
runners = [self.get_test_runner(test) for test in tests]
self.duration_max_len = max([len(str(int(runner.timeout or 99)))
for runner in runners])
# Disable the progress report if it gets in the way
self.need_console = any((runner.console_mode is not ConsoleUser.LOGGER
for runner in runners))
self.run_tests(runners)
finally:
os.chdir(startdir)

Loading…
Cancel
Save