Address leak when using request stream interceptors (#25449) (#27571)

* Don't leave pending tasks on the asyncio queue

The results of these pending tasks are not needed, leaving them
on the queue grows the size of the queue until the call completes.

This fix slows the growth of the memory in the test example.

* Address 'leaking' Futures from cygrpc

Cancelling unneeded Tasks is not sufficient as this leaves behind
cancelled Futures in the cygrpc layer, which still occupy memory.

Instead, avoid creating unneeded tasks in the first place.

* Address review comments

1. Ignore unused return values
2. Fix formatting
pull/27714/head
Bolutife Ogunsola 3 years ago committed by GitHub
parent 3b6056aed8
commit c1d4e96433
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      src/python/grpcio/grpc/aio/_interceptor.py

@ -475,6 +475,7 @@ class _InterceptedStreamRequestMixin:
_write_to_iterator_async_gen: Optional[AsyncIterable[RequestType]]
_write_to_iterator_queue: Optional[asyncio.Queue]
_status_code_task: Optional[asyncio.Task]
_FINISH_ITERATOR_SENTINEL = object()
@ -488,6 +489,7 @@ class _InterceptedStreamRequestMixin:
self._write_to_iterator_queue = asyncio.Queue(maxsize=1)
self._write_to_iterator_async_gen = self._proxy_writes_as_request_iterator(
)
self._status_code_task = None
request_iterator = self._write_to_iterator_async_gen
else:
self._write_to_iterator_queue = None
@ -503,6 +505,19 @@ class _InterceptedStreamRequestMixin:
break
yield value
async def _write_to_iterator_queue_interruptible(self, request: RequestType,
call: InterceptedCall):
# Write the specified 'request' to the request iterator queue using the
# specified 'call' to allow for interruption of the write in the case
# of abrupt termination of the call.
if self._status_code_task is None:
self._status_code_task = self._loop.create_task(call.code())
await asyncio.wait(
(self._loop.create_task(self._write_to_iterator_queue.put(request)),
self._status_code_task),
return_when=asyncio.FIRST_COMPLETED)
async def write(self, request: RequestType) -> None:
# If no queue was created it means that requests
# should be expected through an iterators provided
@ -520,12 +535,7 @@ class _InterceptedStreamRequestMixin:
elif call._done_writing_flag:
raise asyncio.InvalidStateError(_RPC_HALF_CLOSED_DETAILS)
# Write might never end up since the call could abrubtly finish,
# we give up on the first awaitable object that finishes.
_, _ = await asyncio.wait(
(self._loop.create_task(self._write_to_iterator_queue.put(request)),
self._loop.create_task(call.code())),
return_when=asyncio.FIRST_COMPLETED)
await self._write_to_iterator_queue_interruptible(request, call)
if call.done():
raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
@ -546,12 +556,8 @@ class _InterceptedStreamRequestMixin:
except asyncio.CancelledError:
raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS)
# Write might never end up since the call could abrubtly finish,
# we give up on the first awaitable object that finishes.
_, _ = await asyncio.wait((self._write_to_iterator_queue.put(
_InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL),
call.code()),
return_when=asyncio.FIRST_COMPLETED)
await self._write_to_iterator_queue_interruptible(
_InterceptedStreamRequestMixin._FINISH_ITERATOR_SENTINEL, call)
class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,

Loading…
Cancel
Save