From 8fc872ed2e05281f5433924a43e27f30c8bd4e49 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 11 Mar 2020 14:24:04 -0700 Subject: [PATCH] Enforce one cq and support many-thread many-loop env - Label channel_argument_test as flaky - Add compatibility test - Add many-loop test case --- .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 2 +- .../_cygrpc/aio/callback_common.pxd.pxi | 1 + .../_cygrpc/aio/callback_common.pyx.pxi | 2 + .../grpc/_cython/_cygrpc/aio/channel.pxd.pxi | 1 - .../grpc/_cython/_cygrpc/aio/channel.pyx.pxi | 9 +- .../grpc/_cython/_cygrpc/aio/common.pyx.pxi | 13 ++ .../_cygrpc/aio/completion_queue.pxd.pxi | 2 + .../_cygrpc/aio/completion_queue.pyx.pxi | 32 +-- .../grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi | 23 +- .../grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi | 196 +++++++++------- .../_cygrpc/aio/iomgr/resolver.pxd.pxi | 1 + .../_cygrpc/aio/iomgr/resolver.pyx.pxi | 5 +- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 9 +- .../_cython/_cygrpc/aio/iomgr/timer.pxd.pxi | 1 + .../_cython/_cygrpc/aio/iomgr/timer.pyx.pxi | 3 +- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 1 - .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 12 +- src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 - .../grpcio/grpc/experimental/aio/__init__.py | 3 +- .../grpcio/grpc/experimental/aio/_channel.py | 2 +- .../grpcio/grpc/experimental/aio/_server.py | 8 +- src/python/grpcio_tests/commands.py | 2 - .../tests_aio/benchmark/server.py | 1 - .../tests_aio/benchmark/worker.py | 1 - .../grpcio_tests/tests_aio/interop/client.py | 1 - .../grpcio_tests/tests_aio/interop/server.py | 3 - src/python/grpcio_tests/tests_aio/tests.json | 1 + .../grpcio_tests/tests_aio/unit/BUILD.bazel | 6 + .../grpcio_tests/tests_aio/unit/_test_base.py | 3 - .../tests_aio/unit/compatibility_test.py | 215 ++++++++++++++++++ 30 files changed, 418 insertions(+), 142 deletions(-) create mode 100644 src/python/grpcio_tests/tests_aio/unit/compatibility_test.py diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index e70122d65e1..dbe673d0bf6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -115,7 +115,7 @@ cdef class _AioCall(GrpcCallWrapper): self._channel.channel, NULL, _EMPTY_MASK, - self._channel.cq.c_ptr(), + global_completion_queue(), method_slice, NULL, c_deadline, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi index 70a1c9b3f27..e5620cd166d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi @@ -35,6 +35,7 @@ cdef struct CallbackContext: # management. grpc_experimental_completion_queue_functor functor cpython.PyObject *waiter + cpython.PyObject *loop cpython.PyObject *failure_handler cpython.PyObject *callback_wrapper diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 33713b8ad64..a1824c5c40c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -35,6 +35,8 @@ cdef class CallbackWrapper: def __cinit__(self, object future, CallbackFailureHandler failure_handler): self.context.functor.functor_run = self.functor_run self.context.waiter = future + # TODO(lidiz) switch to future.get_loop() which is available 3.7+. + self.context.loop = future._loop self.context.failure_handler = failure_handler self.context.callback_wrapper = self # NOTE(lidiz) Not using a list here, because this class is critical in diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi index 569e6763c54..f49681a4588 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi @@ -21,7 +21,6 @@ cdef enum AioChannelStatus: cdef class AioChannel: cdef: grpc_channel * channel - BaseCompletionQueue cq object loop bytes _target AioChannelStatus _status diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index fa99371b211..4f64b50b355 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -27,11 +27,11 @@ cdef CallbackFailureHandler _WATCH_CONNECTIVITY_FAILURE_HANDLER = CallbackFailur cdef class AioChannel: def __cinit__(self, bytes target, tuple options, ChannelCredentials credentials, object loop): + init_grpc_aio() if options is None: options = () cdef _ChannelArgs channel_args = _ChannelArgs(options) self._target = target - self.cq = create_completion_queue() self.loop = loop self._status = AIO_CHANNEL_STATUS_READY @@ -47,6 +47,9 @@ cdef class AioChannel: channel_args.c_args(), NULL) + def __dealloc__(self): + shutdown_grpc_aio() + def __repr__(self): class_name = self.__class__.__name__ id_ = id(self) @@ -83,7 +86,7 @@ cdef class AioChannel: self.channel, last_observed_state, c_deadline, - self.cq.c_ptr(), + global_completion_queue(), wrapper.c_functor()) try: @@ -111,7 +114,7 @@ cdef class AioChannel: """Assembles a Cython Call object. Returns: - The _AioCall object. + An _AioCall object. """ if self.closed(): raise UsageError('Channel is closed.') diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi index 27d16cb0475..6af2499e6e8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi @@ -99,3 +99,16 @@ class AbortError(BaseError): class InternalError(BaseError): """Raised upon unexpected errors in native code.""" + + +def schedule_coro_threadsafe(object coro, object loop): + try: + return loop.create_task(coro) + except RuntimeError as runtime_error: + if 'Non-thread-safe operation' in str(runtime_error): + return asyncio.run_coroutine_threadsafe( + coro, + loop, + ) + else: + raise diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index 54a0e90184a..d26e6cf5fa8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -21,6 +21,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef bint _shutdown cdef object _shutdown_completed cdef object _poller_thread + cdef object _loop cdef void _poll(self) except * @@ -28,3 +29,4 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef class CallbackCompletionQueue(BaseCompletionQueue): cdef object _shutdown_completed # asyncio.Future cdef CallbackWrapper _wrapper + cdef object _loop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 7dd1f68f10c..093e8868bf7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -21,9 +21,6 @@ def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success): cdef class BaseCompletionQueue: - async def shutdown(self): - raise NotImplementedError() - cdef grpc_completion_queue* c_ptr(self): return self._cq @@ -33,7 +30,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): def __cinit__(self): self._cq = grpc_completion_queue_create_for_next(NULL) self._shutdown = False - self._shutdown_completed = asyncio.get_event_loop().create_future() + self._shutdown_completed = threading.Event() self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) self._poller_thread.start() @@ -44,17 +41,18 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): while not self._shutdown: with nogil: event = grpc_completion_queue_next(self._cq, - _GPR_INF_FUTURE, - NULL) + _GPR_INF_FUTURE, + NULL) if event.type == GRPC_QUEUE_TIMEOUT: - raise AssertionError("Core should not return timeout error!") + raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!") elif event.type == GRPC_QUEUE_SHUTDOWN: self._shutdown = True - aio_loop_call_soon_threadsafe(self._shutdown_completed.set_result, None) + self._shutdown_completed.set() else: context = event.tag - aio_loop_call_soon_threadsafe( + loop = context.loop + loop.call_soon_threadsafe( _handle_callback_wrapper, context.callback_wrapper, event.success) @@ -62,9 +60,9 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): def _poll_wrapper(self): self._poll() - async def shutdown(self): + def shutdown(self): grpc_completion_queue_shutdown(self._cq) - await self._shutdown_completed + self._shutdown_completed.wait() grpc_completion_queue_destroy(self._cq) self._poller_thread.join() @@ -72,7 +70,8 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef class CallbackCompletionQueue(BaseCompletionQueue): def __cinit__(self): - self._shutdown_completed = grpc_aio_loop().create_future() + self._loop = asyncio.get_event_loop() + self._shutdown_completed = self._loop.create_future() self._wrapper = CallbackWrapper( self._shutdown_completed, CQ_SHUTDOWN_FAILURE_HANDLER) @@ -85,12 +84,3 @@ cdef class CallbackCompletionQueue(BaseCompletionQueue): grpc_completion_queue_shutdown(self._cq) await self._shutdown_completed grpc_completion_queue_destroy(self._cq) - - -cdef BaseCompletionQueue create_completion_queue(): - if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER: - return CallbackCompletionQueue() - elif grpc_aio_engine is AsyncIOEngine.POLLER: - return PollerCompletionQueue() - else: - raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi index 9b9252ff3a5..1755b702015 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi @@ -13,14 +13,31 @@ # limitations under the License. # distutils: language=c++ +cdef class _AioState: + cdef object lock # threading.RLock + cdef int refcount + cdef object engine # AsyncIOEngine + cdef BaseCompletionQueue cq + + +cdef grpc_completion_queue *global_completion_queue() + + +cdef init_grpc_aio() + + +cdef shutdown_grpc_aio() + cdef extern from "src/core/lib/iomgr/timer_manager.h": - void grpc_timer_manager_set_threading(bint enabled); + void grpc_timer_manager_set_threading(bint enabled) + cdef extern from "src/core/lib/iomgr/iomgr_internal.h": - void grpc_set_default_iomgr_platform(); + void grpc_set_default_iomgr_platform() + cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core": cdef cppclass Executor: @staticmethod - void SetThreadingAll(bint enable); + void SetThreadingAll(bint enable) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 9a6fdecace3..5233d0b3bb7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -12,17 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import enum -cdef bint _grpc_aio_initialized = False -# NOTE(lidiz) Theoretically, applications can run in multiple event loops as -# long as they are in the same thread with same magic. This is not a supported -# use case. So, the gRPC Python Async Stack should use a single event loop -# picked by "init_grpc_aio". -cdef object _grpc_aio_loop # asyncio.AbstractEventLoop -cdef int64_t _event_loop_thread_ident cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower() -grpc_aio_engine = None -cdef object _grpc_initialization_lock = threading.Lock() +cdef _AioState _global_aio_state = _AioState() class AsyncIOEngine(enum.Enum): @@ -31,79 +24,120 @@ class AsyncIOEngine(enum.Enum): POLLER = 'poller' -def init_grpc_aio(): - global _grpc_aio_initialized - global _grpc_aio_loop - global _event_loop_thread_ident - global grpc_aio_engine - - with _grpc_initialization_lock: - # Marks this function as called - if _grpc_aio_initialized: - return - else: - _grpc_aio_initialized = True - - # Picks the engine for gRPC AsyncIO Stack - for engine_type in AsyncIOEngine: - if engine_type.value == _GRPC_ASYNCIO_ENGINE: - grpc_aio_engine = engine_type - break - if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT: - grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER - - # Anchors the event loop that the gRPC library going to use. - _grpc_aio_loop = asyncio.get_event_loop() - _event_loop_thread_ident = threading.current_thread().ident - - if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER: - # Activates asyncio IO manager. - # NOTE(lidiz) Custom IO manager must be activated before the first - # `grpc_init()`. Otherwise, some special configurations in Core won't - # pick up the change, and resulted in SEGFAULT or ABORT. - install_asyncio_iomgr() - - # TODO(https://github.com/grpc/grpc/issues/22244) we need a the - # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC - # library won't shutdown cleanly. - grpc_init() - - # Timers are triggered by the Asyncio loop. We disable - # the background thread that is being used by the native - # gRPC iomgr. - grpc_timer_manager_set_threading(False) - - # gRPC callbaks are executed within the same thread used by the Asyncio - # event loop, as it is being done by the other Asyncio callbacks. - Executor.SetThreadingAll(False) - else: - # TODO(https://github.com/grpc/grpc/issues/22244) we need a the - # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC - # library won't shutdown cleanly. - grpc_init() - - -def grpc_aio_loop(): - """Returns the one-and-only gRPC Aio event loop.""" - return _grpc_aio_loop - - -def aio_loop_schedule_coroutine(object coro): - """Thread-safely schedules coroutine to gRPC Aio event loop. - - If invoked within the same thread as the event loop, return an - Asyncio.Task. Otherwise, return a concurrent.futures.Future (the sync - Future). For non-asyncio threads, sync Future objects are probably easier - to handle (without worrying other thread-safety stuff). +cdef _default_asyncio_engine(): + return AsyncIOEngine.CUSTOM_IO_MANAGER + + +def grpc_aio_engine(): + """Read-only access to the picked engine type.""" + return _global_aio_state.engine + + +cdef grpc_completion_queue *global_completion_queue(): + return _global_aio_state.cq.c_ptr() + + +cdef class _AioState: + + def __cinit__(self): + self.lock = threading.RLock() + self.refcount = 0 + self.engine = None + self.cq = None + + +cdef _initialize_custom_io_manager(): + # Activates asyncio IO manager. + # NOTE(lidiz) Custom IO manager must be activated before the first + # `grpc_init()`. Otherwise, some special configurations in Core won't + # pick up the change, and resulted in SEGFAULT or ABORT. + install_asyncio_iomgr() + + # Initializes gRPC Core, must be called before other Core API + grpc_init() + + # Timers are triggered by the Asyncio loop. We disable + # the background thread that is being used by the native + # gRPC iomgr. + grpc_timer_manager_set_threading(False) + + # gRPC callbaks are executed within the same thread used by the Asyncio + # event loop, as it is being done by the other Asyncio callbacks. + Executor.SetThreadingAll(False) + + # Creates the only completion queue + _global_aio_state.cq = CallbackCompletionQueue() + + +cdef _initialize_poller(): + # Initializes gRPC Core, must be called before other Core API + grpc_init() + + # Creates the only completion queue + _global_aio_state.cq = PollerCompletionQueue() + + +cdef _actual_aio_initialization(): + # Picks the engine for gRPC AsyncIO Stack + _global_aio_state.engine = AsyncIOEngine.__members__.get( + _GRPC_ASYNCIO_ENGINE, + AsyncIOEngine.DEFAULT, + ) + if _global_aio_state.engine is AsyncIOEngine.DEFAULT: + _global_aio_state.engine = _default_asyncio_engine() + + # Initializes the process-level state accordingly + if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: + _initialize_custom_io_manager() + elif _global_aio_state.engine is AsyncIOEngine.POLLER: + _initialize_poller() + else: + raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) + + +def _grpc_shutdown_wrapper(_): + """A thin Python wrapper of Core's shutdown function. + + Define functions are not allowed in "cdef" functions, and Cython complains + about a simple lambda with a C function. """ - if _event_loop_thread_ident != threading.current_thread().ident: - return asyncio.run_coroutine_threadsafe(coro, _grpc_aio_loop) + grpc_shutdown_blocking() + + +cdef _actual_aio_shutdown(): + if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: + future = schedule_coro_threadsafe( + _global_aio_state.cq.shutdown, + (_global_aio_state.cq)._loop + ) + future.add_done_callback(_grpc_shutdown_wrapper) + elif _global_aio_state.engine is AsyncIOEngine.POLLER: + _global_aio_state.cq.shutdown() + grpc_shutdown_blocking() else: - return _grpc_aio_loop.create_task(coro) + raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) -def aio_loop_call_soon_threadsafe(object func, *args): - # TODO(lidiz) After we are confident, we can drop this assert. Otherwsie, - # we should limit this function to non-grpc-event-loop thread. - assert _event_loop_thread_ident != threading.current_thread().ident - return _grpc_aio_loop.call_soon_threadsafe(func, *args) +cdef init_grpc_aio(): + """Initialis the gRPC AsyncIO module. + + Expected to be invoked on critical class constructors. + E.g., AioChannel, AioServer. + """ + with _global_aio_state.lock: + _global_aio_state.refcount += 1 + if _global_aio_state.refcount == 1: + _actual_aio_initialization() + + +cdef shutdown_grpc_aio(): + """Shuts down the gRPC AsyncIO module. + + Expected to be invoked on critical class destructors. + E.g., AioChannel, AioServer. + """ + with _global_aio_state.lock: + assert _global_aio_state.refcount > 0 + _global_aio_state.refcount -= 1 + if not _global_aio_state.refcount: + _actual_aio_shutdown() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi index 26089c95337..35d4e484b09 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi @@ -14,6 +14,7 @@ cdef class _AsyncioResolver: cdef: + object _loop grpc_custom_resolver* _grpc_resolver object _task_resolve diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi index 7d47fa77b00..4983eab5113 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi @@ -15,6 +15,7 @@ cdef class _AsyncioResolver: def __cinit__(self): + self._loop = asyncio.get_event_loop() self._grpc_resolver = NULL self._task_resolve = None @@ -32,7 +33,7 @@ cdef class _AsyncioResolver: async def _async_resolve(self, bytes host, bytes port): self._task_resolve = None try: - resolved = await grpc_aio_loop().getaddrinfo(host, port) + resolved = await self._loop.getaddrinfo(host, port) except Exception as e: grpc_custom_resolve_callback( self._grpc_resolver, @@ -50,6 +51,6 @@ cdef class _AsyncioResolver: cdef void resolve(self, char* host, char* port): assert not self._task_resolve - self._task_resolve = grpc_aio_loop().create_task( + self._task_resolve = self._loop.create_task( self._async_resolve(host, port) ) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index f46e657c263..7524e9da94b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -37,6 +37,7 @@ cdef class _AsyncioSocket: self._py_socket = None self._peername = None self._closed = False + self._loop = asyncio.get_event_loop() @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, @@ -90,7 +91,7 @@ cdef class _AsyncioSocket: assert not self._reader assert not self._task_connect - self._task_connect = grpc_aio_loop().create_task( + self._task_connect = self._loop.create_task( self._async_connect(host, port) ) self._grpc_connect_cb = grpc_connect_cb @@ -122,7 +123,7 @@ cdef class _AsyncioSocket: self._grpc_read_cb = grpc_read_cb self._read_buffer = buffer_ - self._task_read = grpc_aio_loop().create_task(self._async_read(length)) + self._task_read = self._loop.create_task(self._async_read(length)) async def _async_write(self, bytearray outbound_buffer): self._writer.write(outbound_buffer) @@ -155,7 +156,7 @@ cdef class _AsyncioSocket: outbound_buffer.extend(start[:length]) self._grpc_write_cb = grpc_write_cb - self._task_write = grpc_aio_loop().create_task(self._async_write(outbound_buffer)) + self._task_write = self._loop.create_task(self._async_write(outbound_buffer)) cdef bint is_connected(self): return self._reader and not self._reader._transport.is_closing() @@ -209,7 +210,7 @@ cdef class _AsyncioSocket: sock=self._py_socket, ) - self._task_listen = grpc_aio_loop().create_task(create_asyncio_server()) + self._task_listen = self._loop.create_task(create_asyncio_server()) cdef accept(self, grpc_custom_socket* grpc_socket_client, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi index d2979c86b49..76c3be0c57c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi @@ -17,6 +17,7 @@ cdef class _AsyncioTimer: grpc_custom_timer * _grpc_timer object _timer_future bint _active + object _loop @staticmethod cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi index 51145116e21..286cd9a9d43 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi @@ -18,13 +18,14 @@ cdef class _AsyncioTimer: self._grpc_timer = NULL self._timer_future = None self._active = False + self._loop = asyncio.get_event_loop() cpython.Py_INCREF(self) @staticmethod cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout): timer = _AsyncioTimer() timer._grpc_timer = grpc_timer - timer._timer_future = grpc_aio_loop().call_later(timeout, timer.on_time_up) + timer._timer_future = timer._loop.call_later(timeout, timer.on_time_up) timer._active = True return timer diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 18fc9214b27..3923244748a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -51,7 +51,6 @@ cdef enum AioServerStatus: cdef class AioServer: cdef Server _server - cdef BaseCompletionQueue _cq cdef list _generic_handlers cdef AioServerStatus _status cdef object _loop # asyncio.EventLoop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index d8ed9a41fe7..07d1e45577f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -610,13 +610,13 @@ cdef class AioServer: def __init__(self, loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs): + init_grpc_aio() # NOTE(lidiz) Core objects won't be deallocated automatically. # If AioServer.shutdown is not called, those objects will leak. self._server = Server(options) - self._cq = create_completion_queue() grpc_server_register_completion_queue( self._server.c_server, - self._cq.c_ptr(), + global_completion_queue(), NULL ) @@ -663,7 +663,7 @@ cdef class AioServer: error = grpc_server_request_call( self._server.c_server, &rpc_state.call, &rpc_state.details, &rpc_state.request_metadata, - self._cq.c_ptr(), self._cq.c_ptr(), + global_completion_queue(), global_completion_queue(), wrapper.c_functor() ) if error != GRPC_CALL_OK: @@ -736,7 +736,7 @@ cdef class AioServer: # The shutdown callback won't be called until there is no live RPC. grpc_server_shutdown_and_notify( self._server.c_server, - self._cq.c_ptr(), + global_completion_queue(), self._shutdown_callback_wrapper.c_functor()) # Ensures the serving task (coroutine) exits. @@ -789,9 +789,6 @@ cdef class AioServer: self._server.is_shutdown = True self._status = AIO_SERVER_STATUS_STOPPED - # Shuts down the completion queue - await self._cq.shutdown() - async def wait_for_termination(self, object timeout): if timeout is None: await self._shutdown_completed @@ -823,3 +820,4 @@ cdef class AioServer: self, self._status ) + shutdown_grpc_aio() diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 5910f10abfd..b0a753c7ebe 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -20,7 +20,6 @@ import os import sys import threading import time -import enum import grpc diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 899fea38a49..31882be24fc 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -21,7 +21,7 @@ from typing import Any, Optional, Sequence, Tuple import grpc from grpc._cython.cygrpc import (EOF, AbortError, BaseError, InternalError, - UsageError, init_grpc_aio) + UsageError) from ._base_call import (Call, RpcContext, StreamStreamCall, StreamUnaryCall, UnaryStreamCall, UnaryUnaryCall) @@ -46,7 +46,6 @@ __all__ = ( 'UnaryStreamCall', 'StreamUnaryCall', 'StreamStreamCall', - 'init_grpc_aio', 'Channel', 'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable', diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index 300a1b3490d..24a38e1f3d0 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -228,7 +228,7 @@ class Channel(_base_channel.Channel): "UnaryUnaryClientInterceptors, the following are invalid: {}"\ .format(invalid_interceptors)) - self._loop = cygrpc.grpc_aio_loop() + self._loop = asyncio.get_event_loop() self._channel = cygrpc.AioChannel( _common.encode(target), _augment_channel_arguments(options, compression), credentials, diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index dc90c0a6dca..478049e5db4 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -13,6 +13,7 @@ # limitations under the License. """Server-side implementation of gRPC Asyncio Python.""" +import asyncio from concurrent.futures import Executor from typing import Any, Optional, Sequence @@ -40,7 +41,7 @@ class Server(_base_server.Server): options: ChannelArgumentType, maximum_concurrent_rpcs: Optional[int], compression: Optional[grpc.Compression]): - self._loop = cygrpc.grpc_aio_loop() + self._loop = asyncio.get_event_loop() if interceptors: invalid_interceptors = [ interceptor for interceptor in interceptors @@ -162,7 +163,10 @@ class Server(_base_server.Server): be safe to slightly extend the underlying Cython object's life span. """ if hasattr(self, '_server'): - cygrpc.aio_loop_schedule_coroutine(self._server.shutdown(None)) + cygrpc.schedule_coro_threadsafe( + self._server.shutdown(None), + self._loop, + ) def server(migration_thread_pool: Optional[Executor] = None, diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 53999c3caa4..f7cd7c6b8a1 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -151,8 +151,6 @@ class TestAio(setuptools.Command): def run(self): self._add_eggs_to_path() - from grpc.experimental.aio import init_grpc_aio - init_grpc_aio() import tests loader = tests.Loader() diff --git a/src/python/grpcio_tests/tests_aio/benchmark/server.py b/src/python/grpcio_tests/tests_aio/benchmark/server.py index 05479a2997d..561298a626b 100644 --- a/src/python/grpcio_tests/tests_aio/benchmark/server.py +++ b/src/python/grpcio_tests/tests_aio/benchmark/server.py @@ -36,7 +36,6 @@ async def _start_async_server(): def main(): - aio.init_grpc_aio() loop = asyncio.get_event_loop() loop.create_task(_start_async_server()) loop.run_forever() diff --git a/src/python/grpcio_tests/tests_aio/benchmark/worker.py b/src/python/grpcio_tests/tests_aio/benchmark/worker.py index 94651b55057..98b87b19910 100644 --- a/src/python/grpcio_tests/tests_aio/benchmark/worker.py +++ b/src/python/grpcio_tests/tests_aio/benchmark/worker.py @@ -23,7 +23,6 @@ from tests_aio.benchmark import worker_servicer async def run_worker_server(port: int) -> None: - aio.init_grpc_aio() server = aio.server() servicer = worker_servicer.WorkerServicer() diff --git a/src/python/grpcio_tests/tests_aio/interop/client.py b/src/python/grpcio_tests/tests_aio/interop/client.py index 905effa45bf..89793f94054 100644 --- a/src/python/grpcio_tests/tests_aio/interop/client.py +++ b/src/python/grpcio_tests/tests_aio/interop/client.py @@ -47,7 +47,6 @@ def _test_case_from_arg(test_case_arg): async def test_interoperability(): - aio.init_grpc_aio() args = interop_client_lib.parse_interop_client_args() channel = _create_channel(args) diff --git a/src/python/grpcio_tests/tests_aio/interop/server.py b/src/python/grpcio_tests/tests_aio/interop/server.py index 5a5180075a3..06a6c51d13a 100644 --- a/src/python/grpcio_tests/tests_aio/interop/server.py +++ b/src/python/grpcio_tests/tests_aio/interop/server.py @@ -18,7 +18,6 @@ import argparse import logging import grpc -from grpc.experimental.aio import init_grpc_aio from tests.interop import server as interop_server_lib from tests_aio.unit import _test_server @@ -29,8 +28,6 @@ _LOGGER.setLevel(logging.DEBUG) async def serve(): - init_grpc_aio() - args = interop_server_lib.parse_interop_server_arguments() if args.use_tls: diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index 6ee095ae3d8..d79ed422596 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -15,6 +15,7 @@ "unit.client_interceptor_test.TestInterceptedUnaryUnaryCall", "unit.client_interceptor_test.TestUnaryUnaryClientInterceptor", "unit.close_channel_test.TestCloseChannel", + "unit.compatibility_test.TestCompatibility", "unit.compression_test.TestCompression", "unit.connectivity_test.TestConnectivityState", "unit.done_callback_test.TestDoneCallback", diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel index 1e6dd4720f6..6c6b36f9908 100644 --- a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -50,6 +50,11 @@ py_library( srcs_version = "PY3", ) +_FLAKY_TESTS = [ + # NOTE(lidiz) this tests use many tcp ports; flaky under parallel runs. + "channel_argument_test.py", +] + [ py_test( name = test_file_name[:-3], @@ -59,6 +64,7 @@ py_library( "//src/python/grpcio_tests/tests/unit/credentials", ], imports = ["../../"], + flaky = test_file_name in _FLAKY_TESTS, main = test_file_name, python_version = "PY3", deps = [ diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_base.py b/src/python/grpcio_tests/tests_aio/unit/_test_base.py index d9284585c4b..06563e08166 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_base.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_base.py @@ -61,6 +61,3 @@ class AioTestBase(unittest.TestCase): return _async_to_sync_decorator(attr, _get_default_loop()) # For other attributes, let them pass. return attr - - -aio.init_grpc_aio() diff --git a/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py b/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py new file mode 100644 index 00000000000..06631ecb6fb --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py @@ -0,0 +1,215 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Testing the compatibility between AsyncIO stack and the old stack.""" + +import asyncio +import logging +import os +import unittest +import threading +from concurrent.futures import ThreadPoolExecutor +import time +import random +from typing import Callable, Sequence, Tuple + +import grpc +from grpc.experimental import aio +from grpc._cython import cygrpc + +from tests_aio.unit._test_base import AioTestBase +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import get_socket +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc +from tests_aio.unit._test_server import start_test_server +from tests_aio.unit import _common + +_NUM_STREAM_RESPONSES = 5 +_REQUEST_PAYLOAD_SIZE = 7 +_RESPONSE_PAYLOAD_SIZE = 42 + + +def _unique_options() -> Sequence[Tuple[str, float]]: + return (('iv', random.random()),) + + +@unittest.skipIf(cygrpc.grpc_aio_engine() != cygrpc.AsyncIOEngine.POLLER, + 'Compatible mode needs POLLER completion queue.') +class TestCompatibility(AioTestBase): + + async def setUp(self): + address, self._async_server = await start_test_server() + # Create async stub + self._async_channel = aio.insecure_channel(address, + options=_unique_options()) + self._async_stub = test_pb2_grpc.TestServiceStub(self._async_channel) + + # Create sync stub + self._sync_channel = grpc.insecure_channel(address, + options=_unique_options()) + self._sync_stub = test_pb2_grpc.TestServiceStub(self._sync_channel) + + async def tearDown(self): + self._sync_channel.close() + await self._async_channel.close() + await self._async_server.stop(None) + + async def _run_in_another_thread(self, func: Callable[[], None]): + work_done = asyncio.Event() + + def thread_work(): + func() + self.loop.call_soon_threadsafe(work_done.set) + + thread = threading.Thread(target=thread_work) + thread.daemon = True + thread.start() + await work_done.wait() + thread.join() + + async def test_unary_unary(self): + # Calling async API in this thread + await self._async_stub.UnaryCall(messages_pb2.SimpleRequest(), + timeout=test_constants.LONG_TIMEOUT) + + # Calling sync API in a different thread + def sync_work() -> None: + response, call = self._sync_stub.UnaryCall.with_call( + messages_pb2.SimpleRequest(), + timeout=test_constants.LONG_TIMEOUT) + self.assertIsInstance(response, messages_pb2.SimpleResponse) + self.assertEqual(grpc.StatusCode.OK, call.code()) + + await self._run_in_another_thread(sync_work) + + async def test_unary_stream(self): + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) + + # Calling async API in this thread + call = self._async_stub.StreamingOutputCall(request) + + for _ in range(_NUM_STREAM_RESPONSES): + await call.read() + self.assertEqual(grpc.StatusCode.OK, await call.code()) + + # Calling sync API in a different thread + def sync_work() -> None: + response_iterator = self._sync_stub.StreamingOutputCall(request) + for response in response_iterator: + assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body) + self.assertEqual(grpc.StatusCode.OK, response_iterator.code()) + + await self._run_in_another_thread(sync_work) + + async def test_stream_unary(self): + payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) + request = messages_pb2.StreamingInputCallRequest(payload=payload) + + # Calling async API in this thread + async def gen(): + for _ in range(_NUM_STREAM_RESPONSES): + yield request + + response = await self._async_stub.StreamingInputCall(gen()) + self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, + response.aggregated_payload_size) + + # Calling sync API in a different thread + def sync_work() -> None: + response = self._sync_stub.StreamingInputCall( + iter([request] * _NUM_STREAM_RESPONSES)) + self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, + response.aggregated_payload_size) + + await self._run_in_another_thread(sync_work) + + async def test_stream_stream(self): + request = messages_pb2.StreamingOutputCallRequest() + request.response_parameters.append( + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) + + # Calling async API in this thread + call = self._async_stub.FullDuplexCall() + + for _ in range(_NUM_STREAM_RESPONSES): + await call.write(request) + response = await call.read() + assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body) + + await call.done_writing() + assert await call.code() == grpc.StatusCode.OK + + # Calling sync API in a different thread + def sync_work() -> None: + response_iterator = self._sync_stub.FullDuplexCall(iter([request])) + for response in response_iterator: + assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body) + self.assertEqual(grpc.StatusCode.OK, response_iterator.code()) + + await self._run_in_another_thread(sync_work) + + async def test_server(self): + + def echo(a, b): + return a + + class GenericHandlers(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + return grpc.unary_unary_rpc_method_handler(echo) + + # It's fine to instantiate server object in the event loop thread. + # The server will spawn its own serving thread. + server = grpc.server(ThreadPoolExecutor(max_workers=10), + handlers=(GenericHandlers(),)) + port = server.add_insecure_port('0') + server.start() + + def sync_work() -> None: + for _ in range(100): + with grpc.insecure_channel('localhost:%d' % port) as channel: + response = channel.unary_unary('/test/test')(b'\x07\x08') + self.assertEqual(response, b'\x07\x08') + + await self._run_in_another_thread(sync_work) + + async def test_many_loop(self): + address, server = await start_test_server() + + # Run another loop in another thread + def sync_work(): + + async def async_work(): + # Create async stub + async_channel = aio.insecure_channel(address, + options=_unique_options()) + async_stub = test_pb2_grpc.TestServiceStub(async_channel) + + call = async_stub.UnaryCall(messages_pb2.SimpleRequest()) + response = await call + self.assertIsInstance(response, messages_pb2.SimpleResponse) + self.assertEqual(grpc.StatusCode.OK, call.code()) + + loop = asyncio.new_event_loop() + loop.run_until_complete(async_work()) + + await self._run_in_another_thread(sync_work) + await server.stop(None) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main(verbosity=2)