From 43fe171683d8fb3ed278cc8566a2df8eeeba7462 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 4 Mar 2020 14:17:26 -0800 Subject: [PATCH] Optimize some implementation of IO manager --- .../_cygrpc/aio/callback_common.pyx.pxi | 2 +- .../grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi | 24 +++++++- .../_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi | 10 ++-- .../_cygrpc/aio/iomgr/resolver.pyx.pxi | 27 ++++----- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 56 +++++++++---------- .../_cython/_cygrpc/aio/iomgr/timer.pxd.pxi | 7 +-- .../_cython/_cygrpc/aio/iomgr/timer.pyx.pxi | 25 +++++---- .../grpc/_cython/_cygrpc/channel.pyx.pxi | 2 + .../grpcio/grpc/experimental/aio/_channel.py | 2 +- .../grpc/experimental/aio/_interceptor.py | 8 +-- .../grpcio/grpc/experimental/aio/_server.py | 3 +- 11 files changed, 89 insertions(+), 77 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 67848cadaf8..24c79533012 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -72,7 +72,7 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler cdef class CallbackCompletionQueue: def __cinit__(self): - self._shutdown_completed = asyncio.get_event_loop().create_future() + self._shutdown_completed = grpc_aio_loop().create_future() self._wrapper = CallbackWrapper( self._shutdown_completed, CQ_SHUTDOWN_FAILURE_HANDLER) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 4b38779ab63..229367287d5 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -13,16 +13,31 @@ # limitations under the License. -cdef bint _grpc_aio_initialized = 0 +cdef bint _grpc_aio_initialized = False +# NOTE(lidiz) Theoretically, applications can run in multiple event loops as +# long as they are in the same thread with same magic. However, I don't think +# we should support this use case. So, the gRPC Python Async Stack should use +# a single event loop picked by "init_grpc_aio". +cdef object _grpc_aio_loop def init_grpc_aio(): global _grpc_aio_initialized + global _grpc_aio_loop if _grpc_aio_initialized: return + else: + _grpc_aio_initialized = True + # Anchors the event loop that the gRPC library going to use. + _grpc_aio_loop = asyncio.get_event_loop() + + # Activates asyncio IO manager install_asyncio_iomgr() + + # TODO(lidiz) we need a the grpc_shutdown_blocking() counterpart for this + # call. Otherwise, the gRPC library won't shutdown cleanly. grpc_init() # Timers are triggered by the Asyncio loop. We disable @@ -34,4 +49,9 @@ def init_grpc_aio(): # event loop, as it is being done by the other Asyncio callbacks. Executor.SetThreadingAll(False) - _grpc_aio_initialized = 1 + _grpc_aio_initialized = False + + +def grpc_aio_loop(): + """Returns the one-and-only gRPC Aio event loop.""" + return _grpc_aio_loop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 639cb92d0c5..37ba5f0d346 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -49,7 +49,6 @@ cdef void asyncio_socket_connect( const grpc_sockaddr* addr, size_t addr_len, grpc_custom_connect_callback connect_cb) with gil: - host, port = sockaddr_to_tuple(addr, addr_len) socket = <_AsyncioSocket>grpc_socket.impl socket.connect(host, port, connect_cb) @@ -185,14 +184,15 @@ cdef void asyncio_resolve_async( cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil: timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0) - Py_INCREF(timer) grpc_timer.timer = timer cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil: - timer = <_AsyncioTimer>grpc_timer.timer - timer.stop() - Py_DECREF(timer) + if grpc_timer.timer == NULL: + return + else: + timer = <_AsyncioTimer>grpc_timer.timer + timer.stop() cdef void asyncio_init_loop() with gil: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi index d533d6e3a37..7d47fa77b00 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi @@ -29,34 +29,27 @@ cdef class _AsyncioResolver: id_ = id(self) return f"<{class_name} {id_}>" - def _resolve_cb(self, future): - error = False + async def _async_resolve(self, bytes host, bytes port): + self._task_resolve = None try: - res = future.result() + resolved = await grpc_aio_loop().getaddrinfo(host, port) except Exception as e: - error = True - error_msg = str(e) - finally: - self._task_resolve = None - - if not error: grpc_custom_resolve_callback( self._grpc_resolver, - tuples_to_resolvaddr(res), - 0 + NULL, + grpc_socket_error("Resolve address [{}:{}] failed: {}: {}".format( + host, port, type(e), str(e)).encode()) ) else: grpc_custom_resolve_callback( self._grpc_resolver, - NULL, - grpc_socket_error("getaddrinfo {}".format(error_msg).encode()) + tuples_to_resolvaddr(resolved), + 0 ) cdef void resolve(self, char* host, char* port): assert not self._task_resolve - loop = asyncio.get_event_loop() - self._task_resolve = asyncio.ensure_future( - loop.getaddrinfo(host, port) + self._task_resolve = grpc_aio_loop().create_task( + self._async_resolve(host, port) ) - self._task_resolve.add_done_callback(self._resolve_cb) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 1664ef7e35a..65ee6e24e59 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -35,7 +35,6 @@ cdef class _AsyncioSocket: self._server = None self._py_socket = None self._peername = None - self._loop = asyncio.get_event_loop() @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, @@ -62,27 +61,37 @@ cdef class _AsyncioSocket: connected = self.is_connected() return f"<{class_name} {id_} connected={connected}>" - def _connect_cb(self, future): + async def _async_connect(self, object host, object port,): + self._task_connect = None try: - self._reader, self._writer = future.result() + self._reader, self._writer = await asyncio.open_connection(host, port) except Exception as e: self._grpc_connect_cb( self._grpc_socket, - grpc_socket_error("Socket connect failed: {}".format(e).encode()) + grpc_socket_error("Socket connect failed: {}: {}".format(type(e), str(e)).encode()) ) - return - finally: - self._task_connect = None + else: + # gRPC default posix implementation disables nagle + # algorithm. + sock = self._writer.transport.get_extra_info('socket') + sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) - # gRPC default posix implementation disables nagle - # algorithm. - sock = self._writer.transport.get_extra_info('socket') - sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) + self._grpc_connect_cb( + self._grpc_socket, + 0 + ) - self._grpc_connect_cb( - self._grpc_socket, - 0 + cdef void connect(self, + object host, + object port, + grpc_custom_connect_callback grpc_connect_cb): + assert not self._reader + assert not self._task_connect + + self._task_connect = grpc_aio_loop().create_task( + self._async_connect(host, port) ) + self._grpc_connect_cb = grpc_connect_cb async def _async_read(self, size_t length): self._task_read = None @@ -106,25 +115,12 @@ cdef class _AsyncioSocket: 0 ) - cdef void connect(self, - object host, - object port, - grpc_custom_connect_callback grpc_connect_cb): - assert not self._reader - assert not self._task_connect - - self._task_connect = asyncio.ensure_future( - asyncio.open_connection(host, port) - ) - self._grpc_connect_cb = grpc_connect_cb - self._task_connect.add_done_callback(self._connect_cb) - cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb): assert not self._task_read self._grpc_read_cb = grpc_read_cb self._read_buffer = buffer_ - self._task_read = self._loop.create_task(self._async_read(length)) + self._task_read = grpc_aio_loop().create_task(self._async_read(length)) async def _async_write(self, bytearray outbound_buffer): self._writer.write(outbound_buffer) @@ -157,7 +153,7 @@ cdef class _AsyncioSocket: outbound_buffer.extend(start[:length]) self._grpc_write_cb = grpc_write_cb - self._task_write = self._loop.create_task(self._async_write(outbound_buffer)) + self._task_write = grpc_aio_loop().create_task(self._async_write(outbound_buffer)) cdef bint is_connected(self): return self._reader and not self._reader._transport.is_closing() @@ -201,7 +197,7 @@ cdef class _AsyncioSocket: sock=self._py_socket, ) - self._loop.create_task(create_asyncio_server()) + grpc_aio_loop().create_task(create_asyncio_server()) cdef accept(self, grpc_custom_socket* grpc_socket_client, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi index 5af5dcd9282..d2979c86b49 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi @@ -15,11 +15,10 @@ cdef class _AsyncioTimer: cdef: grpc_custom_timer * _grpc_timer - object _deadline - object _timer_handler - int _active + object _timer_future + bint _active @staticmethod - cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline) + cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout) cdef stop(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi index e8edb4a5cf8..fc57e3760ed 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi @@ -16,21 +16,23 @@ cdef class _AsyncioTimer: def __cinit__(self): self._grpc_timer = NULL - self._timer_handler = None - self._active = 0 + self._timer_future = None + self._active = False + cpython.Py_INCREF(self) @staticmethod - cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline): + cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout): timer = _AsyncioTimer() timer._grpc_timer = grpc_timer - timer._deadline = deadline - timer._timer_handler = asyncio.get_event_loop().call_later(deadline, timer._on_deadline) - timer._active = 1 + timer._timer_future = grpc_aio_loop().create_task(timer._async_time_up(timeout)) + timer._active = True return timer - def _on_deadline(self): - self._active = 0 + async def _async_time_up(self, float timeout): + await asyncio.sleep(timeout) + self._active = False grpc_custom_timer_callback(self._grpc_timer, 0) + cpython.Py_DECREF(self) def __repr__(self): class_name = self.__class__.__name__ @@ -38,8 +40,9 @@ cdef class _AsyncioTimer: return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>" cdef stop(self): - if self._active == 0: + if not self._active: return - self._timer_handler.cancel() - self._active = 0 + self._timer_future.cancel() + self._active = False + cpython.Py_DECREF(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 01baa5288ef..74c7f6c1405 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -256,6 +256,8 @@ cdef void _call( on_success(started_tags) else: raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason) + + cdef void _process_integrated_call_tag( _ChannelState state, _BatchOperationTag tag) except *: cdef _CallState call_state = state.integrated_call_states.pop(tag) diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index fcc61338ca3..2664195a9ec 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -228,7 +228,7 @@ class Channel(_base_channel.Channel): "UnaryUnaryClientInterceptors, the following are invalid: {}"\ .format(invalid_interceptors)) - self._loop = asyncio.get_event_loop() + self._loop = cygrpc.grpc_aio_loop() self._channel = cygrpc.AioChannel( _common.encode(target), _augment_channel_arguments(options, compression), credentials, diff --git a/src/python/grpcio/grpc/experimental/aio/_interceptor.py b/src/python/grpcio/grpc/experimental/aio/_interceptor.py index 04f2f72e275..68deb05f987 100644 --- a/src/python/grpcio/grpc/experimental/aio/_interceptor.py +++ b/src/python/grpcio/grpc/experimental/aio/_interceptor.py @@ -160,10 +160,10 @@ class InterceptedUnaryUnaryCall(_base_call.UnaryUnaryCall): loop: asyncio.AbstractEventLoop) -> None: self._channel = channel self._loop = loop - self._interceptors_task = asyncio.ensure_future(self._invoke( - interceptors, method, timeout, metadata, credentials, - wait_for_ready, request, request_serializer, response_deserializer), - loop=loop) + self._interceptors_task = asyncio.create_task( + self._invoke(interceptors, method, timeout, metadata, credentials, + wait_for_ready, request, request_serializer, + response_deserializer)) self._pending_add_done_callbacks = [] self._interceptors_task.add_done_callback( self._fire_pending_add_done_callbacks) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 587d8096c69..18e2bf65553 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -13,7 +13,6 @@ # limitations under the License. """Server-side implementation of gRPC Asyncio Python.""" -import asyncio from concurrent.futures import Executor from typing import Any, Optional, Sequence @@ -41,7 +40,7 @@ class Server(_base_server.Server): options: ChannelArgumentType, maximum_concurrent_rpcs: Optional[int], compression: Optional[grpc.Compression]): - self._loop = asyncio.get_event_loop() + self._loop = cygrpc.grpc_aio_loop() if interceptors: invalid_interceptors = [ interceptor for interceptor in interceptors