From 8ad6152f008d09c8192b8ce8efebd561091ffb08 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 16 Dec 2019 18:21:34 -0800 Subject: [PATCH] Remove the explicit coroutine tracking --- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 1 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 107 +++++++++--------- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index f41e5f395d3..e8852f4f5b9 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -20,6 +20,7 @@ cdef class _HandlerCallDetails: cdef class RPCState(GrpcCallWrapper): cdef grpc_call_details details cdef grpc_metadata_array request_metadata + cdef AioServer server cdef bytes method(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 64705abd459..e3fe250358e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -29,7 +29,8 @@ cdef class _HandlerCallDetails: cdef class RPCState: - def __cinit__(self): + def __cinit__(self, AioServer server): + self.server = server grpc_metadata_array_init(&self.request_metadata) grpc_call_details_init(&self.details) @@ -174,7 +175,13 @@ async def _handle_unary_stream_rpc(object method_handler, # Consumes messages from the generator async for response_message in async_response_generator: - await servicer_context.write(response_message) + if rpc_state.server._status == AIO_SERVER_STATUS_STOPPED: + # The async generator might yield much much later after the + # server is destroied. If we proceed, Core will crash badly. + _LOGGER.warn('RPC Aborted: Server already stopped.') + return + else: + await servicer_context.write(response_message) # Sends the final status of this RPC cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( @@ -188,13 +195,19 @@ async def _handle_unary_stream_rpc(object method_handler, await execute_batch(rpc_state, ops, loop) -async def _handle_cancellation_from_core(object rpc_task, - RPCState rpc_state, - object loop): +async def _schedule_rpc_coro_and_handle_cancellation(object rpc_coro, + RPCState rpc_state, + object loop): + # Schedules the RPC coroutine. + cdef object rpc_task = loop.create_task(rpc_coro) + cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG) cdef tuple ops = (op,) + + # Awaits cancellation from peer. await execute_batch(rpc_state, ops, loop) if op.cancelled() and not rpc_task.done(): + # Injects `CancelledError` to halt the RPC coroutine rpc_task.cancel() @@ -227,32 +240,6 @@ cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandle 'grpc_server_request_call', None, _RequestCallError) -async def _server_call_request_call(Server server, - CallbackCompletionQueue cq, - object loop): - cdef grpc_call_error error - cdef RPCState rpc_state = RPCState() - cdef object future = loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper( - future, - REQUEST_CALL_FAILURE_HANDLER) - # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed - # when calling "await". This is an over-optimization by Cython. - cpython.Py_INCREF(wrapper) - error = grpc_server_request_call( - server.c_server, &rpc_state.call, &rpc_state.details, - &rpc_state.request_metadata, - cq.c_ptr(), cq.c_ptr(), - wrapper.c_functor() - ) - if error != GRPC_CALL_OK: - raise RuntimeError("Error in _server_call_request_call: %s" % error) - - await future - cpython.Py_DECREF(wrapper) - return rpc_state - - cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( 'grpc_server_shutdown_and_notify', None, @@ -307,6 +294,29 @@ cdef class AioServer: return self._server.add_http2_port(address, server_credentials._credentials) + async def _request_call(self): + cdef grpc_call_error error + cdef RPCState rpc_state = RPCState(self) + cdef object future = self._loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + REQUEST_CALL_FAILURE_HANDLER) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + error = grpc_server_request_call( + self._server.c_server, &rpc_state.call, &rpc_state.details, + &rpc_state.request_metadata, + self._cq.c_ptr(), self._cq.c_ptr(), + wrapper.c_functor() + ) + if error != GRPC_CALL_OK: + raise RuntimeError("Error in grpc_server_request_call: %s" % error) + + await future + cpython.Py_DECREF(wrapper) + return rpc_state + async def _server_main_loop(self, object server_started): self._server.start() @@ -319,33 +329,26 @@ cdef class AioServer: break # Accepts new request from Core - rpc_state = await _server_call_request_call( - self._server, - self._cq, - self._loop) - - # Schedules the RPC as a separate coroutine - rpc_task = self._loop.create_task( - _handle_rpc( - self._generic_handlers, - rpc_state, - self._loop - ) - ) + rpc_state = await self._request_call() + + # Creates the dedicated RPC coroutine. If we schedule it right now, + # there is no guarantee if the cancellation listening coroutine is + # ready or not. So, we should control the ordering by scheduling + # the coroutine onto event loop inside of the cancellation + # coroutine. + rpc_coro = _handle_rpc(self._generic_handlers, + rpc_state, + self._loop) # Fires off a task that listens on the cancellation from client. self._loop.create_task( - _handle_cancellation_from_core( - rpc_task, + _schedule_rpc_coro_and_handle_cancellation( + rpc_coro, rpc_state, self._loop ) ) - # Keeps track of created coroutines, so we can clean them up properly. - self._ongoing_rpc_tasks.add(rpc_task) - rpc_task.add_done_callback(lambda _: self._ongoing_rpc_tasks.remove(rpc_task)) - def _serving_task_crash_handler(self, object task): """Shutdown the server immediately if unexpectedly exited.""" if task.exception() is None: @@ -423,10 +426,6 @@ cdef class AioServer: grpc_server_cancel_all_calls(self._server.c_server) await self._shutdown_completed - # Cancels all Python layer tasks - for rpc_task in self._ongoing_rpc_tasks: - rpc_task.cancel() - async with self._shutdown_lock: if self._status == AIO_SERVER_STATUS_STOPPING: grpc_server_destroy(self._server.c_server)