Merge pull request #5786 from soltanmm/this-is-not-a-pipe

Don't use a pipe for output capturing in Python's test runner
pull/4865/merge
Jan Tattermusch 9 years ago
commit 220986ea83
  1. 91
      src/python/grpcio/tests/_runner.py

@ -35,6 +35,7 @@ import os
import select import select
import signal import signal
import sys import sys
import tempfile
import threading import threading
import time import time
import unittest import unittest
@ -43,72 +44,47 @@ import uuid
from tests import _loader from tests import _loader
from tests import _result from tests import _result
# This number needs to be large enough to outpace output on stdout and stderr
# from the gRPC core, otherwise we could end up in a potential deadlock. This
# stems from the OS waiting on someone to clear a filled pipe buffer while the
# GIL is held from a write to stderr from gRPC core, but said someone is in
# Python code thus necessitating GIL acquisition.
_READ_BYTES = 2**20
class CaptureFile(object):
"""A context-managed file to redirect output to a byte array.
class CapturePipe(object): Use by invoking `start` (`__enter__`) and at some point invoking `stop`
"""A context-manager pipe to redirect output to a byte array. (`__exit__`). At any point after the initial call to `start` call `output` to
get the current redirected output. Note that we don't currently use file
locking, so calling `output` between calls to `start` and `stop` may muddle
the result (you should only be doing this during a Python-handled interrupt as
a last ditch effort to provide output to the user).
Attributes: Attributes:
_redirect_fd (int): File descriptor of file to redirect writes from. _redirected_fd (int): File descriptor of file to redirect writes from.
_saved_fd (int): A copy of the original value of the redirected file _saved_fd (int): A copy of the original value of the redirected file
descriptor. descriptor.
_read_thread (threading.Thread or None): Thread upon which reads through the _into_file (TemporaryFile or None): File to which writes are redirected.
pipe are performed. Only non-None when self is started. Only non-None when self is started.
_read_fd (int or None): File descriptor of the read end of the redirect
pipe. Only non-None when self is started.
_write_fd (int or None): File descriptor of the write end of the redirect
pipe. Only non-None when self is started.
output (bytearray or None): Redirected output from writes to the redirected
file descriptor. Only valid during and after self has started.
""" """
def __init__(self, fd): def __init__(self, fd):
self._redirect_fd = fd self._redirected_fd = fd
self._saved_fd = os.dup(self._redirect_fd) self._saved_fd = os.dup(self._redirected_fd)
self._read_thread = None self._into_file = None
self._read_fd = None
self._write_fd = None def output(self):
self.output = None """Get all output from the redirected-to file if it exists."""
if self._into_file:
self._into_file.seek(0)
return bytes(self._into_file.read())
else:
return bytes()
def start(self): def start(self):
"""Start redirection of writes to the file descriptor.""" """Start redirection of writes to the file descriptor."""
self._read_fd, self._write_fd = os.pipe() self._into_file = tempfile.TemporaryFile()
os.dup2(self._write_fd, self._redirect_fd) os.dup2(self._into_file.fileno(), self._redirected_fd)
flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL)
fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self._read_thread = threading.Thread(target=self._read)
# If the user wants to exit from the Python program and hits ctrl-C and the
# read thread is somehow deadlocked with something else, the Python code may
# refuse to exit. This prevents that by making the read thread second-class.
self._read_thread.daemon = True
self._read_thread.start()
def stop(self): def stop(self):
"""Stop redirection of writes to the file descriptor.""" """Stop redirection of writes to the file descriptor."""
os.close(self._write_fd) # n.b. this dup2 call auto-closes self._redirected_fd
os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd os.dup2(self._saved_fd, self._redirected_fd)
self._read_thread.join()
self._read_thread = None
# we waited for the read thread to finish, so _read_fd has been read and we
# can close it.
os.close(self._read_fd)
def _read(self):
"""Read-thread target for self."""
self.output = bytearray()
while True:
select.select([self._read_fd], [], [])
read_bytes = os.read(self._read_fd, _READ_BYTES)
if read_bytes:
self.output.extend(read_bytes)
else:
break
def write_bypass(self, value): def write_bypass(self, value):
"""Bypass the redirection and write directly to the original file. """Bypass the redirection and write directly to the original file.
@ -170,8 +146,8 @@ class Runner(object):
result_out = StringIO.StringIO() result_out = StringIO.StringIO()
result = _result.TerminalResult( result = _result.TerminalResult(
result_out, id_map=lambda case: case_id_by_case[case]) result_out, id_map=lambda case: case_id_by_case[case])
stdout_pipe = CapturePipe(sys.stdout.fileno()) stdout_pipe = CaptureFile(sys.stdout.fileno())
stderr_pipe = CapturePipe(sys.stderr.fileno()) stderr_pipe = CaptureFile(sys.stderr.fileno())
kill_flag = [False] kill_flag = [False]
def sigint_handler(signal_number, frame): def sigint_handler(signal_number, frame):
@ -182,7 +158,8 @@ class Runner(object):
def fault_handler(signal_number, frame): def fault_handler(signal_number, frame):
stdout_pipe.write_bypass( stdout_pipe.write_bypass(
'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n' 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'
.format(signal_number, stdout_pipe.output, stderr_pipe.output)) .format(signal_number, stdout_pipe.output(),
stderr_pipe.output()))
os._exit(1) os._exit(1)
def check_kill_self(): def check_kill_self():
@ -191,9 +168,9 @@ class Runner(object):
result.stopTestRun() result.stopTestRun()
stdout_pipe.write_bypass(result_out.getvalue()) stdout_pipe.write_bypass(result_out.getvalue())
stdout_pipe.write_bypass( stdout_pipe.write_bypass(
'\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output)) '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output()))
stderr_pipe.write_bypass( stderr_pipe.write_bypass(
'\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output)) '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output()))
os._exit(1) os._exit(1)
signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGSEGV, fault_handler) signal.signal(signal.SIGSEGV, fault_handler)
@ -223,7 +200,7 @@ class Runner(object):
# re-raise the exception after forcing the with-block to end # re-raise the exception after forcing the with-block to end
raise raise
result.set_output( result.set_output(
augmented_case.case, stdout_pipe.output, stderr_pipe.output) augmented_case.case, stdout_pipe.output(), stderr_pipe.output())
sys.stdout.write(result_out.getvalue()) sys.stdout.write(result_out.getvalue())
sys.stdout.flush() sys.stdout.flush()
result_out.truncate(0) result_out.truncate(0)

Loading…
Cancel
Save