mtest: use asyncio for run loop

Use asyncio futures for the run loop, while still handling I/O in
a thread pool using run_on_executor.

The handling of the test result is not duplicated anymore between
run_tests and drain_futures.  Instead, the test result is always processed
and printed by run_test after single_test.run() completes and (in verbose
mode) it cannot interleave with the test output.  Therefore the special
case for self.options.num_processes == 1 can be removed.
pull/7836/head
Paolo Bonzini 4 years ago
parent f532b0a9c3
commit 659a5cbaa3
  1. 65
      mesonbuild/mtest.py

@ -18,6 +18,7 @@ from ._pathlib import Path
from collections import namedtuple from collections import namedtuple
from copy import deepcopy from copy import deepcopy
import argparse import argparse
import asyncio
import concurrent.futures as conc import concurrent.futures as conc
import datetime import datetime
import enum import enum
@ -605,6 +606,17 @@ def load_tests(build_dir: str) -> T.List[TestSerialisation]:
objs = check_testdata(pickle.load(f)) objs = check_testdata(pickle.load(f))
return objs return objs
# Custom waiting primitives for asyncio
async def complete_all(futures: T.Iterable[asyncio.Future]) -> None:
"""Wait for completion of all the given futures, ignoring cancellation."""
while futures:
done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION)
# Raise exceptions if needed for all the "done" futures
for f in done:
if not f.cancelled():
f.result()
class SingleTestRunner: class SingleTestRunner:
@ -1135,8 +1147,13 @@ class TestHarness:
return test.name return test.name
def run_tests(self, tests: T.List[TestSerialisation]) -> None: def run_tests(self, tests: T.List[TestSerialisation]) -> None:
executor = None # Replace with asyncio.run once we can require Python 3.7
futures = [] # type: T.List[T.Tuple[conc.Future[TestRun], int, int, T.List[TestSerialisation], str, int]] loop = asyncio.get_event_loop()
loop.run_until_complete(self._run_tests(tests))
async def _run_tests(self, tests: T.List[TestSerialisation]) -> None:
executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes)
futures = [] # type: T.List[asyncio.Future]
test_count = len(tests) test_count = len(tests)
name_max_len = max([len(self.get_pretty_suite(test)) for test in tests]) name_max_len = max([len(self.get_pretty_suite(test)) for test in tests])
self.open_log_files() self.open_log_files()
@ -1145,29 +1162,36 @@ class TestHarness:
os.chdir(self.options.wd) os.chdir(self.options.wd)
self.build_data = build.load(os.getcwd()) self.build_data = build.load(os.getcwd())
async def run_test(test: SingleTestRunner,
name: str, index: int) -> None:
if self.options.repeat > 1 and self.fail_count:
return
res = await asyncio.get_event_loop().run_in_executor(executor, test.run)
self.process_test_result(res)
self.print_stats(test_count, name_max_len, tests, name, res, index)
def test_done(f: asyncio.Future) -> None:
if not f.cancelled():
f.result()
futures.remove(f)
try: try:
for _ in range(self.options.repeat): for _ in range(self.options.repeat):
for i, test in enumerate(tests, 1): for i, test in enumerate(tests, 1):
visible_name = self.get_pretty_suite(test) visible_name = self.get_pretty_suite(test)
single_test = self.get_test_runner(test) single_test = self.get_test_runner(test)
if not test.is_parallel or self.options.num_processes == 1 or single_test.options.gdb: if not test.is_parallel or single_test.options.gdb:
self.drain_futures(futures) await complete_all(futures)
futures = [] await run_test(single_test, visible_name, i)
res = single_test.run()
self.process_test_result(res)
self.print_stats(test_count, name_max_len, tests, visible_name, res, i)
else: else:
if not executor: future = asyncio.ensure_future(run_test(single_test, visible_name, i))
executor = conc.ThreadPoolExecutor(max_workers=self.options.num_processes) futures.append(future)
f = executor.submit(single_test.run) future.add_done_callback(test_done)
futures.append((f, test_count, name_max_len, tests, visible_name, i))
if self.options.repeat > 1 and self.fail_count:
break
if self.options.repeat > 1 and self.fail_count: if self.options.repeat > 1 and self.fail_count:
break break
self.drain_futures(futures) await complete_all(futures)
self.print_collected_logs() self.print_collected_logs()
self.print_summary() self.print_summary()
@ -1176,17 +1200,6 @@ class TestHarness:
finally: finally:
os.chdir(startdir) os.chdir(startdir)
def drain_futures(self, futures: T.List[T.Tuple['conc.Future[TestRun]', int, int, T.List[TestSerialisation], str, int]]) -> None:
for x in futures:
(result, test_count, name_max_len, tests, name, i) = x
if self.options.repeat > 1 and self.fail_count:
result.cancel()
if self.options.verbose:
result.result()
self.process_test_result(result.result())
self.print_stats(test_count, name_max_len, tests, name, result.result(), i)
def list_tests(th: TestHarness) -> bool: def list_tests(th: TestHarness) -> bool:
tests = th.get_tests() tests = th.get_tests()
for t in tests: for t in tests:

Loading…
Cancel
Save