diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi index a67c9636843..01089c3dc0b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi @@ -31,9 +31,6 @@ cdef class CompletionQueue: cdef grpc_completion_queue *c_completion_queue - cdef object pluck_condition - cdef int num_plucking - cdef int num_polling cdef bint is_shutting_down cdef bint is_shutdown diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index cdae39d5194..90266516fed 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -32,6 +32,8 @@ cimport cpython import threading import time +cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 + cdef class CompletionQueue: @@ -40,9 +42,6 @@ cdef class CompletionQueue: self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False - self.pluck_condition = threading.Condition() - self.num_plucking = 0 - self.num_polling = 0 cdef _interpret_event(self, grpc_event event): cdef OperationTag tag = None @@ -83,45 +82,27 @@ cdef class CompletionQueue: def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). + cdef gpr_timespec c_increment + cdef gpr_timespec c_timeout cdef gpr_timespec c_deadline with nogil: + c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) - if deadline is not None: - c_deadline = deadline.c_time - cdef grpc_event event - - # Poll within a critical section to detect contention - with self.pluck_condition: - assert self.num_plucking == 0, 'cannot simultaneously pluck and poll' - self.num_polling += 1 - with nogil: - event = grpc_completion_queue_next( - self.c_completion_queue, c_deadline, NULL) - with self.pluck_condition: - self.num_polling -= 1 - return self._interpret_event(event) - - def pluck(self, OperationTag tag, Timespec deadline=None): - # Plucking a 'None' tag is equivalent to passing control to GRPC core until - # the deadline. - cdef gpr_timespec c_deadline = gpr_inf_future( - GPR_CLOCK_REALTIME) - if deadline is not None: - c_deadline = deadline.c_time - cdef grpc_event event - - # Pluck within a critical section to detect contention - with self.pluck_condition: - assert self.num_polling == 0, 'cannot simultaneously pluck and poll' - assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, ( - 'cannot pluck more than {} times simultaneously'.format( - GRPC_MAX_COMPLETION_QUEUE_PLUCKERS)) - self.num_plucking += 1 - with nogil: - event = grpc_completion_queue_pluck( - self.c_completion_queue, tag, c_deadline, NULL) - with self.pluck_condition: - self.num_plucking -= 1 + if deadline is not None: + c_deadline = deadline.c_time + + while True: + c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) + if gpr_time_cmp(c_timeout, c_deadline) > 0: + c_timeout = c_deadline + event = grpc_completion_queue_next( + self.c_completion_queue, c_timeout, NULL) + if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0: + break; + + # Handle any signals + with gil: + cpython.PyErr_CheckSignals() return self._interpret_event(event) def shutdown(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 05b8886df73..168b9751aa2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -80,6 +80,12 @@ cdef extern from "grpc/_cython/loader.h": gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock) nogil + gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil + + gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil + + int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil + ctypedef enum grpc_status_code: GRPC_STATUS_OK GRPC_STATUS_CANCELLED diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 55948755b2f..6ce589dbdc2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -99,7 +99,7 @@ cdef class Server: with nogil: grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work - self.backup_shutdown_queue.pluck(None, Timespec(None)) + self.backup_shutdown_queue.poll(Timespec(None)) def add_http2_port(self, address, ServerCredentials server_credentials=None):