diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index 708a2745fdf..2b3be97cb3b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -23,7 +23,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef object _poller cdef object _poller_running - cdef _polling(self) + cdef void _poll(self) except * cdef class CallbackCompletionQueue(BaseCompletionQueue): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 85ae0c4561a..c09c276a67a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -36,11 +36,11 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._shutdown_completed = asyncio.get_event_loop().create_future() self._poller = None self._poller_running = asyncio.get_event_loop().create_future() - self._poller = threading.Thread(target=self._polling_wrapper) + self._poller = threading.Thread(target=self._poll_wrapper) self._poller.daemon = True self._poller.start() - cdef _polling(self): + cdef void _poll(self) except *: cdef grpc_event event cdef CallbackContext *context cdef object waiter @@ -53,7 +53,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): NULL) if event.type == GRPC_QUEUE_TIMEOUT: - raise NotImplementedError() + raise AssertionError("Core should not return timeout error!") elif event.type == GRPC_QUEUE_SHUTDOWN: self._shutdown = True grpc_call_soon_threadsafe(self._shutdown_completed.set_result, None) @@ -64,8 +64,8 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): context.callback_wrapper, event.success) - def _polling_wrapper(self): - self._polling() + def _poll_wrapper(self): + self._poll() async def shutdown(self): grpc_completion_queue_shutdown(self._cq) @@ -103,4 +103,4 @@ cdef BaseCompletionQueue create_completion_queue(): elif grpc_aio_engine is AsyncIOEngine.POLLER: return PollerCompletionQueue() else: - raise ValueError('Unexpected engine type [%s]' % grpc_aio_engine) + raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 27a37dfe053..bf76dfebd6a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -15,13 +15,14 @@ cdef bint _grpc_aio_initialized = False # NOTE(lidiz) Theoretically, applications can run in multiple event loops as -# long as they are in the same thread with same magic. However, I don't think -# we should support this use case. So, the gRPC Python Async Stack should use -# a single event loop picked by "init_grpc_aio". -cdef object _grpc_aio_loop -cdef object _event_loop_thread_ident +# long as they are in the same thread with same magic. This is not a supported +# use case. So, the gRPC Python Async Stack should use a single event loop +# picked by "init_grpc_aio". +cdef object _grpc_aio_loop # asyncio.AbstractEventLoop +cdef int64_t _event_loop_thread_ident cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower() grpc_aio_engine = None +cdef object _grpc_initialization_lock = threading.Lock() class AsyncIOEngine(enum.Enum): @@ -36,51 +37,50 @@ def init_grpc_aio(): global _event_loop_thread_ident global grpc_aio_engine - # Marks this function as called - if _grpc_aio_initialized: - return - else: - _grpc_aio_initialized = True - - # Picks the engine for gRPC AsyncIO Stack - for engine_type in AsyncIOEngine: - if engine_type.value == _GRPC_ASYNCIO_ENGINE: - grpc_aio_engine = engine_type - break - if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT: - grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER - - # Anchors the event loop that the gRPC library going to use. - _grpc_aio_loop = asyncio.get_event_loop() - _event_loop_thread_ident = threading.current_thread().ident - - if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER: - # Activates asyncio IO manager. - # NOTE(lidiz) Custom IO manager must be activated before the first - # `grpc_init()`. Otherwise, some special configurations in Core won't - # pick up the change, and resulted in SEGFAULT or ABORT. - install_asyncio_iomgr() - - # TODO(https://github.com/grpc/grpc/issues/22244) we need a the - # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC - # library won't shutdown cleanly. - grpc_init() - - # Timers are triggered by the Asyncio loop. We disable - # the background thread that is being used by the native - # gRPC iomgr. - grpc_timer_manager_set_threading(False) - - # gRPC callbaks are executed within the same thread used by the Asyncio - # event loop, as it is being done by the other Asyncio callbacks. - Executor.SetThreadingAll(False) - else: - # TODO(https://github.com/grpc/grpc/issues/22244) we need a the - # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC - # library won't shutdown cleanly. - grpc_init() - - _grpc_aio_initialized = False + with _grpc_initialization_lock: + # Marks this function as called + if _grpc_aio_initialized: + return + else: + _grpc_aio_initialized = True + + # Picks the engine for gRPC AsyncIO Stack + for engine_type in AsyncIOEngine: + if engine_type.value == _GRPC_ASYNCIO_ENGINE: + grpc_aio_engine = engine_type + break + if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT: + grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER + + # Anchors the event loop that the gRPC library going to use. + _grpc_aio_loop = asyncio.get_event_loop() + _event_loop_thread_ident = threading.current_thread().ident + + if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER: + # Activates asyncio IO manager. + # NOTE(lidiz) Custom IO manager must be activated before the first + # `grpc_init()`. Otherwise, some special configurations in Core won't + # pick up the change, and resulted in SEGFAULT or ABORT. + install_asyncio_iomgr() + + # TODO(https://github.com/grpc/grpc/issues/22244) we need a the + # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC + # library won't shutdown cleanly. + grpc_init() + + # Timers are triggered by the Asyncio loop. We disable + # the background thread that is being used by the native + # gRPC iomgr. + grpc_timer_manager_set_threading(False) + + # gRPC callbaks are executed within the same thread used by the Asyncio + # event loop, as it is being done by the other Asyncio callbacks. + Executor.SetThreadingAll(False) + else: + # TODO(https://github.com/grpc/grpc/issues/22244) we need a the + # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC + # library won't shutdown cleanly. + grpc_init() def grpc_aio_loop():