From 03813bf6fc427a964edde6cd03495d9d19d8569d Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 11 Aug 2020 13:56:40 -0700 Subject: [PATCH] Make the fail back mode triggering more robust --- .../_cygrpc/aio/completion_queue.pxd.pxi | 1 + .../_cygrpc/aio/completion_queue.pyx.pxi | 43 +++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index 1c5fb8dd219..a5189ad5267 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -53,6 +53,7 @@ cdef class BaseCompletionQueue: cdef class _BoundEventLoop: cdef readonly object loop cdef readonly object read_socket # socket.socket + cdef bint _has_reader cdef class PollerCompletionQueue(BaseCompletionQueue): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 06af100d376..fdf3b339a0e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -17,12 +17,22 @@ import socket cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) cdef float _POLL_AWAKE_INTERVAL_S = 0.2 -cdef bint _no_fd_monitoring = False +# This bool indicates if the event loop impl can monitor a given fd, or has +# loop.add_reader method. +cdef bint _has_fd_monitoring = True IF UNAME_SYSNAME == "Windows": cdef void _unified_socket_write(int fd) nogil: win_socket_send(fd, b"1", 1, 0) + + # If the event loop policy is Proactor, then immediately turn on fall back + # mode. + if asyncio.get_event_loop_policy() == asyncio.WindowsProactorEventLoopPolicy: + _has_fd_monitoring = False + elif asyncio.get_event_loop_policy() == asyncio.DefaultEventLoopPolicy: + if sys.version_info >= (3, 8, 0): + _has_fd_monitoring = False ELSE: from posix cimport unistd @@ -43,6 +53,7 @@ cdef class BaseCompletionQueue: cdef class _BoundEventLoop: def __cinit__(self, object loop, object read_socket, object handler): + global _has_fd_monitoring self.loop = loop self.read_socket = read_socket reader_function = functools.partial( @@ -53,14 +64,18 @@ cdef class _BoundEventLoop: # support is available or not. Checking the event loop policy is not # good enough. The application can has its own loop implementation, or # uses different types of event loops (e.g., 1 Proactor, 3 Selectors). - try: - self.loop.add_reader(self.read_socket, reader_function) - except NotImplementedError: - _no_fd_monitoring = True + if _has_fd_monitoring: + try: + self.loop.add_reader(self.read_socket, reader_function) + self._has_reader = True + except NotImplementedError: + _has_fd_monitoring = False + self._has_reader = False def close(self): if self.loop: - self.loop.remove_reader(self.read_socket) + if self._has_reader: + self.loop.remove_reader(self.read_socket) cdef class PollerCompletionQueue(BaseCompletionQueue): @@ -106,15 +121,14 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._queue_mutex.lock() self._queue.push(event) self._queue_mutex.unlock() - if not _no_fd_monitoring: + if _has_fd_monitoring: _unified_socket_write(self._write_fd) else: with gil: - # Event loops can be paused or killed at any time. The - # most robust way to make sure someone is polling is - # awaking all loops up. - for loop in self._loops: - self._handle_events(loop) + # Event loops can be paused or killed at any time. So, + # instead of deligate to any thread, the polling thread + # should handle the distribution of the event. + self._handle_events(None) def _poll_wrapper(self): with nogil: @@ -136,7 +150,10 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._write_socket.close() def _handle_events(self, object context_loop): - cdef bytes data = self._read_socket.recv(1) + cdef bytes data + if _has_fd_monitoring: + # If fd monitoring is working, clean the socket without blocking. + data = self._read_socket.recv(1) cdef grpc_event event cdef CallbackContext *context