diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index f36f1d147..2caa3678d 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -18,6 +18,7 @@ from ._pathlib import Path from collections import namedtuple from copy import deepcopy import argparse +import asyncio import concurrent.futures as conc import datetime import enum @@ -605,6 +606,17 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]: objs = check_testdata(pickle.load(f)) return objs +# Custom waiting primitives for asyncio + +async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: + """Wait for completion of all the given futures, ignoring cancellation.""" + while futures: + done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION) + # Raise exceptions if needed for all the "done" futures + for f in done: + if not f.cancelled(): + f.result() + class SingleTestRunner: @@ -1135,8 +1147,13 @@ class TestHarness: return test.name def run_tests(self, tests: T.List[TestSerialisation]) -> None: - executor = None - futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]] + # Replace with asyncio.run once we can require Python 3.7 + loop = asyncio.get_event_loop() + loop.run_until_complete(self._run_tests(tests)) + + async def _run_tests(self, tests: T.List[TestSerialisation]) -> None: + executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) + futures = [] # type: T.List[asyncio.Future] test_count = len(tests) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) self.open_log_files() @@ -1145,29 +1162,36 @@ class TestHarness: os.chdir(self.options.wd) self.build_data = build.load(os.getcwd()) + async def run_test(test: SingleTestRunner, + name: str, index: int) -> None: + if self.options.repeat > 1 and self.fail_count: + return + res = await asyncio.get_event_loop().run_in_executor(executor, test.run) + self.process_test_result(res) + self.print_stats(test_count, name_max_len, tests, name, res, index) + + def test_done(f: asyncio.Future) -> None: + if not f.cancelled(): + f.result() + futures.remove(f) + try: for _ in range(self.options.repeat): for i, test in enumerate(tests, 1): visible_name = self.get_pretty_suite(test) single_test = self.get_test_runner(test) - if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb: - self.drain_futures(futures) - futures = [] - res = single_test.run() - self.process_test_result(res) - self.print_stats(test_count, name_max_len, tests, visible_name, res, i) + if not test.is_parallel or single_test.options.gdb: + await complete_all(futures) + await run_test(single_test, visible_name, i) else: - if not executor: - executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) - f = executor.submit(single_test.run) - futures.append((f, test_count, name_max_len, tests, visible_name, i)) - if self.options.repeat > 1 and self.fail_count: - break + future = asyncio.ensure_future(run_test(single_test, visible_name, i)) + futures.append(future) + future.add_done_callback(test_done) if self.options.repeat > 1 and self.fail_count: break - self.drain_futures(futures) + await complete_all(futures) self.print_collected_logs() self.print_summary() @@ -1176,17 +1200,6 @@ class TestHarness: finally: os.chdir(startdir) - def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None: - for x in futures: - (result, test_count, name_max_len, tests, name, i) = x - if self.options.repeat > 1 and self.fail_count: - result.cancel() - if self.options.verbose: - result.result() - self.process_test_result(result.result()) - self.print_stats(test_count, name_max_len, tests, name, result.result(), i) - - def list_tests(th: TestHarness) -> bool: tests = th.get_tests() for t in tests: