From e58e24afb9ce736c4f9f6850bc0c59b4aea75ea0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 1 Jul 2020 11:19:55 -0700 Subject: [PATCH 1/3] Allows poller to bound to ephemeral loops in multiple threads --- .../grpc/_cython/_cygrpc/aio/common.pyx.pxi | 23 ++++----- .../_cygrpc/aio/completion_queue.pxd.pxi | 8 +++- .../_cygrpc/aio/completion_queue.pyx.pxi | 44 ++++++++++++++--- .../grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi | 7 +++ .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 3 ++ .../grpcio/grpc/experimental/aio/_server.py | 9 ++-- .../tests_aio/unit/outside_init_test.py | 48 +++++++++++++++---- 7 files changed, 108 insertions(+), 34 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi index bea1f09a564..f759fbbec0c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi @@ -170,20 +170,17 @@ async def generator_to_async_generator(object gen, object loop, object thread_po if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7: - def get_working_loop(): - """Returns a running event loop.""" - return asyncio.get_running_loop() -else: def get_working_loop(): """Returns a running event loop. - + Due to a defect of asyncio.get_event_loop, its returned event loop might - not be set as the default event loop for the main thread. So, we will - raise RuntimeError if the returned event loop is not running. + not be set as the default event loop for the main thread. """ - loop = asyncio.get_event_loop() - if loop.is_running(): - return loop - else: - raise RuntimeError('No running event loop detected. This function ' - + 'must be called from inside of a running event loop.') + try: + return asyncio.get_running_loop() + except RuntimeError: + return asyncio.get_event_loop() +else: + def get_working_loop(): + """Returns a running event loop.""" + return asyncio.get_event_loop() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index e69200c7376..03d91025d0e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -49,6 +49,12 @@ cdef class BaseCompletionQueue: cdef grpc_completion_queue* c_ptr(self) + +cdef class _EventLoopBound: + cdef readonly object loop + cdef readonly object read_socket + + cdef class PollerCompletionQueue(BaseCompletionQueue): cdef bint _shutdown cdef cpp_event_queue _queue @@ -57,7 +63,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef int _write_fd cdef object _read_socket cdef object _write_socket - cdef object _loop + cdef dict _loops cdef void _poll(self) nogil cdef shutdown(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index 233fbb1de7e..e8e569fdd86 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -38,11 +38,25 @@ cdef class BaseCompletionQueue: return self._cq +cdef class _EventLoopBound: + + def __cinit__(self, object loop, object read_socket, object handler): + self.loop = loop + self.read_socket = read_socket + reader_function = functools.partial( + handler, + loop + ) + self.loop.add_reader(self.read_socket, reader_function) + + def close(self): + if self.loop: + self.loop.remove_reader(self.read_socket) + + cdef class PollerCompletionQueue(BaseCompletionQueue): def __cinit__(self): - - self._loop = get_working_loop() self._cq = grpc_completion_queue_create_for_next(NULL) self._shutdown = False self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) @@ -50,10 +64,21 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._read_socket, self._write_socket = socket.socketpair() self._write_fd = self._write_socket.fileno() - self._loop.add_reader(self._read_socket, self._handle_events) + self._loops = {} + + # The read socket might be read by multiple threads. But only one of them will + # read the 1 byte sent by the poller thread. This setting is essential to allow + # multiple loops in multiple threads bound to the same poller. + self._read_socket.setblocking(False) self._queue = cpp_event_queue() + def bound_loop(self, object loop): + if loop in self._loops: + return + else: + self._loops[loop] = _EventLoopBound(loop, self._read_socket, self._handle_events) + cdef void _poll(self) nogil: cdef grpc_event event cdef CallbackContext *context @@ -79,14 +104,21 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._poll() cdef shutdown(self): - self._loop.remove_reader(self._read_socket) + # Removes the socket hook from loops + for loop in self._loops: + self._loops.get(loop).close() + # TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown grpc_completion_queue_shutdown(self._cq) while not self._shutdown: self._poller_thread.join(timeout=_POLL_AWAKE_INTERVAL_S) grpc_completion_queue_destroy(self._cq) - def _handle_events(self): + # Clean up socket resources + self._read_socket.close() + self._write_socket.close() + + def _handle_events(self, object context_loop): cdef bytes data = self._read_socket.recv(1) cdef grpc_event event cdef CallbackContext *context @@ -103,7 +135,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): context = event.tag loop = context.loop - if loop is self._loop: + if loop is context_loop: # Executes callbacks: complete the future CallbackWrapper.functor_run( event.tag, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index eaddb3952d0..548c564d757 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -111,6 +111,12 @@ cdef _actual_aio_shutdown(): raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) +cdef _per_loop_initialization(): + cdef object loop = get_working_loop() + if _global_aio_state.engine is AsyncIOEngine.POLLER: + _global_aio_state.cq.bound_loop(loop) + + cpdef init_grpc_aio(): """Initializes the gRPC AsyncIO module. @@ -121,6 +127,7 @@ cpdef init_grpc_aio(): _global_aio_state.refcount += 1 if _global_aio_state.refcount == 1: _actual_aio_initialization() + _per_loop_initialization() cpdef shutdown_grpc_aio(): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index f69100f5a56..760fef91cf0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -1000,3 +1000,6 @@ cdef class AioServer: cdef thread_pool(self): """Access the thread pool instance.""" return self._thread_pool + + def is_running(self): + return self._status == AIO_SERVER_STATUS_RUNNING diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 009d85d3767..00c53490c5d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -162,10 +162,11 @@ class Server(_base_server.Server): be safe to slightly extend the underlying Cython object's life span. """ if hasattr(self, '_server'): - cygrpc.schedule_coro_threadsafe( - self._server.shutdown(None), - self._loop, - ) + if self._server.is_running(): + cygrpc.schedule_coro_threadsafe( + self._server.shutdown(None), + self._loop, + ) def server(migration_thread_pool: Optional[Executor] = None, diff --git a/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py b/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py index ca43f5727fd..879796cf0f5 100644 --- a/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py @@ -19,6 +19,11 @@ import unittest from grpc.experimental import aio import grpc +from tests_aio.unit._test_server import start_test_server +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc + +_NUM_OF_LOOPS = 50 + class TestOutsideInit(unittest.TestCase): @@ -26,19 +31,42 @@ class TestOutsideInit(unittest.TestCase): # Ensures non-AsyncIO object can be initiated channel_creds = grpc.ssl_channel_credentials() - # Ensures AsyncIO API NOT working outside of AsyncIO - with self.assertRaises(RuntimeError): - aio.insecure_channel('') + # Ensures AsyncIO API not raising outside of AsyncIO. + # NOTE(lidiz) This behavior is bound with GAPIC generator, and required + # by test frameworks like pytest. In test frameworks, objects shared + # across cases need to be created outside of AsyncIO coroutines. + aio.insecure_channel('') + aio.secure_channel('', channel_creds) + aio.server() + aio.init_grpc_aio() + aio.shutdown_grpc_aio() + + def test_multi_ephemeral_loops(self): + # Initializes AIO module outside. It's part of the test. We especially + # want to ensure the closing of the default loop won't cause deadlocks. + aio.init_grpc_aio() + + async def ping_pong(): + address, server = await start_test_server() + channel = aio.insecure_channel(address) + stub = test_pb2_grpc.TestServiceStub(channel) + + await stub.UnaryCall(messages_pb2.SimpleRequest()) + + await channel.close() + await server.stop(None) + + for i in range(_NUM_OF_LOOPS): + old_loop = asyncio.get_event_loop() + old_loop.close() - with self.assertRaises(RuntimeError): - aio.secure_channel('', channel_creds) + loop = asyncio.new_event_loop() + loop.set_debug(True) + asyncio.set_event_loop(loop) - with self.assertRaises(RuntimeError): - aio.server() + loop.run_until_complete(ping_pong()) - # Ensures init_grpc_aio fail outside of AsyncIO - with self.assertRaises(RuntimeError): - aio.init_grpc_aio() + aio.shutdown_grpc_aio() if __name__ == "__main__": From b61fe7ab3323a9573cd99e470b164e521ca329bd Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 7 Jul 2020 10:17:16 -0700 Subject: [PATCH 2/3] Address comments --- .../grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi | 2 +- .../grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi | 6 +++--- src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index 03d91025d0e..49848b62df2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -50,7 +50,7 @@ cdef class BaseCompletionQueue: cdef grpc_completion_queue* c_ptr(self) -cdef class _EventLoopBound: +cdef class _BoundEventLoop: cdef readonly object loop cdef readonly object read_socket diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index e8e569fdd86..1be10f9f2cb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -38,7 +38,7 @@ cdef class BaseCompletionQueue: return self._cq -cdef class _EventLoopBound: +cdef class _BoundEventLoop: def __cinit__(self, object loop, object read_socket, object handler): self.loop = loop @@ -73,11 +73,11 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): self._queue = cpp_event_queue() - def bound_loop(self, object loop): + def bind_loop(self, object loop): if loop in self._loops: return else: - self._loops[loop] = _EventLoopBound(loop, self._read_socket, self._handle_events) + self._loops[loop] = _BoundEventLoop(loop, self._read_socket, self._handle_events) cdef void _poll(self) nogil: cdef grpc_event event diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 548c564d757..06c92cac586 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -111,10 +111,10 @@ cdef _actual_aio_shutdown(): raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine) -cdef _per_loop_initialization(): +cdef _initialize_per_loop(): cdef object loop = get_working_loop() if _global_aio_state.engine is AsyncIOEngine.POLLER: - _global_aio_state.cq.bound_loop(loop) + _global_aio_state.cq.bind_loop(loop) cpdef init_grpc_aio(): @@ -127,7 +127,7 @@ cpdef init_grpc_aio(): _global_aio_state.refcount += 1 if _global_aio_state.refcount == 1: _actual_aio_initialization() - _per_loop_initialization() + _initialize_per_loop() cpdef shutdown_grpc_aio(): From e7f42f1ff994ecbd4f010af0b6db781f1c04075b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 7 Jul 2020 10:45:11 -0700 Subject: [PATCH 3/3] Add type annotation in comments --- .../grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi index 49848b62df2..1c5fb8dd219 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi @@ -52,18 +52,18 @@ cdef class BaseCompletionQueue: cdef class _BoundEventLoop: cdef readonly object loop - cdef readonly object read_socket + cdef readonly object read_socket # socket.socket cdef class PollerCompletionQueue(BaseCompletionQueue): cdef bint _shutdown cdef cpp_event_queue _queue cdef mutex _queue_mutex - cdef object _poller_thread + cdef object _poller_thread # threading.Thread cdef int _write_fd - cdef object _read_socket - cdef object _write_socket - cdef dict _loops + cdef object _read_socket # socket.socket + cdef object _write_socket # socket.socket + cdef dict _loops # Mapping[asyncio.AbstractLoop, _BoundEventLoop] cdef void _poll(self) nogil cdef shutdown(self)