mtest: remove usage of executors

Rewrite the SingleTestRunner to use asyncio to manage subprocesses,
while still using subprocess.Popen to run them.  Concurrency is
managed with an asyncio Semaphore; for simplicity (since this is
a temporary state) we create a new thread for each test that is run
instead of having a pool.

This already provides the main advantage of asyncio, which is better
control on cancellation; with the current code, KeyboardInterrupt
was never handled by the thread executor so the code that tried to handle
it in SingleTestRunner only worked for non-parallel tests.  And
because executor futures cannot be cancelled, there was no way for
the user to kill a test that got stuck.  Instead, without executors
^C exits "meson test" immediately.  The next patch will improve things
even further, allowing a single test to be interrupted with ^C.
pull/7836/head
Paolo Bonzini 4 years ago
parent 98d3863fa4
commit acf5d78f34
  1. 126
      mesonbuild/mtest.py

@ -19,7 +19,6 @@ from collections import namedtuple
from copy import deepcopy
import argparse
import asyncio
import concurrent.futures as conc
import datetime
import enum
import io
@ -35,6 +34,7 @@ import subprocess
import sys
import tempfile
import textwrap
import threading
import time
import typing as T
import xml.etree.ElementTree as et
@ -613,6 +613,13 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]:
# Custom waiting primitives for asyncio
async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None:
try:
await asyncio.wait(awaitables,
timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
except asyncio.TimeoutError:
pass
async def complete(future: asyncio.Future) -> None:
"""Wait for completion of the given future, ignoring cancellation."""
try:
@ -659,7 +666,7 @@ class SingleTestRunner:
return self.test.exe_runner.get_command() + self.test.fname
return self.test.fname
def run(self) -> TestRun:
async def run(self) -> TestRun:
cmd = self._get_cmd()
if cmd is None:
skip_stdout = 'Not run because can not execute cross compiled binaries.'
@ -668,12 +675,12 @@ class SingleTestRunner:
wrap = TestHarness.get_wrapper(self.options)
if self.options.gdb:
self.test.timeout = None
return self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
return await self._run_cmd(wrap + cmd + self.test.cmd_args + self.options.test_args)
def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int],
stdout: T.IO, stderr: T.IO,
env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[T.Optional[int], TestResult, T.Optional[str]]:
def kill_process(p: subprocess.Popen) -> T.Optional[str]:
async def _run_subprocess(self, args: T.List[str], *, timeout: T.Optional[int],
stdout: T.IO, stderr: T.IO,
env: T.Dict[str, str], cwd: T.Optional[str]) -> T.Tuple[int, TestResult, T.Optional[str]]:
async def kill_process(p: subprocess.Popen, f: asyncio.Future) -> T.Optional[str]:
# Python does not provide multiplatform support for
# killing a process and all its children so we need
# to roll our own.
@ -687,27 +694,39 @@ class SingleTestRunner:
# Make sure the termination signal actually kills the process
# group, otherwise retry with a SIGKILL.
try:
p.communicate(timeout=0.5)
except subprocess.TimeoutExpired:
os.killpg(p.pid, signal.SIGKILL)
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
# An earlier kill attempt has not worked for whatever reason.
# Try to kill it one last time with a direct call.
# If the process has spawned children, they will remain around.
p.kill()
try:
p.communicate(timeout=1)
except subprocess.TimeoutExpired:
return 'Test process could not be killed.'
await try_wait_one(f, timeout=0.5)
if f.done():
return None
os.killpg(p.pid, signal.SIGKILL)
await try_wait_one(f, timeout=1)
if f.done():
return None
# An earlier kill attempt has not worked for whatever reason.
# Try to kill it one last time with a direct call.
# If the process has spawned children, they will remain around.
p.kill()
await try_wait_one(f, timeout=1)
if f.done():
return None
return 'Test process could not be killed.'
except ProcessLookupError:
# Sometimes (e.g. with Wine) this happens.
# There's nothing we can do (probably the process
# already died) so carry on.
pass
return None
# Sometimes (e.g. with Wine) this happens. There's nothing
# we can do, probably the process already died so just wait
# for the event loop to pick that up.
await f
return None
def wait(p: subprocess.Popen, loop: asyncio.AbstractEventLoop, f: asyncio.Future) -> None:
try:
p.wait()
loop.call_soon_threadsafe(f.set_result, p.returncode)
except BaseException as e: # lgtm [py/catch-base-exception]
# The exception will be raised again in the main thread,
# so catching BaseException is okay.
loop.call_soon_threadsafe(f.set_exception, e)
# Let gdb handle ^C instead of us
if self.options.gdb:
@ -734,25 +753,31 @@ class SingleTestRunner:
preexec_fn=preexec_fn if not is_windows() else None)
result = None
additional_error = None
loop = asyncio.get_event_loop()
future = asyncio.get_event_loop().create_future()
threading.Thread(target=wait, args=(p, loop, future), daemon=True).start()
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
if self.options.verbose:
print('{} time out (After {} seconds)'.format(self.test.name, timeout))
additional_error = kill_process(p)
result = TestResult.TIMEOUT
except KeyboardInterrupt:
mlog.warning('CTRL-C detected while running {}'.format(self.test.name))
additional_error = kill_process(p)
await try_wait_one(future, timeout=timeout)
if not future.done():
if self.options.verbose:
print('{} time out (After {} seconds)'.format(self.test.name, timeout))
additional_error = await kill_process(p, future)
result = TestResult.TIMEOUT
except asyncio.CancelledError:
# The main loop must have seen Ctrl-C.
additional_error = await kill_process(p, future)
result = TestResult.INTERRUPT
finally:
if self.options.gdb:
# Let us accept ^C again
signal.signal(signal.SIGINT, previous_sigint_handler)
return p.returncode, result, additional_error
if future.done():
return future.result(), result, None
else:
return 0, result, additional_error
def _run_cmd(self, cmd: T.List[str]) -> TestRun:
async def _run_cmd(self, cmd: T.List[str]) -> TestRun:
starttime = time.time()
if self.test.extra_paths:
@ -797,12 +822,12 @@ class SingleTestRunner:
else:
timeout = self.test.timeout
returncode, result, additional_error = self._run_subprocess(cmd + extra_cmd,
timeout=timeout,
stdout=stdout,
stderr=stderr,
env=self.env,
cwd=self.test.workdir)
returncode, result, additional_error = await self._run_subprocess(cmd + extra_cmd,
timeout=timeout,
stdout=stdout,
stderr=stderr,
env=self.env,
cwd=self.test.workdir)
endtime = time.time()
duration = endtime - starttime
if additional_error is None:
@ -1171,7 +1196,7 @@ class TestHarness:
loop.run_until_complete(self._run_tests(tests))
async def _run_tests(self, tests: T.List[TestSerialisation]) -> None:
executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
semaphore = asyncio.Semaphore(self.options.num_processes)
futures = [] # type: T.List[asyncio.Future]
test_count = len(tests)
name_max_len = max([len(self.get_pretty_suite(test)) for test in tests])
@ -1184,11 +1209,12 @@ class TestHarness:
async def run_test(test: SingleTestRunner,
name: str, index: int) -> None:
if interrupted or (self.options.repeat > 1 and self.fail_count):
return
res = await asyncio.get_event_loop().run_in_executor(executor, test.run)
self.process_test_result(res)
self.print_stats(test_count, name_max_len, tests, name, res, index)
async with semaphore:
if interrupted or (self.options.repeat > 1 and self.fail_count):
return
res = await test.run()
self.process_test_result(res)
self.print_stats(test_count, name_max_len, tests, name, res, index)
def test_done(f: asyncio.Future) -> None:
if not f.cancelled():

Loading…
Cancel
Save