From 6c40b134df49ae8dea373cd5c75a1348c15397e1 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 10 Feb 2021 09:49:48 +0100 Subject: [PATCH] mtest: cancel stdout/stderr tasks on timeout Avoid that the tasks linger and SingleTestRunner.run() never terminates. In order to do this, we need read_decode() and read_decode_lines() to be cancellable, and to handle the CancelledError gracefully while returning the output they have collected so far. For read_decode(), this means always operating on a line-by-line basis, even if console_mode is not ConsoleUser.STDOUT. For read_decode_lines(), instead, we cannot return an iterator. Rather, read_decode_lines() returns the output directly (similar to read_decode) and communication with the parser is mediated by an asyncio.Queue. Signed-off-by: Paolo Bonzini --- mesonbuild/mtest.py | 53 ++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 1b7c774ff..2b5eb658e 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -1078,21 +1078,22 @@ def decode(stream: T.Union[None, bytes]) -> str: return stream.decode('iso-8859-1', errors='ignore') async def read_decode(reader: asyncio.StreamReader, console_mode: ConsoleUser) -> str: - if console_mode is not ConsoleUser.STDOUT: - return decode(await reader.read(-1)) - stdo_lines = [] - while not reader.at_eof(): - line = decode(await reader.readline()) - stdo_lines.append(line) - print(line, end='', flush=True) - return ''.join(stdo_lines) + try: + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if console_mode is ConsoleUser.STDOUT: + print(line, end='', flush=True) + return ''.join(stdo_lines) + except asyncio.CancelledError: + return ''.join(stdo_lines) # Extract lines out of the StreamReader. Print them # along the way if requested, and at the end collect # them all into a future. -async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]', - console_mode: ConsoleUser) -> T.AsyncIterator[str]: +async def read_decode_lines(reader: asyncio.StreamReader, q: 'asyncio.Queue[T.Optional[str]]', + console_mode: ConsoleUser) -> str: stdo_lines = [] try: while not reader.at_eof(): @@ -1100,11 +1101,12 @@ async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str stdo_lines.append(line) if console_mode is ConsoleUser.STDOUT: print(line, end='', flush=True) - yield line - except Exception as e: - f.set_exception(e) + await q.put(line) + return ''.join(stdo_lines) + except asyncio.CancelledError: + return ''.join(stdo_lines) finally: - f.set_result(''.join(stdo_lines)) + await q.put(None) def run_with_mono(fname: str) -> bool: return fname.endswith('.exe') and not (is_windows() or is_cygwin()) @@ -1130,6 +1132,14 @@ async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, floa except asyncio.TimeoutError: pass +async def queue_iter(q: 'asyncio.Queue[T.Optional[str]]') -> T.AsyncIterator[str]: + while True: + item = await q.get() + q.task_done() + if item is None: + break + yield item + async def complete(future: asyncio.Future) -> None: """Wait for completion of the given future, ignoring cancellation.""" try: @@ -1153,13 +1163,15 @@ class TestSubprocess: self._process = p self.stdout = stdout self.stderr = stderr - self.stdo_task = None # type: T.Optional[T.Awaitable[str]] - self.stde_task = None # type: T.Optional[T.Awaitable[str]] + self.stdo_task = None # type: T.Optional[asyncio.Future[str]] + self.stde_task = None # type: T.Optional[asyncio.Future[str]] self.postwait_fn = postwait_fn # type: T.Callable[[], None] def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]: - self.stdo_task = asyncio.get_event_loop().create_future() - return read_decode_lines(self._process.stdout, self.stdo_task, console_mode) + q = asyncio.Queue() # type: asyncio.Queue[T.Optional[str]] + decode_coro = read_decode_lines(self._process.stdout, q, console_mode) + self.stdo_task = asyncio.ensure_future(decode_coro) + return queue_iter(q) def communicate(self, console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]], T.Optional[T.Awaitable[str]]]: @@ -1213,6 +1225,11 @@ class TestSubprocess: # for the event loop to pick that up. await p.wait() return None + finally: + if self.stdo_task: + self.stdo_task.cancel() + if self.stde_task: + self.stde_task.cancel() async def wait(self, timeout: T.Optional[int]) -> T.Tuple[int, TestResult, T.Optional[str]]: p = self._process