diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index f1a7a996644..e69200c7376 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -24,6 +24,13 @@ cdef extern from "" namespace "std" nogil: size_t size() +cdef extern from "" namespace "std" nogil: + cdef cppclass mutex: + mutex() + void lock() + void unlock() + + ctypedef queue[grpc_event] cpp_event_queue @@ -45,6 +52,7 @@ cdef class BaseCompletionQueue: cdef class PollerCompletionQueue(BaseCompletionQueue): cdef bint _shutdown cdef cpp_event_queue _queue + cdef mutex _queue_mutex cdef object _poller_thread cdef int _write_fd cdef object _read_socket diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 8c4fa014130..15d2e18f3d7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -67,7 +67,9 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): elif event.type == GRPC_QUEUE_SHUTDOWN: self._shutdown = True else: + self._queue_mutex.lock() self._queue.push(event) + self._queue_mutex.unlock() _unified_socket_write(self._write_fd) def _poll_wrapper(self): @@ -85,9 +87,15 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef grpc_event event cdef CallbackContext *context - while not self._queue.empty(): - event = self._queue.front() - self._queue.pop() + while True: + self._queue_mutex.lock() + if self._queue.empty(): + self._queue_mutex.unlock() + break + else: + event = self._queue.front() + self._queue.pop() + self._queue_mutex.unlock() context = event.tag loop = context.loop