diff --git a/mesonbuild/mtest.py b/mesonbuild/mtest.py index 300eb11cf..dd7f926b7 100644 --- a/mesonbuild/mtest.py +++ b/mesonbuild/mtest.py @@ -976,6 +976,27 @@ def decode(stream: T.Union[None, bytes]) -> str: except UnicodeDecodeError: return stream.decode('iso-8859-1', errors='ignore') +async def read_decode(reader: asyncio.StreamReader) -> str: + return decode(await reader.read(-1)) + +# Extract lines out of the StreamReader. Print them +# along the way if requested, and at the end collect +# them all into a future. +async def read_decode_lines(reader: asyncio.StreamReader, f: 'asyncio.Future[str]', + console_mode: ConsoleUser) -> T.AsyncIterator[str]: + stdo_lines = [] + try: + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if console_mode is ConsoleUser.STDOUT: + print(line, end='', flush=True) + yield line + except Exception as e: + f.set_exception(e) + finally: + f.set_result(''.join(stdo_lines)) + def run_with_mono(fname: str) -> bool: return fname.endswith('.exe') and not (is_windows() or is_cygwin()) @@ -1033,17 +1054,27 @@ async def complete_all(futures: T.Iterable[asyncio.Future]) -> None: f.result() class TestSubprocess: - def __init__(self, p: asyncio.subprocess.Process, postwait_fn: T.Callable[[], None] = None): + def __init__(self, p: asyncio.subprocess.Process, + stdout: T.Optional[int], stderr: T.Optional[int], + postwait_fn: T.Callable[[], None] = None): self._process = p + self.stdout = stdout + self.stderr = stderr + self.stdo_task = None # type: T.Optional[T.Awaitable[str]] + self.stde_task = None # type: T.Optional[T.Awaitable[str]] self.postwait_fn = postwait_fn # type: T.Callable[[], None] - @property - def stdout(self) -> T.Optional[asyncio.StreamReader]: - return self._process.stdout + def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]: + self.stdo_task = asyncio.get_event_loop().create_future() + return read_decode_lines(self._process.stdout, self.stdo_task, console_mode) - @property - def stderr(self) -> T.Optional[asyncio.StreamReader]: - return self._process.stderr + def communicate(self) -> T.Tuple[T.Optional[T.Awaitable[str]], + T.Optional[T.Awaitable[str]]]: + if self.stdo_task is None and self.stdout is not None: + self.stdo_task = read_decode(self._process.stdout) + if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT: + self.stde_task = read_decode(self._process.stderr) + return self.stdo_task, self.stde_task async def _kill(self) -> T.Optional[str]: # Python does not provide multiplatform support for @@ -1199,7 +1230,8 @@ class SingleTestRunner: env=env, cwd=cwd, preexec_fn=preexec_fn if not is_windows() else None) - return TestSubprocess(p, postwait_fn=postwait_fn if not is_windows() else None) + return TestSubprocess(p, stdout=stdout, stderr=stderr, + postwait_fn=postwait_fn if not is_windows() else None) async def _run_cmd(self, cmd: T.List[str]) -> None: if self.test.extra_paths: @@ -1245,32 +1277,11 @@ class SingleTestRunner: env=self.env, cwd=self.test.workdir) - stdo = stde = '' - stdo_task = stde_task = parse_task = None - - # Extract lines out of the StreamReader and print them - # along the way if requested - async def lines() -> T.AsyncIterator[str]: - stdo_lines = [] - reader = p.stdout - while not reader.at_eof(): - line = decode(await reader.readline()) - stdo_lines.append(line) - if self.console_mode is ConsoleUser.STDOUT: - print(line, end='') - yield line - - nonlocal stdo - stdo = ''.join(stdo_lines) - parse_task = None if self.runobj.needs_parsing: - parse_task = self.runobj.parse(lines()) - elif stdout is not None: - stdo_task = p.stdout.read(-1) - if stderr is not None and stderr != asyncio.subprocess.STDOUT: - stde_task = p.stderr.read(-1) + parse_task = self.runobj.parse(p.stdout_lines(self.console_mode)) + stdo_task, stde_task = p.communicate() returncode, result, additional_error = await p.wait(self.runobj.timeout) if parse_task is not None: @@ -1279,11 +1290,8 @@ class SingleTestRunner: additional_error = join_lines(additional_error, error) result = result or res - if stdo_task is not None: - stdo = decode(await stdo_task) - if stde_task is not None: - stde = decode(await stde_task) - + stdo = await stdo_task if stdo_task else '' + stde = await stde_task if stde_task else '' stde = join_lines(stde, additional_error) self.runobj.complete(returncode, result, stdo, stde)