Improve readability

pull/22258/head
Lidi Zheng 5 years ago
parent ce68d53dd1
commit 231f9c0c94
  1. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
  2. 12
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  3. 100
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi

@ -23,7 +23,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef object _poller
cdef object _poller_running
cdef _polling(self)
cdef void _poll(self) except *
cdef class CallbackCompletionQueue(BaseCompletionQueue):

@ -36,11 +36,11 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
self._shutdown_completed = asyncio.get_event_loop().create_future()
self._poller = None
self._poller_running = asyncio.get_event_loop().create_future()
self._poller = threading.Thread(target=self._polling_wrapper)
self._poller = threading.Thread(target=self._poll_wrapper)
self._poller.daemon = True
self._poller.start()
cdef _polling(self):
cdef void _poll(self) except *:
cdef grpc_event event
cdef CallbackContext *context
cdef object waiter
@ -53,7 +53,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
NULL)
if event.type == GRPC_QUEUE_TIMEOUT:
raise NotImplementedError()
raise AssertionError("Core should not return timeout error!")
elif event.type == GRPC_QUEUE_SHUTDOWN:
self._shutdown = True
grpc_call_soon_threadsafe(self._shutdown_completed.set_result, None)
@ -64,8 +64,8 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
<CallbackWrapper>context.callback_wrapper,
event.success)
def _polling_wrapper(self):
self._polling()
def _poll_wrapper(self):
self._poll()
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
@ -103,4 +103,4 @@ cdef BaseCompletionQueue create_completion_queue():
elif grpc_aio_engine is AsyncIOEngine.POLLER:
return PollerCompletionQueue()
else:
raise ValueError('Unexpected engine type [%s]' % grpc_aio_engine)
raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine)

@ -15,13 +15,14 @@
cdef bint _grpc_aio_initialized = False
# NOTE(lidiz) Theoretically, applications can run in multiple event loops as
# long as they are in the same thread with same magic. However, I don't think
# we should support this use case. So, the gRPC Python Async Stack should use
# a single event loop picked by "init_grpc_aio".
cdef object _grpc_aio_loop
cdef object _event_loop_thread_ident
# long as they are in the same thread with same magic. This is not a supported
# use case. So, the gRPC Python Async Stack should use a single event loop
# picked by "init_grpc_aio".
cdef object _grpc_aio_loop # asyncio.AbstractEventLoop
cdef int64_t _event_loop_thread_ident
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
grpc_aio_engine = None
cdef object _grpc_initialization_lock = threading.Lock()
class AsyncIOEngine(enum.Enum):
@ -36,51 +37,50 @@ def init_grpc_aio():
global _event_loop_thread_ident
global grpc_aio_engine
# Marks this function as called
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
# Picks the engine for gRPC AsyncIO Stack
for engine_type in AsyncIOEngine:
if engine_type.value == _GRPC_ASYNCIO_ENGINE:
grpc_aio_engine = engine_type
break
if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
_event_loop_thread_ident = threading.current_thread().ident
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
else:
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
_grpc_aio_initialized = False
with _grpc_initialization_lock:
# Marks this function as called
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
# Picks the engine for gRPC AsyncIO Stack
for engine_type in AsyncIOEngine:
if engine_type.value == _GRPC_ASYNCIO_ENGINE:
grpc_aio_engine = engine_type
break
if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
_event_loop_thread_ident = threading.current_thread().ident
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
else:
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
def grpc_aio_loop():

Loading…
Cancel
Save