diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 1be10f9f2cb..06af100d376 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -17,6 +17,8 @@ import socket cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) cdef float _POLL_AWAKE_INTERVAL_S = 0.2 +cdef bint _no_fd_monitoring = False + IF UNAME_SYSNAME == "Windows": cdef void _unified_socket_write(int fd) nogil: @@ -47,7 +49,14 @@ cdef class _BoundEventLoop: handler, loop ) - self.loop.add_reader(self.read_socket, reader_function) + # NOTE(lidiz) There isn't a way to cleanly pre-check if fd monitoring + # support is available or not. Checking the event loop policy is not + # good enough. The application can has its own loop implementation, or + # uses different types of event loops (e.g., 1 Proactor, 3 Selectors). + try: + self.loop.add_reader(self.read_socket, reader_function) + except NotImplementedError: + _no_fd_monitoring = True def close(self): if self.loop: @@ -97,7 +106,15 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._queue_mutex.lock() self._queue.push(event) self._queue_mutex.unlock() - _unified_socket_write(self._write_fd) + if not _no_fd_monitoring: + _unified_socket_write(self._write_fd) + else: + with gil: + # Event loops can be paused or killed at any time. The + # most robust way to make sure someone is polling is + # awaking all loops up. + for loop in self._loops: + self._handle_events(loop) def _poll_wrapper(self): with nogil: