Maintain a strong reference to rpc_tasks.

Only a weakref is kept in the loop. See python docs on creating tasks.
https://docs.python.org/3/library/asyncio-task.html#creating-tasks

This fixes an issue with request hangs when tasks were being garbage collected.

PiperOrigin-RevId: 634035959
pull/36519/head
Oskar Bunyan 9 months ago committed by Copybara-Service
parent bc5570bec8
commit 05ae7fb926
  1. 19
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi

@ -780,6 +780,16 @@ async def _schedule_rpc_coro(object rpc_coro,
), name="HandleExceptions[%s]" % _decode(rpc_state.method()))
_add_callback_handler(rpc_task, rpc_state)
await _handle_cancellation_from_core(rpc_task, rpc_state, loop)
try:
# Propagate any errors not handled by _handle_exceptions. If not awaited
# there will be logs of the form "Task exception was never retrieved".
# Catching it here we can provide traceback and debugging logs.
await rpc_task
except:
_LOGGER.exception('Exception not handled by _handle_exceptions in servicer method [%s]' % (
_decode(rpc_state.method()),
))
traceback.print_exc()
async def _handle_rpc(list generic_handlers, tuple interceptors,
@ -954,6 +964,7 @@ cdef class AioServer:
self._server.start(backup_queue=False)
cdef RPCState rpc_state
server_started.set_result(True)
rpc_tasks = set()
while True:
# When shutdown begins, no more new connections.
@ -985,9 +996,15 @@ cdef class AioServer:
rpc_coro,
rpc_state,
self._loop
)
),
name="rpc_task",
)
# loop.create_task only holds a weakref to the task.
# Maintain reference to tasks to avoid garbage collection.
rpc_tasks.add(rpc_task)
rpc_task.add_done_callback(rpc_tasks.discard)
if self._limiter is not None:
self._limiter.decrease_once_finished(rpc_task)

Loading…
Cancel
Save