Merge pull request #7836 from bonzini/mtest-asyncio

[RFC] mtest: use asyncio instead of concurrent.futures
pull/8005/head
Jussi Pakkanen 4 years ago committed by GitHub
commit ef6f85f8ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 349
      mesonbuild/mtest.py

@ -15,10 +15,10 @@
# A tool to run tests in many different ways. # A tool to run tests in many different ways.
from ._pathlib import Path from ._pathlib import Path
from collections import namedtuple from collections import deque, namedtuple
from copy import deepcopy from copy import deepcopy
import argparse import argparse
import concurrent.futures as conc import asyncio
import datetime import datetime
import enum import enum
import io import io
@ -167,6 +167,7 @@ class TestResult(enum.Enum):
OK = 'OK' OK = 'OK'
TIMEOUT = 'TIMEOUT' TIMEOUT = 'TIMEOUT'
INTERRUPT = 'INTERRUPT'
SKIP = 'SKIP' SKIP = 'SKIP'
FAIL = 'FAIL' FAIL = 'FAIL'
EXPECTEDFAIL = 'EXPECTEDFAIL' EXPECTEDFAIL = 'EXPECTEDFAIL'
@ -326,7 +327,6 @@ class TAPParser:
yield self.Error('Too many tests run (expected {}, got {})'.format(plan.count, num_tests)) yield self.Error('Too many tests run (expected {}, got {})'.format(plan.count, num_tests))
class JunitBuilder: class JunitBuilder:
"""Builder for Junit test results. """Builder for Junit test results.
@ -376,7 +376,8 @@ class JunitBuilder:
'testsuite', 'testsuite',
name=suitename, name=suitename,
tests=str(len(test.results)), tests=str(len(test.results)),
errors=str(sum(1 for r in test.results if r is TestResult.ERROR)), errors=str(sum(1 for r in test.results if r in
{TestResult.INTERRUPT, TestResult.ERROR})),
failures=str(sum(1 for r in test.results if r in failures=str(sum(1 for r in test.results if r in
{TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})), {TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})),
skipped=str(sum(1 for r in test.results if r is TestResult.SKIP)), skipped=str(sum(1 for r in test.results if r is TestResult.SKIP)),
@ -395,6 +396,9 @@ class JunitBuilder:
elif result is TestResult.UNEXPECTEDPASS: elif result is TestResult.UNEXPECTEDPASS:
fail = et.SubElement(testcase, 'failure') fail = et.SubElement(testcase, 'failure')
fail.text = 'Test unexpected passed.' fail.text = 'Test unexpected passed.'
elif result is TestResult.INTERRUPT:
fail = et.SubElement(testcase, 'failure')
fail.text = 'Test was interrupted by user.'
elif result is TestResult.TIMEOUT: elif result is TestResult.TIMEOUT:
fail = et.SubElement(testcase, 'failure') fail = et.SubElement(testcase, 'failure')
fail.text = 'Test did not finish before configured timeout.' fail.text = 'Test did not finish before configured timeout.'
@ -606,6 +610,31 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]:
objs = check_testdata(pickle.load(f)) objs = check_testdata(pickle.load(f))
return objs return objs
# Custom waiting primitives for asyncio
async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None:
    """Wait until one of the awaitables completes or the timeout expires.

    A timeout is not an error: the function simply returns, and the
    awaitables that have not finished are left running (they are NOT
    cancelled), so the caller can inspect their state afterwards.

    :param awaitables: coroutines, tasks or futures to wait on.
    :param timeout: maximum number of seconds to wait, or None to wait
        until one of the awaitables completes.
    :raises ValueError: if no awaitable is given (raised by asyncio.wait).
    """
    # asyncio.wait() stopped accepting bare coroutines in Python 3.11 (the
    # implicit wrapping was deprecated in 3.8), so schedule them explicitly.
    # ensure_future() returns tasks and futures unchanged.
    tasks = [asyncio.ensure_future(a) for a in awaitables]
    # asyncio.wait() never raises on timeout; it just returns the
    # (done, pending) sets, which are deliberately ignored here.
    await asyncio.wait(tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
async def complete(future: asyncio.Future) -> None:
    """Await ``future`` until it is finished, treating cancellation as success.

    Any exception other than asyncio.CancelledError raised while awaiting
    the future propagates to the caller.
    """
    try:
        await future
    except asyncio.CancelledError:
        # A cancelled future counts as "completed" for our purposes.
        return None
async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
    """Wait for completion of all the given futures, ignoring cancellation.

    Futures that were cancelled are treated as finished. If a future
    finished with an exception, that exception is re-raised here as soon as
    it is noticed; the remaining futures are left untouched.

    :param futures: any iterable of futures/tasks; it may be empty and may
        be a one-shot iterator or generator.
    """
    # Snapshot the input: a generator is always truthy, so testing the
    # iterable itself would hand an empty collection to asyncio.wait().
    pending = set(futures)
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        # Raise exceptions if needed for all the "done" futures
        for f in done:
            if not f.cancelled():
                f.result()
class SingleTestRunner: class SingleTestRunner:
@ -636,7 +665,7 @@ class SingleTestRunner:
return self.test.exe_runner.get_command() + self.test.fname return self.test.exe_runner.get_command() + self.test.fname
return self.test.fname return self.test.fname
def run(self) -> TestRun: async def run(self) -> TestRun:
cmd = self._get_cmd() cmd = self._get_cmd()
if cmd is None: if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.' skip_stdout = 'Not run because can not execute cross compiled binaries.'
@ -645,9 +674,94 @@ class SingleTestRunner:
wrap = TestHarness.get_wrapper(self.options) wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb: if self.options.gdb:
self.test.timeout = None self.test.timeout = None
return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args) return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int],
stdout: T.IO, stderr: T.IO,
env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]:
async def kill_process(p: asyncio.subprocess.Process) -> T.Optional[str]:
# Python does not provide multiplatform support for
# killing a process and all its children so we need
# to roll our own.
try:
if is_windows():
subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)])
else:
# Send a termination signal to the process group that setsid()
# created - giving it a chance to perform any cleanup.
os.killpg(p.pid, signal.SIGTERM)
# Make sure the termination signal actually kills the process
# group, otherwise retry with a SIGKILL.
await try_wait_one(p.wait(), timeout=0.5)
if p.returncode is not None:
return None
os.killpg(p.pid, signal.SIGKILL)
await try_wait_one(p.wait(), timeout=1)
if p.returncode is not None:
return None
# An earlier kill attempt has not worked for whatever reason.
# Try to kill it one last time with a direct call.
# If the process has spawned children, they will remain around.
p.kill()
await try_wait_one(p.wait(), timeout=1)
if p.returncode is not None:
return None
return 'Test process could not be killed.'
except ProcessLookupError:
# Sometimes (e.g. with Wine) this happens. There's nothing
# we can do, probably the process already died so just wait
# for the event loop to pick that up.
await p.wait()
return None
# Let gdb handle ^C instead of us
if self.options.gdb:
previous_sigint_handler = signal.getsignal(signal.SIGINT)
# Make the meson executable ignore SIGINT while gdb is running.
signal.signal(signal.SIGINT, signal.SIG_IGN)
def _run_cmd(self, cmd: T.List[str]) -> TestRun: def preexec_fn() -> None:
if self.options.gdb:
# Restore the SIGINT handler for the child process to
# ensure it can handle it.
signal.signal(signal.SIGINT, signal.SIG_DFL)
else:
# We don't want setsid() in gdb because gdb needs the
# terminal in order to handle ^C and not show tcsetpgrp()
# errors, and to avoid not being able to use the terminal.
os.setsid()
p = await asyncio.create_subprocess_exec(*args,
stdout=stdout,
stderr=stderr,
env=env,
cwd=cwd,
preexec_fn=preexec_fn if not is_windows() else None)
result = None
additional_error = None
try:
await try_wait_one(p.wait(), timeout=timeout)
if p.returncode is None:
if self.options.verbose:
print('{} time out (After {} seconds)'.format(self.test.name, timeout))
additional_error = await kill_process(p)
result = TestResult.TIMEOUT
except asyncio.CancelledError:
# The main loop must have seen Ctrl-C.
additional_error = await kill_process(p)
result = TestResult.INTERRUPT
finally:
if self.options.gdb:
# Let us accept ^C again
signal.signal(signal.SIGINT, previous_sigint_handler)
return p.returncode or 0, result, additional_error
async def _run_cmd(self, cmd: T.List[str]) -> TestRun:
starttime = time.time() starttime = time.time()
if self.test.extra_paths: if self.test.extra_paths:
@ -678,23 +792,6 @@ class SingleTestRunner:
if self.test.protocol is TestProtocol.TAP and stderr is stdout: if self.test.protocol is TestProtocol.TAP and stderr is stdout:
stdout = tempfile.TemporaryFile("wb+") stdout = tempfile.TemporaryFile("wb+")
# Let gdb handle ^C instead of us
if self.options.gdb:
previous_sigint_handler = signal.getsignal(signal.SIGINT)
# Make the meson executable ignore SIGINT while gdb is running.
signal.signal(signal.SIGINT, signal.SIG_IGN)
def preexec_fn() -> None:
if self.options.gdb:
# Restore the SIGINT handler for the child process to
# ensure it can handle it.
signal.signal(signal.SIGINT, signal.SIG_DFL)
else:
# We don't want setsid() in gdb because gdb needs the
# terminal in order to handle ^C and not show tcsetpgrp()
# errors avoid not being able to use the terminal.
os.setsid()
extra_cmd = [] # type: T.List[str] extra_cmd = [] # type: T.List[str]
if self.test.protocol is TestProtocol.GTEST: if self.test.protocol is TestProtocol.GTEST:
gtestname = self.test.name gtestname = self.test.name
@ -702,77 +799,19 @@ class SingleTestRunner:
gtestname = os.path.join(self.test.workdir, self.test.name) gtestname = os.path.join(self.test.workdir, self.test.name)
extra_cmd.append('--gtest_output=xml:{}.xml'.format(gtestname)) extra_cmd.append('--gtest_output=xml:{}.xml'.format(gtestname))
p = subprocess.Popen(cmd + extra_cmd,
stdout=stdout,
stderr=stderr,
env=self.env,
cwd=self.test.workdir,
preexec_fn=preexec_fn if not is_windows() else None)
timed_out = False
kill_test = False
if self.test.timeout is None: if self.test.timeout is None:
timeout = None timeout = None
elif self.options.timeout_multiplier is not None: elif self.options.timeout_multiplier is not None:
timeout = self.test.timeout * self.options.timeout_multiplier timeout = self.test.timeout * self.options.timeout_multiplier
else: else:
timeout = self.test.timeout timeout = self.test.timeout
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
if self.options.verbose:
print('{} time out (After {} seconds)'.format(self.test.name, timeout))
timed_out = True
except KeyboardInterrupt:
mlog.warning('CTRL-C detected while running {}'.format(self.test.name))
kill_test = True
finally:
if self.options.gdb:
# Let us accept ^C again
signal.signal(signal.SIGINT, previous_sigint_handler)
additional_error = None
if kill_test or timed_out: returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd,
# Python does not provide multiplatform support for timeout=timeout,
# killing a process and all its children so we need stdout=stdout,
# to roll our own. stderr=stderr,
if is_windows(): env=self.env,
subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) cwd=self.test.workdir)
else:
def _send_signal_to_process_group(pgid : int, signum : int) -> None:
""" sends a signal to a process group """
try:
os.killpg(pgid, signum)
except ProcessLookupError:
# Sometimes (e.g. with Wine) this happens.
# There's nothing we can do (maybe the process
# already died) so carry on.
pass
# Send a termination signal to the process group that setsid()
# created - giving it a chance to perform any cleanup.
_send_signal_to_process_group(p.pid, signal.SIGTERM)
# Make sure the termination signal actually kills the process
# group, otherwise retry with a SIGKILL.
try:
p.communicate(timeout=0.5)
except subprocess.TimeoutExpired:
_send_signal_to_process_group(p.pid, signal.SIGKILL)
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
# An earlier kill attempt has not worked for whatever reason.
# Try to kill it one last time with a direct call.
# If the process has spawned children, they will remain around.
p.kill()
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
additional_error = 'Test process could not be killed.'
except ValueError:
additional_error = 'Could not read output. Maybe the process has redirected its stdout/stderr?'
endtime = time.time() endtime = time.time()
duration = endtime - starttime duration = endtime - starttime
if additional_error is None: if additional_error is None:
@ -789,17 +828,17 @@ class SingleTestRunner:
else: else:
stdo = "" stdo = ""
stde = additional_error stde = additional_error
if timed_out: if result:
return TestRun(self.test, self.test_env, TestResult.TIMEOUT, [], p.returncode, starttime, duration, stdo, stde, cmd) return TestRun(self.test, self.test_env, result, [], returncode, starttime, duration, stdo, stde, cmd)
else: else:
if self.test.protocol is TestProtocol.EXITCODE: if self.test.protocol is TestProtocol.EXITCODE:
return TestRun.make_exitcode(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) return TestRun.make_exitcode(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd)
elif self.test.protocol is TestProtocol.GTEST: elif self.test.protocol is TestProtocol.GTEST:
return TestRun.make_gtest(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) return TestRun.make_gtest(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd)
else: else:
if self.options.verbose: if self.options.verbose:
print(stdo, end='') print(stdo, end='')
return TestRun.make_tap(self.test, self.test_env, p.returncode, starttime, duration, stdo, stde, cmd) return TestRun.make_tap(self.test, self.test_env, returncode, starttime, duration, stdo, stde, cmd)
class TestHarness: class TestHarness:
@ -841,7 +880,7 @@ class TestHarness:
def close_logfiles(self) -> None: def close_logfiles(self) -> None:
for f in ['logfile', 'jsonlogfile']: for f in ['logfile', 'jsonlogfile']:
lfile = getattr(self, f) lfile = getattr(self, f)
if lfile: if lfile:
lfile.close() lfile.close()
setattr(self, f, None) setattr(self, f, None)
@ -892,7 +931,7 @@ class TestHarness:
self.skip_count += 1 self.skip_count += 1
elif result.res is TestResult.OK: elif result.res is TestResult.OK:
self.success_count += 1 self.success_count += 1
elif result.res is TestResult.FAIL or result.res is TestResult.ERROR: elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}:
self.fail_count += 1 self.fail_count += 1
elif result.res is TestResult.EXPECTEDFAIL: elif result.res is TestResult.EXPECTEDFAIL:
self.expectedfail_count += 1 self.expectedfail_count += 1
@ -905,7 +944,7 @@ class TestHarness:
tests: T.List[TestSerialisation], tests: T.List[TestSerialisation],
name: str, result: TestRun, i: int) -> None: name: str, result: TestRun, i: int) -> None:
ok_statuses = (TestResult.OK, TestResult.EXPECTEDFAIL) ok_statuses = (TestResult.OK, TestResult.EXPECTEDFAIL)
bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, bad_statuses = (TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT,
TestResult.UNEXPECTEDPASS, TestResult.ERROR) TestResult.UNEXPECTEDPASS, TestResult.ERROR)
result_str = '{num:{numlen}}/{testcount} {name:{name_max_len}} {res:{reslen}} {dur:.2f}s'.format( result_str = '{num:{numlen}}/{testcount} {name:{name_max_len}} {res:{reslen}} {dur:.2f}s'.format(
numlen=len(str(test_count)), numlen=len(str(test_count)),
@ -951,7 +990,7 @@ class TestHarness:
Skipped: {:<4} Skipped: {:<4}
Timeout: {:<4} Timeout: {:<4}
''').format(self.success_count, self.expectedfail_count, self.fail_count, ''').format(self.success_count, self.expectedfail_count, self.fail_count,
self.unexpectedpass_count, self.skip_count, self.timeout_count) self.unexpectedpass_count, self.skip_count, self.timeout_count)
print(msg) print(msg)
if self.logfile: if self.logfile:
self.logfile.write(msg) self.logfile.write(msg)
@ -1131,8 +1170,14 @@ class TestHarness:
return test.name return test.name
def run_tests(self, tests: T.List[TestSerialisation]) -> None: def run_tests(self, tests: T.List[TestSerialisation]) -> None:
executor = None # Replace with asyncio.run once we can require Python 3.7
futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]] loop = asyncio.get_event_loop()
loop.run_until_complete(self._run_tests(tests))
async def _run_tests(self, tests: T.List[TestSerialisation]) -> None:
semaphore = asyncio.Semaphore(self.options.num_processes)
futures = deque() # type: T.Deque[asyncio.Future]
running_tests = dict() # type: T.Dict[asyncio.Future, str]
test_count = len(tests) test_count = len(tests)
name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests])
self.open_log_files() self.open_log_files()
@ -1140,59 +1185,87 @@ class TestHarness:
if self.options.wd: if self.options.wd:
os.chdir(self.options.wd) os.chdir(self.options.wd)
self.build_data = build.load(os.getcwd()) self.build_data = build.load(os.getcwd())
interrupted = False
async def run_test(test: SingleTestRunner,
name: str, index: int) -> None:
async with semaphore:
if interrupted or (self.options.repeat > 1 and self.fail_count):
return
res = await test.run()
self.process_test_result(res)
self.print_stats(test_count, name_max_len, tests, name, res, index)
def test_done(f: asyncio.Future) -> None:
if not f.cancelled():
f.result()
futures.remove(f)
try:
del running_tests[f]
except KeyError:
pass
def cancel_one_test(warn: bool) -> None:
future = futures.popleft()
futures.append(future)
if warn:
mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future]))
del running_tests[future]
future.cancel()
def sigterm_handler() -> None:
nonlocal interrupted
if interrupted:
return
interrupted = True
mlog.warning('Received SIGTERM, exiting')
while running_tests:
cancel_one_test(False)
def sigint_handler() -> None:
# We always pick the longest-running future that has not been cancelled
# If all the tests have been CTRL-C'ed, just stop
nonlocal interrupted
if interrupted:
return
if running_tests:
cancel_one_test(True)
else:
mlog.warning('CTRL-C detected, exiting')
interrupted = True
if sys.platform != 'win32':
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigint_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler)
try: try:
for _ in range(self.options.repeat): for _ in range(self.options.repeat):
for i, test in enumerate(tests, 1): for i, test in enumerate(tests, 1):
visible_name = self.get_pretty_suite(test) visible_name = self.get_pretty_suite(test)
single_test = self.get_test_runner(test) single_test = self.get_test_runner(test)
if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb: if not test.is_parallel or single_test.options.gdb:
self.drain_futures(futures) await complete_all(futures)
futures = [] future = asyncio.ensure_future(run_test(single_test, visible_name, i))
res = single_test.run() futures.append(future)
self.process_test_result(res) running_tests[future] = visible_name
self.print_stats(test_count, name_max_len, tests, visible_name, res, i) future.add_done_callback(test_done)
else: if not test.is_parallel or single_test.options.gdb:
if not executor: await complete(future)
executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
f = executor.submit(single_test.run)
futures.append((f, test_count, name_max_len, tests, visible_name, i))
if self.options.repeat > 1 and self.fail_count:
break
if self.options.repeat > 1 and self.fail_count: if self.options.repeat > 1 and self.fail_count:
break break
self.drain_futures(futures) await complete_all(futures)
self.print_collected_logs() self.print_collected_logs()
self.print_summary() self.print_summary()
if self.logfilename: if self.logfilename:
print('Full log written to {}'.format(self.logfilename)) print('Full log written to {}'.format(self.logfilename))
finally: finally:
if sys.platform != 'win32':
asyncio.get_event_loop().remove_signal_handler(signal.SIGINT)
asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM)
os.chdir(startdir) os.chdir(startdir)
def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None:
for x in futures:
(result, test_count, name_max_len, tests, name, i) = x
if self.options.repeat > 1 and self.fail_count:
result.cancel()
if self.options.verbose:
result.result()
self.process_test_result(result.result())
self.print_stats(test_count, name_max_len, tests, name, result.result(), i)
def run_special(self) -> int:
'''Tests run by the user, usually something like "under gdb 1000 times".'''
if self.is_run:
raise RuntimeError('Can not use run_special after a full run.')
tests = self.get_tests()
if not tests:
return 0
self.run_tests(tests)
return self.total_failure_count()
def list_tests(th: TestHarness) -> bool: def list_tests(th: TestHarness) -> bool:
tests = th.get_tests() tests = th.get_tests()
for t in tests: for t in tests:
@ -1235,6 +1308,10 @@ def run(options: argparse.Namespace) -> int:
if options.wrapper: if options.wrapper:
check_bin = options.wrapper[0] check_bin = options.wrapper[0]
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
if check_bin is not None: if check_bin is not None:
exe = ExternalProgram(check_bin, silent=True) exe = ExternalProgram(check_bin, silent=True)
if not exe.found(): if not exe.found():
@ -1252,9 +1329,7 @@ def run(options: argparse.Namespace) -> int:
try: try:
if options.list: if options.list:
return list_tests(th) return list_tests(th)
if not options.args: return th.doit()
return th.doit()
return th.run_special()
except TestException as e: except TestException as e:
print('Meson test encountered an error:\n') print('Meson test encountered an error:\n')
if os.environ.get('MESON_FORCE_BACKTRACE'): if os.environ.get('MESON_FORCE_BACKTRACE'):

Loading…
Cancel
Save