From fd5a3ef9d5240f7d5c4e5725641f22f505d0fe48 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi <atash@google.com> Date: Wed, 16 Mar 2016 18:49:54 -0700 Subject: [PATCH] Don't use a pipe for capturing in test runner Apparently Python can call arbitrarily deallocation code whenever its allocator is invoked, which can cause output from gRPC core to stderr, which can happen on the thread that is emptying the stderr pipe, thus causing the stderr pipe to deadlock on itself. --- src/python/grpcio/tests/_runner.py | 91 +++++++++++------------------- 1 file changed, 34 insertions(+), 57 deletions(-) diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py index 32a31ce00e1..3b5ca03dd9c 100644 --- a/src/python/grpcio/tests/_runner.py +++ b/src/python/grpcio/tests/_runner.py @@ -35,6 +35,7 @@ import os import select import signal import sys +import tempfile import threading import time import unittest @@ -43,72 +44,47 @@ import uuid from tests import _loader from tests import _result -# This number needs to be large enough to outpace output on stdout and stderr -# from the gRPC core, otherwise we could end up in a potential deadlock. This -# stems from the OS waiting on someone to clear a filled pipe buffer while the -# GIL is held from a write to stderr from gRPC core, but said someone is in -# Python code thus necessitating GIL acquisition. -_READ_BYTES = 2**20 +class CaptureFile(object): + """A context-managed file to redirect output to a byte array. -class CapturePipe(object): - """A context-manager pipe to redirect output to a byte array. + Use by invoking `start` (`__enter__`) and at some point invoking `stop` + (`__exit__`). At any point after the initial call to `start` call `output` to + get the current redirected output. Note that we don't currently use file + locking, so calling `output` between calls to `start` and `stop` may muddle + the result (you should only be doing this during a Python-handled interrupt as + a last ditch effort to provide output to the user). Attributes: - _redirect_fd (int): File descriptor of file to redirect writes from. + _redirected_fd (int): File descriptor of file to redirect writes from. _saved_fd (int): A copy of the original value of the redirected file descriptor. - _read_thread (threading.Thread or None): Thread upon which reads through the - pipe are performed. Only non-None when self is started. - _read_fd (int or None): File descriptor of the read end of the redirect - pipe. Only non-None when self is started. - _write_fd (int or None): File descriptor of the write end of the redirect - pipe. Only non-None when self is started. - output (bytearray or None): Redirected output from writes to the redirected - file descriptor. Only valid during and after self has started. + _into_file (TemporaryFile or None): File to which writes are redirected. + Only non-None when self is started. """ def __init__(self, fd): - self._redirect_fd = fd - self._saved_fd = os.dup(self._redirect_fd) - self._read_thread = None - self._read_fd = None - self._write_fd = None - self.output = None + self._redirected_fd = fd + self._saved_fd = os.dup(self._redirected_fd) + self._into_file = None + + def output(self): + """Get all output from the redirected-to file if it exists.""" + if self._into_file: + self._into_file.seek(0) + return bytes(self._into_file.read()) + else: + return bytes() def start(self): """Start redirection of writes to the file descriptor.""" - self._read_fd, self._write_fd = os.pipe() - os.dup2(self._write_fd, self._redirect_fd) - flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL) - fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - self._read_thread = threading.Thread(target=self._read) - # If the user wants to exit from the Python program and hits ctrl-C and the - # read thread is somehow deadlocked with something else, the Python code may - # refuse to exit. This prevents that by making the read thread second-class. - self._read_thread.daemon = True - self._read_thread.start() + self._into_file = tempfile.TemporaryFile() + os.dup2(self._into_file.fileno(), self._redirected_fd) def stop(self): """Stop redirection of writes to the file descriptor.""" - os.close(self._write_fd) - os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd - self._read_thread.join() - self._read_thread = None - # we waited for the read thread to finish, so _read_fd has been read and we - # can close it. - os.close(self._read_fd) - - def _read(self): - """Read-thread target for self.""" - self.output = bytearray() - while True: - select.select([self._read_fd], [], []) - read_bytes = os.read(self._read_fd, _READ_BYTES) - if read_bytes: - self.output.extend(read_bytes) - else: - break + # n.b. this dup2 call auto-closes self._redirected_fd + os.dup2(self._saved_fd, self._redirected_fd) def write_bypass(self, value): """Bypass the redirection and write directly to the original file. @@ -170,8 +146,8 @@ class Runner(object): result_out = StringIO.StringIO() result = _result.TerminalResult( result_out, id_map=lambda case: case_id_by_case[case]) - stdout_pipe = CapturePipe(sys.stdout.fileno()) - stderr_pipe = CapturePipe(sys.stderr.fileno()) + stdout_pipe = CaptureFile(sys.stdout.fileno()) + stderr_pipe = CaptureFile(sys.stderr.fileno()) kill_flag = [False] def sigint_handler(signal_number, frame): @@ -182,7 +158,8 @@ class Runner(object): def fault_handler(signal_number, frame): stdout_pipe.write_bypass( 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n' - .format(signal_number, stdout_pipe.output, stderr_pipe.output)) + .format(signal_number, stdout_pipe.output(), + stderr_pipe.output())) os._exit(1) def check_kill_self(): @@ -191,9 +168,9 @@ class Runner(object): result.stopTestRun() stdout_pipe.write_bypass(result_out.getvalue()) stdout_pipe.write_bypass( - '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output)) + '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output())) stderr_pipe.write_bypass( - '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output)) + '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output())) os._exit(1) signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGSEGV, fault_handler) @@ -223,7 +200,7 @@ class Runner(object): # re-raise the exception after forcing the with-block to end raise result.set_output( - augmented_case.case, stdout_pipe.output, stderr_pipe.output) + augmented_case.case, stdout_pipe.output(), stderr_pipe.output()) sys.stdout.write(result_out.getvalue()) sys.stdout.flush() result_out.truncate(0)