Remove the explicit coroutine tracking

pull/21495/head
Lidi Zheng 5 years ago
parent 8e35f4d458
commit 8ad6152f00
  1. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
  2. 107
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

@ -20,6 +20,7 @@ cdef class _HandlerCallDetails:
cdef class RPCState(GrpcCallWrapper): cdef class RPCState(GrpcCallWrapper):
cdef grpc_call_details details cdef grpc_call_details details
cdef grpc_metadata_array request_metadata cdef grpc_metadata_array request_metadata
cdef AioServer server
cdef bytes method(self) cdef bytes method(self)

@ -29,7 +29,8 @@ cdef class _HandlerCallDetails:
cdef class RPCState: cdef class RPCState:
def __cinit__(self): def __cinit__(self, AioServer server):
self.server = server
grpc_metadata_array_init(&self.request_metadata) grpc_metadata_array_init(&self.request_metadata)
grpc_call_details_init(&self.details) grpc_call_details_init(&self.details)
@ -174,7 +175,13 @@ async def _handle_unary_stream_rpc(object method_handler,
# Consumes messages from the generator # Consumes messages from the generator
async for response_message in async_response_generator: async for response_message in async_response_generator:
await servicer_context.write(response_message) if rpc_state.server._status == AIO_SERVER_STATUS_STOPPED:
# The async generator might yield much much later after the
# server is destroied. If we proceed, Core will crash badly.
_LOGGER.warn('RPC Aborted: Server already stopped.')
return
else:
await servicer_context.write(response_message)
# Sends the final status of this RPC # Sends the final status of this RPC
cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( cdef SendStatusFromServerOperation op = SendStatusFromServerOperation(
@ -188,13 +195,19 @@ async def _handle_unary_stream_rpc(object method_handler,
await execute_batch(rpc_state, ops, loop) await execute_batch(rpc_state, ops, loop)
async def _handle_cancellation_from_core(object rpc_task, async def _schedule_rpc_coro_and_handle_cancellation(object rpc_coro,
RPCState rpc_state, RPCState rpc_state,
object loop): object loop):
# Schedules the RPC coroutine.
cdef object rpc_task = loop.create_task(rpc_coro)
cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG) cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG)
cdef tuple ops = (op,) cdef tuple ops = (op,)
# Awaits cancellation from peer.
await execute_batch(rpc_state, ops, loop) await execute_batch(rpc_state, ops, loop)
if op.cancelled() and not rpc_task.done(): if op.cancelled() and not rpc_task.done():
# Injects `CancelledError` to halt the RPC coroutine
rpc_task.cancel() rpc_task.cancel()
@ -227,32 +240,6 @@ cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandle
'grpc_server_request_call', None, _RequestCallError) 'grpc_server_request_call', None, _RequestCallError)
async def _server_call_request_call(Server server,
CallbackCompletionQueue cq,
object loop):
cdef grpc_call_error error
cdef RPCState rpc_state = RPCState()
cdef object future = loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
REQUEST_CALL_FAILURE_HANDLER)
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
# when calling "await". This is an over-optimization by Cython.
cpython.Py_INCREF(wrapper)
error = grpc_server_request_call(
server.c_server, &rpc_state.call, &rpc_state.details,
&rpc_state.request_metadata,
cq.c_ptr(), cq.c_ptr(),
wrapper.c_functor()
)
if error != GRPC_CALL_OK:
raise RuntimeError("Error in _server_call_request_call: %s" % error)
await future
cpython.Py_DECREF(wrapper)
return rpc_state
cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler( cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler(
'grpc_server_shutdown_and_notify', 'grpc_server_shutdown_and_notify',
None, None,
@ -307,6 +294,29 @@ cdef class AioServer:
return self._server.add_http2_port(address, return self._server.add_http2_port(address,
server_credentials._credentials) server_credentials._credentials)
async def _request_call(self):
cdef grpc_call_error error
cdef RPCState rpc_state = RPCState(self)
cdef object future = self._loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
REQUEST_CALL_FAILURE_HANDLER)
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
# when calling "await". This is an over-optimization by Cython.
cpython.Py_INCREF(wrapper)
error = grpc_server_request_call(
self._server.c_server, &rpc_state.call, &rpc_state.details,
&rpc_state.request_metadata,
self._cq.c_ptr(), self._cq.c_ptr(),
wrapper.c_functor()
)
if error != GRPC_CALL_OK:
raise RuntimeError("Error in grpc_server_request_call: %s" % error)
await future
cpython.Py_DECREF(wrapper)
return rpc_state
async def _server_main_loop(self, async def _server_main_loop(self,
object server_started): object server_started):
self._server.start() self._server.start()
@ -319,33 +329,26 @@ cdef class AioServer:
break break
# Accepts new request from Core # Accepts new request from Core
rpc_state = await _server_call_request_call( rpc_state = await self._request_call()
self._server,
self._cq, # Creates the dedicated RPC coroutine. If we schedule it right now,
self._loop) # there is no guarantee if the cancellation listening coroutine is
# ready or not. So, we should control the ordering by scheduling
# Schedules the RPC as a separate coroutine # the coroutine onto event loop inside of the cancellation
rpc_task = self._loop.create_task( # coroutine.
_handle_rpc( rpc_coro = _handle_rpc(self._generic_handlers,
self._generic_handlers, rpc_state,
rpc_state, self._loop)
self._loop
)
)
# Fires off a task that listens on the cancellation from client. # Fires off a task that listens on the cancellation from client.
self._loop.create_task( self._loop.create_task(
_handle_cancellation_from_core( _schedule_rpc_coro_and_handle_cancellation(
rpc_task, rpc_coro,
rpc_state, rpc_state,
self._loop self._loop
) )
) )
# Keeps track of created coroutines, so we can clean them up properly.
self._ongoing_rpc_tasks.add(rpc_task)
rpc_task.add_done_callback(lambda _: self._ongoing_rpc_tasks.remove(rpc_task))
def _serving_task_crash_handler(self, object task): def _serving_task_crash_handler(self, object task):
"""Shutdown the server immediately if unexpectedly exited.""" """Shutdown the server immediately if unexpectedly exited."""
if task.exception() is None: if task.exception() is None:
@ -423,10 +426,6 @@ cdef class AioServer:
grpc_server_cancel_all_calls(self._server.c_server) grpc_server_cancel_all_calls(self._server.c_server)
await self._shutdown_completed await self._shutdown_completed
# Cancels all Python layer tasks
for rpc_task in self._ongoing_rpc_tasks:
rpc_task.cancel()
async with self._shutdown_lock: async with self._shutdown_lock:
if self._status == AIO_SERVER_STATUS_STOPPING: if self._status == AIO_SERVER_STATUS_STOPPING:
grpc_server_destroy(self._server.c_server) grpc_server_destroy(self._server.c_server)

Loading…
Cancel
Save