diff --git a/bazel/_gevent_test_main.py b/bazel/_gevent_test_main.py index 52629c61d3c..f7936daaf0d 100644 --- a/bazel/_gevent_test_main.py +++ b/bazel/_gevent_test_main.py @@ -12,13 +12,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +import gevent +from gevent import monkey + +monkey.patch_all() +threadpool = gevent.hub.get_hub().threadpool + +# Currently, each channel corresponds to a single native thread in the +# gevent threadpool. Thus, when the unit test suite spins up hundreds of +# channels concurrently, some will be starved out, causing the test to +# increase in duration. We increase the max size here so this does not +# happen. +threadpool.maxsize = 1024 +threadpool.size = 32 + +import traceback, signal +from typing import Sequence + + +import grpc.experimental.gevent +grpc.experimental.gevent.init_gevent() + +import gevent +import greenlet +import datetime + import grpc import unittest import sys import os import pkgutil -from typing import Sequence +def trace_callback(event, args): + if event in ("switch", "throw"): + origin, target = args + sys.stderr.write("{} Transfer from {} to {} with {}\n".format(datetime.datetime.now(), origin, target, event)) + else: + sys.stderr.write("Unknown event {}.\n".format(event)) + sys.stderr.flush() + +if os.getenv("GREENLET_TRACE") is not None: + greenlet.settrace(trace_callback) + +def debug(sig, frame): + d={'_frame':frame} + d.update(frame.f_globals) + d.update(frame.f_locals) + + sys.stderr.write("Traceback:\n{}".format("\n".join(traceback.format_stack(frame)))) + import gevent.util; gevent.util.print_run_info() + sys.stderr.flush() + +signal.signal(signal.SIGTERM, debug) + class SingleLoader(object): def __init__(self, pattern: str): @@ -38,13 +84,6 @@ class SingleLoader(object): return self.suite if __name__ == "__main__": - from gevent import monkey - - monkey.patch_all() - - import grpc.experimental.gevent - grpc.experimental.gevent.init_gevent() - import gevent if len(sys.argv) != 2: print(f"USAGE: {sys.argv[0]} TARGET_MODULE", file=sys.stderr) diff --git a/bazel/gevent_test.bzl b/bazel/gevent_test.bzl index 9be3ae003eb..173d7064696 100644 --- a/bazel/gevent_test.bzl +++ b/bazel/gevent_test.bzl @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Houses py_grpc_gevent_test. """ @@ -81,6 +80,6 @@ def py_grpc_gevent_test( srcs = [copied_main_filename], main = copied_main_filename, python_version = "PY3", - flaky = True, + flaky = False, **kwargs ) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 8f9d7e972a2..b36e70f4a99 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -1481,6 +1481,8 @@ class Channel(grpc.Channel): self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) cygrpc.fork_register_channel(self) + if cygrpc.g_gevent_activated: + cygrpc.gevent_increment_channel_count() def _process_python_options(self, python_options): """Sets channel attributes according to python-only channel options.""" @@ -1547,6 +1549,8 @@ class Channel(grpc.Channel): self._unsubscribe_all() self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') cygrpc.fork_unregister_channel(self) + if cygrpc.g_gevent_activated: + cygrpc.gevent_decrement_channel_count() def _close_on_fork(self): self._unsubscribe_all() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index 712a589798e..578131f7eef 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -12,24 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython -# links it with exception handling. It introduces new dependencies. -cdef extern from "" namespace "std" nogil: - cdef cppclass queue[T]: - queue() - bint empty() - T& front() - void pop() - void push(T&) - size_t size() - - -cdef extern from "" namespace "std" nogil: - cdef cppclass mutex: - mutex() - void lock() - void unlock() - ctypedef queue[grpc_event] cpp_event_queue diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi index 0307f74cbef..983aa6a87b1 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +cdef int g_interrupt_check_period_ms = 200 cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except * cdef _interpret_event(grpc_event c_event) +cdef class _LatentEventArg: + cdef grpc_completion_queue *c_completion_queue + cdef object deadline cdef class CompletionQueue: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 5eb5f087067..f9f5de2d83b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -13,14 +13,12 @@ # limitations under the License. -cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 - - cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *: + global g_interrupt_check_period_ms cdef gpr_timespec c_increment cdef gpr_timespec c_timeout cdef gpr_timespec c_deadline - c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) + c_increment = gpr_time_from_millis(g_interrupt_check_period_ms, GPR_TIMESPAN) if deadline is None: c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) else: @@ -33,7 +31,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep c_timeout = c_deadline c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) - + if (c_event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0): break @@ -42,7 +40,6 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep cpython.PyErr_CheckSignals() return c_event - cdef _interpret_event(grpc_event c_event): cdef _Tag tag if c_event.type == GRPC_QUEUE_TIMEOUT: @@ -58,11 +55,25 @@ cdef _interpret_event(grpc_event c_event): cpython.Py_DECREF(tag) return tag, tag.event(c_event) +cdef _internal_latent_event(_LatentEventArg latent_event_arg): + cdef grpc_event c_event = _next(latent_event_arg.c_completion_queue, latent_event_arg.deadline) + return _interpret_event(c_event) cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline): - cdef grpc_event c_event = _next(c_completion_queue, deadline) - return _interpret_event(c_event) + global g_gevent_activated + + latent_event_arg = _LatentEventArg() + latent_event_arg.c_completion_queue = c_completion_queue + latent_event_arg.deadline = deadline + if g_gevent_activated: + # For gevent, completion_queue_next is run in a native thread pool. + global g_gevent_threadpool + + result = g_gevent_threadpool.apply(_internal_latent_event, (latent_event_arg,)) + return result + else: + return _internal_latent_event(latent_event_arg) cdef class CompletionQueue: @@ -87,10 +98,17 @@ cdef class CompletionQueue: self.is_shutdown = True return event + def _internal_poll(self, deadline): + return self._interpret_event(_next(self.c_completion_queue, deadline)) + # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). def poll(self, deadline=None): - return self._interpret_event(_next(self.c_completion_queue, deadline)) + global g_gevent_activated + if g_gevent_activated: + return g_gevent_threadpool.apply(CompletionQueue._internal_poll, (self, deadline)) + else: + return self._internal_poll(deadline) def shutdown(self): with nogil: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 7fde61d7dd2..c7f09ede7e8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -25,6 +25,37 @@ ctypedef unsigned short uint16_t ctypedef unsigned int uint32_t ctypedef unsigned long long uint64_t +# C++ Utilities + +# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython +# links it with exception handling. It introduces new dependencies. +cdef extern from "" namespace "std" nogil: + cdef cppclass queue[T]: + queue() + bint empty() + T& front() + T& back() + void pop() + void push(T&) + size_t size() + + +cdef extern from "" namespace "std" nogil: + cdef cppclass mutex: + mutex() + void lock() + void unlock() + + cdef cppclass unique_lock[Mutex]: + unique_lock(Mutex&) + +cdef extern from "" namespace "std" nogil: + cdef cppclass condition_variable: + condition_variable() + void notify_all() + void wait(unique_lock[mutex]&) + +# gRPC Core Declarations cdef extern from "grpc/support/alloc.h": @@ -700,3 +731,5 @@ cdef extern from "grpc/grpc_security_constants.h": cdef extern from "src/core/lib/iomgr/error.h": ctypedef grpc_error* grpc_error_handle grpc_error_handle GRPC_ERROR_CANCELLED + struct grpc_error: + pass diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi index bd1d73145b9..baa9fb54a3e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi @@ -13,29 +13,9 @@ # limitations under the License. # distutils: language=c++ -cdef class TimerWrapper: +g_gevent_threadpool = None +g_gevent_activated = False - cdef grpc_custom_timer *c_timer - cdef object timer - cdef object event +cpdef void gevent_increment_channel_count() -cdef class SocketWrapper: - cdef object sockopts - cdef object socket - cdef object closed - cdef grpc_custom_socket *c_socket - cdef char* c_buffer - cdef size_t len - cdef grpc_custom_socket *accepting_socket - - cdef grpc_custom_connect_callback connect_cb - cdef grpc_custom_write_callback write_cb - cdef grpc_custom_read_callback read_cb - cdef grpc_custom_accept_callback accept_cb - cdef grpc_custom_close_callback close_cb - - -cdef class ResolveWrapper: - cdef grpc_custom_resolver *c_resolver - cdef const char* c_host - cdef const char* c_port +cpdef void gevent_decrement_channel_count() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi index 9aa012de0b1..41d27df5948 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -14,418 +14,124 @@ # distutils: language=c++ from libc cimport string -import errno -import sys -gevent_g = None -gevent_socket = None -gevent_hub = None -gevent_event = None -g_event = None -g_pool = None - -def _spawn_greenlet(*args): - greenlet = g_pool.spawn(*args) - -############################### -### socket implementation ### -############################### - -cdef class SocketWrapper: - def __cinit__(self): - fork_handlers_and_grpc_init() - self.sockopts = [] - self.socket = None - self.c_socket = NULL - self.c_buffer = NULL - self.len = 0 - - def __dealloc__(self): - grpc_shutdown() - -cdef grpc_error_handle socket_init(grpc_custom_socket* socket, int domain) with gil: - sw = SocketWrapper() - sw.c_socket = socket - sw.sockopts = [] - cpython.Py_INCREF(sw) - # Python doesn't support AF_UNSPEC sockets, so we defer creation until - # bind/connect when we know what type of socket we need - sw.socket = None - sw.closed = False - sw.accepting_socket = NULL - socket.impl = sw - return grpc_error_none() - -cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple): - try: - socket_wrapper.socket.connect(addr_tuple) - socket_wrapper.connect_cb(socket_wrapper.c_socket, - grpc_error_none()) - except IOError as io_error: - socket_wrapper.connect_cb(socket_wrapper.c_socket, - socket_error("connect", str(io_error))) - g_event.set() - -def socket_connect_async(socket_wrapper, addr_tuple): - socket_connect_async_cython(socket_wrapper, addr_tuple) - -cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr, - size_t addr_len, - grpc_custom_connect_callback cb) with gil: - py_socket = None - socket_wrapper = socket.impl - socket_wrapper.connect_cb = cb - addr_tuple = sockaddr_to_tuple(addr, addr_len) - if sockaddr_is_ipv4(addr, addr_len): - py_socket = gevent_socket.socket(gevent_socket.AF_INET) - else: - py_socket = gevent_socket.socket(gevent_socket.AF_INET6) - applysockopts(py_socket) - socket_wrapper.socket = py_socket - _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple) - -cdef void socket_destroy(grpc_custom_socket* socket) with gil: - cpython.Py_DECREF(socket.impl) - -cdef void socket_shutdown(grpc_custom_socket* socket) with gil: - try: - (socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR) - except IOError as io_error: - if io_error.errno != errno.ENOTCONN: - raise io_error - -cdef void socket_close(grpc_custom_socket* socket, - grpc_custom_close_callback cb) with gil: - socket_wrapper = (socket.impl) - if socket_wrapper.socket is not None: - socket_wrapper.socket.close() - socket_wrapper.closed = True - socket_wrapper.close_cb = cb - # Delay the close callback until the accept() call has picked it up - if socket_wrapper.accepting_socket != NULL: - return - socket_wrapper.close_cb(socket) - -def socket_sendmsg(socket, write_bytes): - try: - return socket.sendmsg(write_bytes) - except AttributeError: - # sendmsg not available on all Pythons/Platforms - return socket.send(b''.join(write_bytes)) - -cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes): - try: - while write_bytes: - sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes) - while sent_byte_count > 0: - if sent_byte_count < len(write_bytes[0]): - write_bytes[0] = write_bytes[0][sent_byte_count:] - sent_byte_count = 0 - else: - sent_byte_count -= len(write_bytes[0]) - write_bytes = write_bytes[1:] - socket_wrapper.write_cb(socket_wrapper.c_socket, - grpc_error_none()) - except IOError as io_error: - socket_wrapper.write_cb(socket_wrapper.c_socket, - socket_error("send", str(io_error))) - g_event.set() - -def socket_write_async(socket_wrapper, write_bytes): - socket_write_async_cython(socket_wrapper, write_bytes) - -cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer, - grpc_custom_write_callback cb) with gil: - cdef char* start - sw = socket.impl - sw.write_cb = cb - write_bytes = [] - for i in range(buffer.count): - start = grpc_slice_buffer_start(buffer, i) - length = grpc_slice_buffer_length(buffer, i) - write_bytes.append(start[:length]) - _spawn_greenlet(socket_write_async, socket.impl, write_bytes) - -cdef socket_read_async_cython(SocketWrapper socket_wrapper): - cdef char* buff_char_arr - try: - buff_str = socket_wrapper.socket.recv(socket_wrapper.len) - buff_char_arr = buff_str - string.memcpy(socket_wrapper.c_buffer, buff_char_arr, len(buff_str)) - socket_wrapper.read_cb(socket_wrapper.c_socket, - len(buff_str), grpc_error_none()) - except IOError as io_error: - socket_wrapper.read_cb(socket_wrapper.c_socket, - -1, socket_error("recv", str(io_error))) - g_event.set() - -def socket_read_async(socket_wrapper): - socket_read_async_cython(socket_wrapper) - -cdef void socket_read(grpc_custom_socket* socket, char* buffer, - size_t length, grpc_custom_read_callback cb) with gil: - sw = socket.impl - sw.read_cb = cb - sw.c_buffer = buffer - sw.len = length - _spawn_greenlet(socket_read_async, sw) - -cdef grpc_error_handle socket_getpeername(grpc_custom_socket* socket, - const grpc_sockaddr* addr, - int* length) with gil: - cdef char* src_buf - peer = (socket.impl).socket.getpeername() - - cdef grpc_resolved_address c_addr - hostname = str_to_bytes(peer[0]) - grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) - string.memcpy(addr, c_addr.addr, c_addr.len) - length[0] = c_addr.len - return grpc_error_none() - -cdef grpc_error_handle socket_getsockname(grpc_custom_socket* socket, - const grpc_sockaddr* addr, - int* length) with gil: - cdef char* src_buf - cdef grpc_resolved_address c_addr - if (socket.impl).socket is None: - peer = ('0.0.0.0', 0) - else: - peer = (socket.impl).socket.getsockname() - hostname = str_to_bytes(peer[0]) - grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) - string.memcpy(addr, c_addr.addr, c_addr.len) - length[0] = c_addr.len - return grpc_error_none() - -def applysockopts(s): - s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1) - s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True) - -cdef grpc_error_handle socket_bind(grpc_custom_socket* socket, - const grpc_sockaddr* addr, - size_t len, int flags) with gil: - addr_tuple = sockaddr_to_tuple(addr, len) - try: - try: - py_socket = gevent_socket.socket(gevent_socket.AF_INET) - applysockopts(py_socket) - py_socket.bind(addr_tuple) - except gevent_socket.gaierror as e: - py_socket = gevent_socket.socket(gevent_socket.AF_INET6) - applysockopts(py_socket) - py_socket.bind(addr_tuple) - (socket.impl).socket = py_socket - except IOError as io_error: - return socket_error("bind", str(io_error)) - else: - return grpc_error_none() - -cdef grpc_error_handle socket_listen(grpc_custom_socket* socket) with gil: - (socket.impl).socket.listen(50) - return grpc_error_none() - -cdef void accept_callback_cython(SocketWrapper s) except *: - try: - conn, address = s.socket.accept() - sw = SocketWrapper() - sw.closed = False - sw.c_socket = s.accepting_socket - sw.sockopts = [] - sw.socket = conn - sw.c_socket.impl = sw - sw.accepting_socket = NULL - cpython.Py_INCREF(sw) - s.accepting_socket = NULL - s.accept_cb(s.c_socket, sw.c_socket, grpc_error_none()) - except IOError as io_error: - #TODO actual error - s.accepting_socket = NULL - s.accept_cb(s.c_socket, s.accepting_socket, - socket_error("accept", str(io_error))) - if s.closed: - s.close_cb(s.c_socket) - g_event.set() - -def socket_accept_async(s): - accept_callback_cython(s) +from cython.operator cimport dereference -cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client, - grpc_custom_accept_callback cb) with gil: - sw = socket.impl - sw.accepting_socket = client - sw.accept_cb = cb - _spawn_greenlet(socket_accept_async, sw) +from cpython cimport Py_INCREF, Py_DECREF -##################################### -######Resolver implementation ####### -##################################### - -cdef class ResolveWrapper: - def __cinit__(self): - fork_handlers_and_grpc_init() - self.c_resolver = NULL - self.c_host = NULL - self.c_port = NULL - - def __dealloc__(self): - grpc_shutdown() - -cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper): - try: - res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port) - grpc_custom_resolve_callback(resolve_wrapper.c_resolver, - tuples_to_resolvaddr(res), grpc_error_none()) - except IOError as io_error: - grpc_custom_resolve_callback(resolve_wrapper.c_resolver, - 0, - socket_error("getaddrinfo", str(io_error))) - g_event.set() - -def socket_resolve_async_python(resolve_wrapper): - socket_resolve_async_cython(resolve_wrapper) - -cdef void socket_resolve_async(grpc_custom_resolver* r, const char* host, const char* port) with gil: - rw = ResolveWrapper() - rw.c_resolver = r - rw.c_host = host - rw.c_port = port - _spawn_greenlet(socket_resolve_async_python, rw) - -cdef grpc_error_handle socket_resolve(const char* host, const char* port, - grpc_resolved_addresses** res) with gil: - try: - result = gevent_socket.getaddrinfo(host, port) - res[0] = tuples_to_resolvaddr(result) - return grpc_error_none() - except IOError as io_error: - return socket_error("getaddrinfo", str(io_error)) - -############################### -### timer implementation ###### -############################### - -cdef class TimerWrapper: - def __cinit__(self, deadline): - fork_handlers_and_grpc_init() - self.timer = gevent_hub.get_hub().loop.timer(deadline) - self.event = None - - def start(self): - self.event = gevent_event.Event() - self.timer.start(self.on_finish) - - def on_finish(self): - grpc_custom_timer_callback(self.c_timer, grpc_error_none()) - self.timer.stop() - g_event.set() - - def stop(self): - self.event.set() - self.timer.stop() - - def __dealloc__(self): - grpc_shutdown() - -cdef void timer_start(grpc_custom_timer* t) with gil: - timer = TimerWrapper(t.timeout_ms / 1000.0) - timer.c_timer = t - t.timer = timer - timer.start() - -cdef void timer_stop(grpc_custom_timer* t) with gil: - time_wrapper = t.timer - time_wrapper.stop() - -############################### -### pollset implementation ### -############################### - -cdef void init_loop() with gil: - pass - -cdef void destroy_loop() with gil: - g_pool.join() - -cdef void kick_loop() with gil: - g_event.set() - -def _run_loop(timeout_ms): - timeout = timeout_ms / 1000.0 - if timeout_ms > 0: - try: - g_event.wait(timeout) - finally: - g_event.clear() - -cdef grpc_error_handle run_loop(size_t timeout_ms) with gil: - try: - _run_loop(timeout_ms) - return grpc_error_none() - except BaseException: - exc_info = sys.exc_info() - # Avoid running any Python code after setting the exception - cpython.PyErr_SetObject(exc_info[0], exc_info[1]) - return GRPC_ERROR_CANCELLED - -############################### -### Initializer ############### -############################### +import atexit +import errno +import sys -cdef grpc_socket_vtable gevent_socket_vtable -cdef grpc_custom_resolver_vtable gevent_resolver_vtable -cdef grpc_custom_timer_vtable gevent_timer_vtable -cdef grpc_custom_poller_vtable gevent_pollset_vtable +gevent_hub = None +g_gevent_pool = None +g_gevent_threadpool = None +g_gevent_activated = False + + +cdef queue[void*] g_greenlets_to_run +cdef condition_variable g_greenlets_cv +cdef mutex g_greenlets_mu +cdef bint g_shutdown_greenlets_to_run_queue = False +cdef int g_channel_count = 0 + + +cdef _submit_to_greenlet_queue(object cb, tuple args): + cdef tuple to_call = (cb,) + args + cdef unique_lock[mutex]* lk + Py_INCREF(to_call) + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + g_greenlets_to_run.push((to_call)) + del lk + g_greenlets_cv.notify_all() + + +cpdef void gevent_increment_channel_count(): + global g_channel_count + cdef int old_channel_count + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + old_channel_count = g_channel_count + g_channel_count += 1 + del lk + if old_channel_count == 0: + run_spawn_greenlets() + + +cpdef void gevent_decrement_channel_count(): + global g_channel_count + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + g_channel_count -= 1 + if g_channel_count == 0: + g_greenlets_cv.notify_all() + del lk + + +cdef object await_next_greenlet(): + cdef unique_lock[mutex]* lk + with nogil: + # Cython doesn't allow us to do proper stack allocations, so we can't take + # advantage of RAII. + lk = new unique_lock[mutex](g_greenlets_mu) + while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0: + if not g_greenlets_to_run.empty(): + break + g_greenlets_cv.wait(dereference(lk)) + if g_channel_count == 0: + del lk + return None + if g_shutdown_greenlets_to_run_queue: + del lk + return None + cdef object to_call = g_greenlets_to_run.front() + Py_DECREF(to_call) + g_greenlets_to_run.pop() + del lk + return to_call + +def spawn_greenlets(): + while True: + to_call = g_gevent_threadpool.apply(await_next_greenlet, ()) + if to_call is None: + break + fn = to_call[0] + args = to_call[1:] + fn(*args) + +def run_spawn_greenlets(): + g_gevent_pool.spawn(spawn_greenlets) + +def shutdown_await_next_greenlet(): + global g_shutdown_greenlets_to_run_queue + cdef unique_lock[mutex]* lk + with nogil: + lk = new unique_lock[mutex](g_greenlets_mu) + g_shutdown_greenlets_to_run_queue = True + del lk + g_greenlets_cv.notify_all() def init_grpc_gevent(): # Lazily import gevent - global gevent_socket - global gevent_g global gevent_hub - global gevent_event - global g_event - global g_pool + global g_gevent_threadpool + global g_gevent_activated + global g_interrupt_check_period_ms + global g_gevent_pool + import gevent - gevent_g = gevent - import gevent.socket - gevent_socket = gevent.socket - import gevent.hub - gevent_hub = gevent.hub - import gevent.event - gevent_event = gevent.event import gevent.pool - g_event = gevent.event.Event() - g_pool = gevent.pool.Group() - - def cb_func(cb, args): - _spawn_greenlet(cb, *args) - set_async_callback_func(cb_func) + gevent_hub = gevent.hub + g_gevent_threadpool = gevent_hub.get_hub().threadpool - gevent_resolver_vtable.resolve = socket_resolve - gevent_resolver_vtable.resolve_async = socket_resolve_async + g_gevent_activated = True + g_interrupt_check_period_ms = 2000 - gevent_socket_vtable.init = socket_init - gevent_socket_vtable.connect = socket_connect - gevent_socket_vtable.destroy = socket_destroy - gevent_socket_vtable.shutdown = socket_shutdown - gevent_socket_vtable.close = socket_close - gevent_socket_vtable.write = socket_write - gevent_socket_vtable.read = socket_read - gevent_socket_vtable.getpeername = socket_getpeername - gevent_socket_vtable.getsockname = socket_getsockname - gevent_socket_vtable.bind = socket_bind - gevent_socket_vtable.listen = socket_listen - gevent_socket_vtable.accept = socket_accept + g_gevent_pool = gevent.pool.Group() - gevent_timer_vtable.start = timer_start - gevent_timer_vtable.stop = timer_stop - gevent_pollset_vtable.init = init_loop - gevent_pollset_vtable.poll = run_loop - gevent_pollset_vtable.kick = kick_loop - gevent_pollset_vtable.shutdown = destroy_loop + set_async_callback_func(_submit_to_greenlet_queue) - grpc_custom_iomgr_init(&gevent_socket_vtable, - &gevent_resolver_vtable, - &gevent_timer_vtable, - &gevent_pollset_vtable) + # TODO: Document how this all works. + atexit.register(shutdown_await_next_greenlet) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi deleted file mode 100644 index 54174b3c13b..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# distutils: language=c++ - -from libcpp cimport bool as bool_t -from libcpp.string cimport string as cppstring - -cdef extern from "grpc/impl/codegen/slice.h": - struct grpc_slice_buffer: - int count - -cdef extern from "src/core/lib/iomgr/error.h": - struct grpc_error: - pass - ctypedef grpc_error* grpc_error_handle - -# TODO(https://github.com/grpc/grpc/issues/20135) Change the filename -# for something more meaningful. -cdef extern from "src/core/lib/iomgr/python_util.h": - grpc_error_handle grpc_socket_error(char* error) - char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) - int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) - -cdef extern from "src/core/lib/iomgr/sockaddr.h": - ctypedef struct grpc_sockaddr: - pass - -cdef extern from "src/core/lib/iomgr/resolve_address.h": - ctypedef struct grpc_resolved_addresses: - size_t naddrs - grpc_resolved_address* addrs - - ctypedef struct grpc_resolved_address: - char[128] addr - size_t len - -cdef extern from "src/core/lib/iomgr/resolve_address_custom.h": - struct grpc_custom_resolver: - pass - - struct grpc_custom_resolver_vtable: - grpc_error_handle (*resolve)(const char* host, const char* port, grpc_resolved_addresses** res); - void (*resolve_async)(grpc_custom_resolver* resolver, const char* host, const char* port); - - void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, - grpc_resolved_addresses* result, - grpc_error_handle error); - -cdef extern from "src/core/lib/iomgr/tcp_custom.h": - cdef int GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT - - struct grpc_custom_socket: - void* impl - # We don't care about the rest of the fields - ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket, - grpc_error_handle error) - ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket, - grpc_error_handle error) - ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket, - size_t nread, grpc_error_handle error) - ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket, - grpc_custom_socket* client, - grpc_error_handle error) - ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket) - - struct grpc_socket_vtable: - grpc_error_handle (*init)(grpc_custom_socket* socket, int domain); - void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr, - size_t len, grpc_custom_connect_callback cb); - void (*destroy)(grpc_custom_socket* socket); - void (*shutdown)(grpc_custom_socket* socket); - void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb); - void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices, - grpc_custom_write_callback cb); - void (*read)(grpc_custom_socket* socket, char* buffer, size_t length, - grpc_custom_read_callback cb); - grpc_error_handle (*getpeername)(grpc_custom_socket* socket, - const grpc_sockaddr* addr, int* len); - grpc_error_handle (*getsockname)(grpc_custom_socket* socket, - const grpc_sockaddr* addr, int* len); - grpc_error_handle (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, - size_t len, int flags); - grpc_error_handle (*listen)(grpc_custom_socket* socket); - void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client, - grpc_custom_accept_callback cb); - -cdef extern from "src/core/lib/iomgr/timer_custom.h": - struct grpc_custom_timer: - void* timer - int timeout_ms - # We don't care about the rest of the fields - - struct grpc_custom_timer_vtable: - void (*start)(grpc_custom_timer* t); - void (*stop)(grpc_custom_timer* t); - - void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error_handle error); - -cdef extern from "src/core/lib/iomgr/pollset_custom.h": - struct grpc_custom_poller_vtable: - void (*init)() - grpc_error_handle (*poll)(size_t timeout_ms) - void (*kick)() - void (*shutdown)() - -cdef extern from "src/core/lib/iomgr/iomgr_custom.h": - void grpc_custom_iomgr_init(grpc_socket_vtable* socket, - grpc_custom_resolver_vtable* resolver, - grpc_custom_timer_vtable* timer, - grpc_custom_poller_vtable* poller); - -cdef extern from "src/core/lib/address_utils/sockaddr_utils.h": - int grpc_sockaddr_get_port(const grpc_resolved_address *addr); - cppstring grpc_sockaddr_to_string(const grpc_resolved_address *addr, - bool_t normalize); - int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr, - int port) - const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr) - -cdef extern from "src/core/lib/address_utils/parse_address.h": - grpc_error_handle grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port); diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi deleted file mode 100644 index 292e0473af6..00000000000 --- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2019 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# distutils: language=c++ - -from libc cimport string -from libc.stdlib cimport malloc -from libcpp.string cimport string as cppstring - -cdef grpc_error_handle grpc_error_none(): - return 0 - -cdef grpc_error_handle socket_error(str syscall, str err): - error_str = "{} failed: {}".format(syscall, err) - error_bytes = str_to_bytes(error_str) - return grpc_socket_error(error_bytes) - -cdef resolved_addr_to_tuple(grpc_resolved_address* address): - cdef cppstring res_str - port = grpc_sockaddr_get_port(address) - res_str = grpc_sockaddr_to_string(address, False) - byte_str = _decode(res_str) - if byte_str.endswith(':' + str(port)): - byte_str = byte_str[:(0 - len(str(port)) - 1)] - byte_str = byte_str.lstrip('[') - byte_str = byte_str.rstrip(']') - byte_str = '{}'.format(byte_str) - return byte_str, port - -cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): - cdef grpc_resolved_address c_addr - string.memcpy(c_addr.addr, address, length) - c_addr.len = length - return resolved_addr_to_tuple(&c_addr) - -cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): - cdef grpc_resolved_address c_addr - string.memcpy(c_addr.addr, address, length) - c_addr.len = length - return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' - -cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): - cdef grpc_resolved_addresses* addresses - tups_set = set((tup[4][0], tup[4][1]) for tup in tups) - addresses = malloc(sizeof(grpc_resolved_addresses)) - addresses.naddrs = len(tups_set) - addresses.addrs = malloc(sizeof(grpc_resolved_address) * len(tups_set)) - i = 0 - for tup in set(tups_set): - hostname = str_to_bytes(tup[0]) - grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) - i += 1 - return addresses diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 70c5b5d31d0..ed04119143a 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -34,7 +34,6 @@ include "_cygrpc/time.pxd.pxi" include "_cygrpc/vtable.pxd.pxi" include "_cygrpc/_hooks.pxd.pxi" -include "_cygrpc/iomgr.pxd.pxi" include "_cygrpc/grpc_gevent.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 4c0452d700f..ca1b4c89f9c 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -56,8 +56,6 @@ include "_cygrpc/time.pyx.pxi" include "_cygrpc/vtable.pyx.pxi" include "_cygrpc/_hooks.pyx.pxi" -include "_cygrpc/iomgr.pyx.pxi" - include "_cygrpc/grpc_gevent.pyx.pxi" include "_cygrpc/thread.pyx.pxi" diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 84331aed63a..b9336d899db 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -245,9 +245,20 @@ class TestGevent(setuptools.Command): pass def run(self): + import gevent from gevent import monkey monkey.patch_all() + threadpool = gevent.hub.get_hub().threadpool + + # Currently, each channel corresponds to a single native thread in the + # gevent threadpool. Thus, when the unit test suite spins up hundreds of + # channels concurrently, some will be starved out, causing the test to + # increase in duration. We increase the max size here so this does not + # happen. + threadpool.maxsize = 1024 + threadpool.size = 32 + import grpc.experimental.gevent import tests diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index bf8bdef0e38..2d2a246ed47 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. load("//bazel:internal_python_rules.bzl", "internal_py_grpc_test") +load("@grpc_python_dependencies//:requirements.bzl", "requirement") package(default_visibility = ["//visibility:public"]) @@ -64,6 +65,9 @@ py_library( py_library( name = "_signal_client", srcs = ["_signal_client.py"], + deps = [ + requirement("gevent"), + ], ) py_library( diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index cd55db58baf..d58f4dc06ba 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -25,7 +25,6 @@ import grpc from grpc import _grpcio_metadata from tests.unit import _tcp_proxy -from tests.unit import test_common from tests.unit.framework.common import test_constants _UNARY_UNARY = '/test/UnaryUnary' @@ -259,8 +258,6 @@ def _stream_stream_client(channel, multicallable_kwargs, message): i, response)) -@unittest.skipIf(test_common.running_under_gevent(), - "This test is nondeterministic under gevent.") class CompressionTest(unittest.TestCase): def assertCompressed(self, compression_ratio): diff --git a/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py b/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py index 128ec514d06..a90f07ed089 100644 --- a/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py +++ b/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py @@ -75,7 +75,11 @@ if contextvars_supported(): class TestCallCredentials(grpc.AuthMetadataPlugin): def __call__(self, context, callback): - if test_var.get() != _EXPECTED_VALUE: + if test_var.get( + ) != _EXPECTED_VALUE and not test_common.running_under_gevent(): + # contextvars do not work under gevent, but the rest of this + # test is still valuable as a test of concurrent runs of the + # metadata credentials code path. raise AssertionError("{} != {}".format(test_var.get(), _EXPECTED_VALUE)) callback((), None) @@ -97,8 +101,6 @@ else: # TODO(https://github.com/grpc/grpc/issues/22257) @unittest.skipIf(os.name == "nt", "LocalCredentials not supported on Windows.") -@unittest.skipIf(test_common.running_under_gevent(), - "ThreadLocals do not work under gevent.") class ContextVarsPropagationTest(unittest.TestCase): def test_propagation_to_auth_plugin(self): @@ -145,7 +147,8 @@ class ContextVarsPropagationTest(unittest.TestCase): exception_queue.put(e) threads = [] - for _ in range(_RPC_COUNT): + + for _ in range(_THREAD_COUNT): q = queue.Queue() thread = threading.Thread(target=_run_on_thread, args=(q,)) thread.setDaemon(True) diff --git a/src/python/grpcio_tests/tests/unit/_local_credentials_test.py b/src/python/grpcio_tests/tests/unit/_local_credentials_test.py index 5351a2b4cc1..ce92feed4b3 100644 --- a/src/python/grpcio_tests/tests/unit/_local_credentials_test.py +++ b/src/python/grpcio_tests/tests/unit/_local_credentials_test.py @@ -19,8 +19,6 @@ import unittest import grpc -from tests.unit import test_common - class _GenericHandler(grpc.GenericRpcHandler): @@ -58,8 +56,6 @@ class LocalCredentialsTest(unittest.TestCase): @unittest.skipIf(os.name == 'nt', 'Unix Domain Socket is not supported on Windows') - @unittest.skipIf(test_common.running_under_gevent(), - 'UDS not supported under gevent.') def test_uds(self): server_addr = 'unix:/tmp/grpc_fullstack_test' channel_creds = grpc.local_channel_credentials( diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index 87441833c17..89c028b307b 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -188,8 +188,6 @@ def _generic_handler(servicer): return grpc.method_handlers_generic_handler(_SERVICE, method_handlers) -@unittest.skipIf(test_common.running_under_gevent(), - "Causes deadlock in gevent.") class MetadataCodeDetailsTest(unittest.TestCase): def setUp(self): diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index 62ab5a58fc6..90d010b9360 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -21,7 +21,6 @@ import unittest import grpc from grpc.framework.foundation import logging_pool -from tests.unit import test_common from tests.unit.framework.common import bound_socket from tests.unit.framework.common import test_constants @@ -35,8 +34,6 @@ def _handle_unary_unary(unused_request, unused_servicer_context): return _RESPONSE -@unittest.skipIf(test_common.running_under_gevent(), - "Test is nondeterministic under gevent.") class ReconnectTest(unittest.TestCase): def test_reconnect(self): diff --git a/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py b/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py index 0ffa9eff94f..1c0886cc069 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py @@ -22,7 +22,6 @@ import unittest import grpc from grpc.framework.foundation import logging_pool -from tests.unit import test_common from tests.unit._rpc_test_helpers import BaseRPCTest from tests.unit._rpc_test_helpers import Callback from tests.unit._rpc_test_helpers import TIMEOUT_SHORT @@ -37,8 +36,6 @@ from tests.unit._rpc_test_helpers import unary_unary_multi_callable from tests.unit.framework.common import test_constants -@unittest.skipIf(test_common.running_under_gevent(), - "This test is nondeterministic under gevent.") class RPCPart1Test(BaseRPCTest, unittest.TestCase): def testExpiredStreamRequestBlockingUnaryResponse(self): @@ -238,4 +235,4 @@ class RPCPart1Test(BaseRPCTest, unittest.TestCase): if __name__ == '__main__': logging.basicConfig() - unittest.main(verbosity=2) + unittest.main(verbosity=3) diff --git a/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py b/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py index 6a82a9588f6..a8e6ddeb534 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py @@ -22,7 +22,6 @@ import unittest import grpc from grpc.framework.foundation import logging_pool -from tests.unit import test_common from tests.unit._rpc_test_helpers import BaseRPCTest from tests.unit._rpc_test_helpers import Callback from tests.unit._rpc_test_helpers import TIMEOUT_SHORT @@ -37,8 +36,6 @@ from tests.unit._rpc_test_helpers import unary_unary_multi_callable from tests.unit.framework.common import test_constants -@unittest.skipIf(test_common.running_under_gevent(), - "Causes deadlock under gevent.") class RPCPart2Test(BaseRPCTest, unittest.TestCase): def testDefaultThreadPoolIsUsed(self): diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py b/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py index a3f18a9a490..49c08e009a7 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py @@ -36,7 +36,7 @@ _STREAM_UNARY = '/test/StreamUnary' _STREAM_STREAM = '/test/StreamStream' _STREAM_STREAM_NON_BLOCKING = '/test/StreamStreamNonBlocking' -TIMEOUT_SHORT = datetime.timedelta(seconds=1).total_seconds() +TIMEOUT_SHORT = datetime.timedelta(seconds=4).total_seconds() class Callback(object): diff --git a/src/python/grpcio_tests/tests/unit/_signal_client.py b/src/python/grpcio_tests/tests/unit/_signal_client.py index 5cc68831f7e..eac83b1844a 100644 --- a/src/python/grpcio_tests/tests/unit/_signal_client.py +++ b/src/python/grpcio_tests/tests/unit/_signal_client.py @@ -108,7 +108,19 @@ if __name__ == '__main__': parser.add_argument('--exception', help='Whether the signal throws an exception', action='store_true') + parser.add_argument('--gevent', + help='Whether to run under gevent.', + action='store_true') args = parser.parse_args() + if args.gevent: + from gevent import monkey + import gevent.util + + monkey.patch_all() + + import grpc.experimental.gevent + grpc.experimental.gevent.init_gevent() + if args.arity == 'unary' and not args.exception: main_unary(args.server) elif args.arity == 'streaming' and not args.exception: diff --git a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py index 600bd8fce64..df38159d546 100644 --- a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py +++ b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py @@ -41,6 +41,10 @@ else: _HOST = 'localhost' +# The gevent test harness cannot run the monkeypatch code for the child process, +# so we need to instrument it manually. +_GEVENT_ARG = ("--gevent",) if test_common.running_under_gevent() else () + class _GenericHandler(grpc.GenericRpcHandler): @@ -142,8 +146,8 @@ class SignalHandlingTest(unittest.TestCase): server_target = '{}:{}'.format(_HOST, self._port) with tempfile.TemporaryFile(mode='r') as client_stdout: with tempfile.TemporaryFile(mode='r') as client_stderr: - client = _start_client((server_target, 'unary'), client_stdout, - client_stderr) + client = _start_client((server_target, 'unary') + _GEVENT_ARG, + client_stdout, client_stderr) self._handler.await_connected_client() client.send_signal(signal.SIGINT) self.assertFalse(client.wait(), msg=_read_stream(client_stderr)) @@ -157,8 +161,9 @@ class SignalHandlingTest(unittest.TestCase): server_target = '{}:{}'.format(_HOST, self._port) with tempfile.TemporaryFile(mode='r') as client_stdout: with tempfile.TemporaryFile(mode='r') as client_stderr: - client = _start_client((server_target, 'streaming'), - client_stdout, client_stderr) + client = _start_client( + (server_target, 'streaming') + _GEVENT_ARG, client_stdout, + client_stderr) self._handler.await_connected_client() client.send_signal(signal.SIGINT) self.assertFalse(client.wait(), msg=_read_stream(client_stderr)) @@ -171,8 +176,9 @@ class SignalHandlingTest(unittest.TestCase): server_target = '{}:{}'.format(_HOST, self._port) with tempfile.TemporaryFile(mode='r') as client_stdout: with tempfile.TemporaryFile(mode='r') as client_stderr: - client = _start_client(('--exception', server_target, 'unary'), - client_stdout, client_stderr) + client = _start_client( + ('--exception', server_target, 'unary') + _GEVENT_ARG, + client_stdout, client_stderr) self._handler.await_connected_client() client.send_signal(signal.SIGINT) client.wait() @@ -184,8 +190,8 @@ class SignalHandlingTest(unittest.TestCase): with tempfile.TemporaryFile(mode='r') as client_stdout: with tempfile.TemporaryFile(mode='r') as client_stderr: client = _start_client( - ('--exception', server_target, 'streaming'), client_stdout, - client_stderr) + ('--exception', server_target, 'streaming') + _GEVENT_ARG, + client_stdout, client_stderr) self._handler.await_connected_client() client.send_signal(signal.SIGINT) client.wait()