From 221a50bf87269944907098f516424caf5f52304c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 6 Mar 2020 15:03:44 -0800 Subject: [PATCH] Allow multiple asyncio engine --- .../_cygrpc/aio/callback_common.pxd.pxi | 8 ---- .../_cygrpc/aio/callback_common.pyx.pxi | 21 ---------- .../grpc/_cython/_cygrpc/aio/channel.pxd.pxi | 3 +- .../grpc/_cython/_cygrpc/aio/channel.pyx.pxi | 3 +- ...oller.pxd.pxi => completion_queue.pxd.pxi} | 13 +++++- ...oller.pyx.pxi => completion_queue.pyx.pxi} | 41 ++++++++++++++++++- .../grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi | 41 ++++++++++++++----- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 3 +- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 5 +-- src/python/grpcio/grpc/_cython/cygrpc.pxd | 2 +- src/python/grpcio/grpc/_cython/cygrpc.pyx | 3 +- tools/bazel.rc | 2 + .../linux/grpc_python_bazel_test_in_docker.sh | 1 + 13 files changed, 92 insertions(+), 54 deletions(-) rename src/python/grpcio/grpc/_cython/_cygrpc/aio/{poller.pxd.pxi => completion_queue.pxd.pxi} (51%) rename src/python/grpcio/grpc/_cython/_cygrpc/aio/{poller.pyx.pxi => completion_queue.pyx.pxi} (67%) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi index e99f78a18af..70a1c9b3f27 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi @@ -52,13 +52,5 @@ cdef class CallbackWrapper: cdef grpc_experimental_completion_queue_functor *c_functor(self) -cdef class CallbackCompletionQueue: - cdef grpc_completion_queue *_cq - cdef object _shutdown_completed # asyncio.Future - cdef CallbackWrapper _wrapper - - cdef grpc_completion_queue* c_ptr(self) - - cdef class GrpcCallWrapper: cdef grpc_call* call diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 24c79533012..33713b8ad64 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -69,27 +69,6 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler InternalError) -cdef class CallbackCompletionQueue: - - def __cinit__(self): - self._shutdown_completed = grpc_aio_loop().create_future() - self._wrapper = CallbackWrapper( - self._shutdown_completed, - CQ_SHUTDOWN_FAILURE_HANDLER) - self._cq = grpc_completion_queue_create_for_callback( - self._wrapper.c_functor(), - NULL - ) - - cdef grpc_completion_queue* c_ptr(self): - return self._cq - - async def shutdown(self): - grpc_completion_queue_shutdown(self._cq) - await self._shutdown_completed - grpc_completion_queue_destroy(self._cq) - - class ExecuteBatchError(Exception): pass diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi index 62215b2d24f..569e6763c54 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi @@ -21,8 +21,7 @@ cdef enum AioChannelStatus: cdef class AioChannel: cdef: grpc_channel * channel - # CallbackCompletionQueue cq - BackgroundCompletionQueue cq + BaseCompletionQueue cq object loop bytes _target AioChannelStatus _status diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index cc6c5d99306..fa99371b211 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -31,8 +31,7 @@ cdef class AioChannel: options = () cdef _ChannelArgs channel_args = _ChannelArgs(options) self._target = target - # self.cq = CallbackCompletionQueue() - self.cq = BackgroundCompletionQueue() + self.cq = create_completion_queue() self.loop = loop self._status = AIO_CHANNEL_STATUS_READY diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/poller.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi similarity index 51% rename from src/python/grpcio/grpc/_cython/_cygrpc/aio/poller.pxd.pxi rename to src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index c66d24bd502..b5b8c5036cd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/poller.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -1,6 +1,10 @@ cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME) -cdef class BackgroundCompletionQueue: +cdef class BaseCompletionQueue: + + cdef grpc_completion_queue* c_ptr(self) + +cdef class PollerCompletionQueue(BaseCompletionQueue): cdef grpc_completion_queue *_cq cdef bint _shutdown cdef object _shutdown_completed @@ -8,4 +12,9 @@ cdef class BackgroundCompletionQueue: cdef object _poller_running cdef _polling(self) - cdef grpc_completion_queue* c_ptr(self) + + +cdef class CallbackCompletionQueue(BaseCompletionQueue): + cdef grpc_completion_queue *_cq + cdef object _shutdown_completed # asyncio.Future + cdef CallbackWrapper _wrapper diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/poller.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi similarity index 67% rename from src/python/grpcio/grpc/_cython/_cygrpc/aio/poller.pyx.pxi rename to src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 51bede8ede5..85ae0c4561a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/poller.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -19,7 +19,16 @@ def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success): CallbackWrapper.functor_run(callback_wrapper.c_functor(), success) -cdef class BackgroundCompletionQueue: +cdef class BaseCompletionQueue: + + async def shutdown(self): + raise NotImplementedError() + + cdef grpc_completion_queue* c_ptr(self): + raise NotImplementedError() + + +cdef class PollerCompletionQueue(BaseCompletionQueue): def __cinit__(self): self._cq = grpc_completion_queue_create_for_next(NULL) @@ -65,3 +74,33 @@ cdef class BackgroundCompletionQueue: cdef grpc_completion_queue* c_ptr(self): return self._cq + + +cdef class CallbackCompletionQueue(BaseCompletionQueue): + + def __cinit__(self): + self._shutdown_completed = grpc_aio_loop().create_future() + self._wrapper = CallbackWrapper( + self._shutdown_completed, + CQ_SHUTDOWN_FAILURE_HANDLER) + self._cq = grpc_completion_queue_create_for_callback( + self._wrapper.c_functor(), + NULL + ) + + cdef grpc_completion_queue* c_ptr(self): + return self._cq + + async def shutdown(self): + grpc_completion_queue_shutdown(self._cq) + await self._shutdown_completed + grpc_completion_queue_destroy(self._cq) + + +cdef BaseCompletionQueue create_completion_queue(): + if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER: + return CallbackCompletionQueue() + elif grpc_aio_engine is AsyncIOEngine.POLLER: + return PollerCompletionQueue() + else: + raise ValueError('Unexpected engine type [%s]' % grpc_aio_engine) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 284a6c9a84a..44064ef95ce 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -20,38 +20,57 @@ cdef bint _grpc_aio_initialized = False # a single event loop picked by "init_grpc_aio". cdef object _grpc_aio_loop cdef object _event_loop_thread_ident +cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower() +grpc_aio_engine = None + + +class AsyncIOEngine(enum.Enum): + DEFAULT = 'default' + CUSTOM_IO_MANAGER = 'custom' + CQ_POLLER = 'poller' def init_grpc_aio(): global _grpc_aio_initialized global _grpc_aio_loop global _event_loop_thread_ident + global grpc_aio_engine + # Marks this function as called if _grpc_aio_initialized: return else: _grpc_aio_initialized = True - _event_loop_thread_ident = threading.current_thread().ident + + # Picks the engine for gRPC AsyncIO Stack + for engine_type in AsyncIOEngine: + if engine_type.value == _GRPC_ASYNCIO_ENGINE: + grpc_aio_engine = engine_type + break + if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT: + grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER # Anchors the event loop that the gRPC library going to use. _grpc_aio_loop = asyncio.get_event_loop() - - # Activates asyncio IO manager - # install_asyncio_iomgr() + _event_loop_thread_ident = threading.current_thread().ident # TODO(https://github.com/grpc/grpc/issues/22244) we need a the # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC # library won't shutdown cleanly. grpc_init() - # Timers are triggered by the Asyncio loop. We disable - # the background thread that is being used by the native - # gRPC iomgr. - # grpc_timer_manager_set_threading(False) + if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER: + # Activates asyncio IO manager + install_asyncio_iomgr() + + # Timers are triggered by the Asyncio loop. We disable + # the background thread that is being used by the native + # gRPC iomgr. + grpc_timer_manager_set_threading(False) - # gRPC callbaks are executed within the same thread used by the Asyncio - # event loop, as it is being done by the other Asyncio callbacks. - # Executor.SetThreadingAll(False) + # gRPC callbaks are executed within the same thread used by the Asyncio + # event loop, as it is being done by the other Asyncio callbacks. + Executor.SetThreadingAll(False) _grpc_aio_initialized = False diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 54d75d99890..18fc9214b27 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -51,8 +51,7 @@ cdef enum AioServerStatus: cdef class AioServer: cdef Server _server - # cdef CallbackCompletionQueue _cq - cdef BackgroundCompletionQueue _cq + cdef BaseCompletionQueue _cq cdef list _generic_handlers cdef AioServerStatus _status cdef object _loop # asyncio.EventLoop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 5adb8a949fe..d8ed9a41fe7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -613,8 +613,7 @@ cdef class AioServer: # NOTE(lidiz) Core objects won't be deallocated automatically. # If AioServer.shutdown is not called, those objects will leak. self._server = Server(options) - # self._cq = CallbackCompletionQueue() - self._cq = BackgroundCompletionQueue() + self._cq = create_completion_queue() grpc_server_register_completion_queue( self._server.c_server, self._cq.c_ptr(), @@ -737,7 +736,7 @@ cdef class AioServer: # The shutdown callback won't be called until there is no live RPC. grpc_server_shutdown_and_notify( self._server.c_server, - self._cq._cq, + self._cq.c_ptr(), self._shutdown_callback_wrapper.c_functor()) # Ensures the serving task (coroutine) exits. diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 6e7b1aa64b0..166be370227 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -45,7 +45,7 @@ IF UNAME_SYSNAME != "Windows": include "_cygrpc/aio/iomgr/socket.pxd.pxi" include "_cygrpc/aio/iomgr/timer.pxd.pxi" include "_cygrpc/aio/iomgr/resolver.pxd.pxi" -include "_cygrpc/aio/poller.pxd.pxi" +include "_cygrpc/aio/completion_queue.pxd.pxi" include "_cygrpc/aio/rpc_status.pxd.pxi" include "_cygrpc/aio/grpc_aio.pxd.pxi" include "_cygrpc/aio/callback_common.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 0c6ab7e590f..5910f10abfd 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -20,6 +20,7 @@ import os import sys import threading import time +import enum import grpc @@ -71,8 +72,8 @@ include "_cygrpc/aio/iomgr/timer.pyx.pxi" include "_cygrpc/aio/iomgr/resolver.pyx.pxi" include "_cygrpc/aio/common.pyx.pxi" include "_cygrpc/aio/rpc_status.pyx.pxi" +include "_cygrpc/aio/completion_queue.pyx.pxi" include "_cygrpc/aio/callback_common.pyx.pxi" -include "_cygrpc/aio/poller.pyx.pxi" include "_cygrpc/aio/grpc_aio.pyx.pxi" include "_cygrpc/aio/call.pyx.pxi" include "_cygrpc/aio/channel.pyx.pxi" diff --git a/tools/bazel.rc b/tools/bazel.rc index 92307805017..6dbcc273223 100644 --- a/tools/bazel.rc +++ b/tools/bazel.rc @@ -89,3 +89,5 @@ build:basicprof --copt=-DGRPC_BASIC_PROFILER build:basicprof --copt=-DGRPC_TIMERS_RDTSC build:python_single_threaded_unary_stream --test_env="GRPC_SINGLE_THREADED_UNARY_STREAM=true" + +build:python_poller_engine --test_env="GRPC_ASYNCIO_ENGINE=poller" diff --git a/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh index fef8c8fa9f4..da286c433b5 100755 --- a/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh @@ -28,6 +28,7 @@ TEST_TARGETS="//src/python/... //examples/python/..." BAZEL_FLAGS="--spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors" bazel test ${BAZEL_FLAGS} ${TEST_TARGETS} bazel test --config=python_single_threaded_unary_stream ${BAZEL_FLAGS} ${TEST_TARGETS} +bazel test --config=python_poller_engine ${BAZEL_FLAGS} ${TEST_TARGETS} # TODO(https://github.com/grpc/grpc/issues/19854): Move this to a new Kokoro