From f1b29deea62afeb62237d740421415cdb43a57c0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 17 Dec 2019 17:44:52 -0800 Subject: [PATCH 01/10] Improve cancellation mechanism: * Remove the weird cancellation_future; * Convert all CancelledError into RpcError with CANCELLED; * Move part of call logic from Cython to Python layer; * Make unary-stream call based on reader API instead of async generator. --- .../grpc/_cython/_cygrpc/aio/call.pxd.pxi | 13 +- .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 107 +++++--------- .../grpc/_cython/_cygrpc/aio/channel.pyx.pxi | 37 +---- .../grpcio/grpc/experimental/aio/_call.py | 132 ++++++++++-------- .../grpcio_tests/tests_aio/unit/call_test.py | 15 +- 5 files changed, 129 insertions(+), 175 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi index 3844797c50e..b800cee6028 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -22,13 +22,12 @@ cdef class _AioCall: # time we need access to the event loop. object _loop - # Streaming call only attributes: - # - # A asyncio.Event that indicates if the status is received on the client side. - object _status_received - # A tuple of key value pairs representing the initial metadata sent by peer. - tuple _initial_metadata + # Flag indicates whether cancel being called or not. Cancellation from + # Core or peer works perfectly fine with normal procedure. However, we + # need this flag to clean up resources for cancellation from the + # application layer. Directly cancelling tasks might cause segfault + # because Core is holding a pointer for the callback handler. + bint _is_locally_cancelled cdef grpc_call* _create_grpc_call(self, object timeout, bytes method) except * cdef void _destroy_grpc_call(self) - cdef AioRpcStatus _cancel_and_create_status(self, object cancellation_future) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index b98809a12e0..0b5c9a7589a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -33,8 +33,7 @@ cdef class _AioCall: self._grpc_call_wrapper = GrpcCallWrapper() self._loop = asyncio.get_event_loop() self._create_grpc_call(deadline, method) - - self._status_received = asyncio.Event(loop=self._loop) + self._is_locally_cancelled = False def __dealloc__(self): self._destroy_grpc_call() @@ -78,17 +77,21 @@ cdef class _AioCall: """Destroys the corresponding Core object for this RPC.""" grpc_call_unref(self._grpc_call_wrapper.call) - cdef AioRpcStatus _cancel_and_create_status(self, object cancellation_future): - """Cancels the RPC in Core, and return the final RPC status.""" - cdef AioRpcStatus status + def cancel(self, AioRpcStatus status): + """Cancels the RPC in Core with given RPC status. + + Above abstractions must invoke this method to set Core objects into + proper state. + """ + self._is_locally_cancelled = True + cdef object details cdef char *c_details cdef grpc_call_error error # Try to fetch application layer cancellation details in the future. # * If cancellation details present, cancel with status; # * If details not present, cancel with unknown reason. - if cancellation_future.done(): - status = cancellation_future.result() + if status is not None: details = str_to_bytes(status.details()) self._references.append(details) c_details = details @@ -100,7 +103,6 @@ cdef class _AioCall: NULL, ) assert error == GRPC_CALL_OK - return status else: # By implementation, grpc_call_cancel always return OK error = grpc_call_cancel(self._grpc_call_wrapper.call, NULL) @@ -111,12 +113,9 @@ cdef class _AioCall: None, None, ) - cancellation_future.set_result(status) - return status async def unary_unary(self, bytes request, - object cancellation_future, object initial_metadata_observer, object status_observer): """Performs a unary unary RPC. @@ -145,19 +144,10 @@ cdef class _AioCall: receive_initial_metadata_op, receive_message_op, receive_status_on_client_op) - try: - await execute_batch(self._grpc_call_wrapper, - ops, - self._loop) - except asyncio.CancelledError: - status = self._cancel_and_create_status(cancellation_future) - initial_metadata_observer(None) - status_observer(status) - raise - else: - initial_metadata_observer( - receive_initial_metadata_op.initial_metadata() - ) + # Executes all operations in one batch. + await execute_batch(self._grpc_call_wrapper, + ops, + self._loop) status = AioRpcStatus( receive_status_on_client_op.code(), @@ -179,6 +169,11 @@ cdef class _AioCall: cdef ReceiveStatusOnClientOperation op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS) cdef tuple ops = (op,) await execute_batch(self._grpc_call_wrapper, ops, self._loop) + + # Halts if the RPC is locally cancelled + if self._is_locally_cancelled: + return + cdef AioRpcStatus status = AioRpcStatus( op.code(), op.details(), @@ -186,52 +181,30 @@ cdef class _AioCall: op.error_string(), ) status_observer(status) - self._status_received.set() - - def _handle_cancellation_from_application(self, - object cancellation_future, - object status_observer): - def _cancellation_action(finished_future): - if not self._status_received.set(): - status = self._cancel_and_create_status(finished_future) - status_observer(status) - self._status_received.set() - - cancellation_future.add_done_callback(_cancellation_action) - async def _message_async_generator(self): + async def receive_serialized_message(self): + """Receives one single raw message in bytes.""" cdef bytes received_message - # Infinitely receiving messages, until: + # Receives a message. Returns None when failed: # * EOF, no more messages to read; - # * The client application cancells; + # * The client application cancels; # * The server sends final status. - while True: - if self._status_received.is_set(): - return - - received_message = await _receive_message( - self._grpc_call_wrapper, - self._loop - ) - if received_message is None: - # The read operation failed, Core should explain why it fails - await self._status_received.wait() - return - else: - yield received_message + received_message = await _receive_message( + self._grpc_call_wrapper, + self._loop + ) + return received_message async def unary_stream(self, bytes request, - object cancellation_future, object initial_metadata_observer, object status_observer): - """Actual implementation of the complete unary-stream call. - - Needs to pay extra attention to the raise mechanism. If we want to - propagate the final status exception, then we have to raise it. - Othersize, it would end normally and raise `StopAsyncIteration()`. - """ + """Implementation of the start of a unary-stream call.""" + # Peer may prematurely end this RPC at any point. We need a corutine + # that watches if the server sends the final status. + self._loop.create_task(self._handle_status_once_received(status_observer)) + cdef tuple outbound_ops cdef Operation initial_metadata_op = SendInitialMetadataOperation( _EMPTY_METADATA, @@ -248,21 +221,13 @@ cdef class _AioCall: send_close_op, ) - # Actually sends out the request message. + # Sends out the request message. await execute_batch(self._grpc_call_wrapper, - outbound_ops, - self._loop) - - # Peer may prematurely end this RPC at any point. We need a mechanism - # that handles both the normal case and the error case. - self._loop.create_task(self._handle_status_once_received(status_observer)) - self._handle_cancellation_from_application(cancellation_future, - status_observer) + outbound_ops, + self._loop) # Receives initial metadata. initial_metadata_observer( await _receive_initial_metadata(self._grpc_call_wrapper, self._loop), ) - - return self._message_async_generator() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi index 81b6c208619..4b6dd8c3b35 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi @@ -26,38 +26,13 @@ cdef class AioChannel: def close(self): grpc_channel_destroy(self.channel) - async def unary_unary(self, - bytes method, - bytes request, - object deadline, - object cancellation_future, - object initial_metadata_observer, - object status_observer): - """Assembles a unary-unary RPC. + def call(self, + bytes method, + object deadline): + """Assembles a Cython Call object. Returns: - The response message in bytes. + The _AioCall object. """ cdef _AioCall call = _AioCall(self, deadline, method) - return await call.unary_unary(request, - cancellation_future, - initial_metadata_observer, - status_observer) - - def unary_stream(self, - bytes method, - bytes request, - object deadline, - object cancellation_future, - object initial_metadata_observer, - object status_observer): - """Assembles a unary-stream RPC. - - Returns: - An async generator that yields raw responses. - """ - cdef _AioCall call = _AioCall(self, deadline, method) - return call.unary_stream(request, - cancellation_future, - initial_metadata_observer, - status_observer) + return call diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index be7a48157d0..9e32cbd51f0 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -15,6 +15,7 @@ import asyncio from typing import AsyncIterable, Awaitable, Dict, Optional +import logging import grpc from grpc import _common @@ -167,8 +168,7 @@ class Call(_base_call.Call): raise NotImplementedError() def cancelled(self) -> bool: - return self._cancellation.done( - ) or self._code == grpc.StatusCode.CANCELLED + return self._code == grpc.StatusCode.CANCELLED def done(self) -> bool: return self._status.done() @@ -205,14 +205,17 @@ class Call(_base_call.Call): cancellation (by application) and Core receiving status from peer. We make no promise here which one will win. """ - if self._status.done(): - return - else: - self._status.set_result(status) - self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[ - status.code()] + logging.debug('Call._set_status, %s, %s', self._status.done(), status) + # In case of the RPC finished without receiving metadata. + if not self._initial_metadata.done(): + self._initial_metadata.set_result(None) + + # Sets final status + self._status.set_result(status) + self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()] async def _raise_rpc_error_if_not_ok(self) -> None: + await self._status if self._code != grpc.StatusCode.OK: raise _create_rpc_error(await self.initial_metadata(), self._status.result()) @@ -245,12 +248,11 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): Returned when an instance of `UnaryUnaryMultiCallable` object is called. """ _request: RequestType - _deadline: Optional[float] _channel: cygrpc.AioChannel - _method: bytes _request_serializer: SerializingFunction _response_deserializer: DeserializingFunction _call: asyncio.Task + _cython_call: cygrpc._AioCall def __init__(self, request: RequestType, deadline: Optional[float], channel: cygrpc.AioChannel, method: bytes, @@ -258,11 +260,10 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): response_deserializer: DeserializingFunction) -> None: super().__init__() self._request = request - self._deadline = deadline self._channel = channel - self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._cython_call = self._channel.call(method, deadline) self._call = self._loop.create_task(self._invoke()) def __del__(self) -> None: @@ -275,19 +276,20 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): serialized_request = _common.serialize(self._request, self._request_serializer) - # NOTE(lidiz) asyncio.CancelledError is not a good transport for - # status, since the Task class do not cache the exact - # asyncio.CancelledError object. So, the solution is catching the error - # in Cython layer, then cancel the RPC and update the status, finally - # re-raise the CancelledError. - serialized_response = await self._channel.unary_unary( - self._method, - serialized_request, - self._deadline, - self._cancellation, - self._set_initial_metadata, - self._set_status, - ) + # NOTE(lidiz) asyncio.CancelledError is not a good transport for status, + # because the asyncio.Task class do not cache the exception object. + # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 + try: + serialized_response = await self._cython_call.unary_unary( + serialized_request, + self._set_initial_metadata, + self._set_status, + ) + except asyncio.CancelledError: + # Only this class can inject the CancelledError into the RPC + # coroutine, so we are certain that this exception is due to local + # cancellation. + assert self._code == grpc.StatusCode.CANCELLED await self._raise_rpc_error_if_not_ok() return _common.deserialize(serialized_response, @@ -295,8 +297,8 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): def _cancel(self, status: cygrpc.AioRpcStatus) -> bool: """Forwards the application cancellation reasoning.""" - if not self._status.done() and not self._cancellation.done(): - self._cancellation.set_result(status) + if not self._status.done(): + self._set_status(status) self._call.cancel() return True else: @@ -328,13 +330,11 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): Returned when an instance of `UnaryStreamMultiCallable` object is called. """ _request: RequestType - _deadline: Optional[float] _channel: cygrpc.AioChannel - _method: bytes _request_serializer: SerializingFunction _response_deserializer: DeserializingFunction - _call: asyncio.Task - _bytes_aiter: AsyncIterable[bytes] + _cython_call: cygrpc._AioCall + _send_unary_request_task: asyncio.Task _message_aiter: AsyncIterable[ResponseType] def __init__(self, request: RequestType, deadline: Optional[float], @@ -343,13 +343,13 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): response_deserializer: DeserializingFunction) -> None: super().__init__() self._request = request - self._deadline = deadline self._channel = channel - self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer - self._call = self._loop.create_task(self._invoke()) - self._message_aiter = self._process() + self._send_unary_request_task = self._loop.create_task( + self._send_unary_request()) + self._message_aiter = self._fetch_stream_responses() + self._cython_call = self._channel.call(method, deadline) def __del__(self) -> None: if not self._status.done(): @@ -357,32 +357,18 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): cygrpc.AioRpcStatus(cygrpc.StatusCode.cancelled, _GC_CANCELLATION_DETAILS, None, None)) - async def _invoke(self) -> ResponseType: + async def _send_unary_request(self) -> ResponseType: serialized_request = _common.serialize(self._request, self._request_serializer) + await self._cython_call.unary_stream( + serialized_request, self._set_initial_metadata, self._set_status) - self._bytes_aiter = await self._channel.unary_stream( - self._method, - serialized_request, - self._deadline, - self._cancellation, - self._set_initial_metadata, - self._set_status, - ) - - async def _process(self) -> ResponseType: - await self._call - async for serialized_response in self._bytes_aiter: - if self._cancellation.done(): - await self._status - if self._status.done(): - # Raises pre-maturely if final status received here. Generates - # more helpful stack trace for end users. - await self._raise_rpc_error_if_not_ok() - yield _common.deserialize(serialized_response, - self._response_deserializer) - - await self._raise_rpc_error_if_not_ok() + async def _fetch_stream_responses(self) -> ResponseType: + await self._send_unary_request_task + message = await self._read() + while message: + yield message + message = await self._read() def _cancel(self, status: cygrpc.AioRpcStatus) -> bool: """Forwards the application cancellation reasoning. @@ -395,8 +381,15 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): and the client calling "cancel" at the same time, this method respects the winner in Core. """ - if not self._status.done() and not self._cancellation.done(): - self._cancellation.set_result(status) + if not self._status.done(): + self._set_status(status) + self._cython_call.cancel(status) + + if not self._send_unary_request_task.done(): + # Injects CancelledError to the Task. The exception will + # propagate to _fetch_stream_responses as well, if the sending + # is not done. + self._send_unary_request_task.cancel() return True else: return False @@ -409,8 +402,25 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): def __aiter__(self) -> AsyncIterable[ResponseType]: return self._message_aiter + async def _read(self) -> ResponseType: + serialized_response = await self._cython_call.receive_serialized_message( + ) + if serialized_response is None: + return None + else: + return _common.deserialize(serialized_response, + self._response_deserializer) + async def read(self) -> ResponseType: if self._status.done(): await self._raise_rpc_error_if_not_ok() raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) - return await self._message_aiter.__anext__() + + response_message = await self._read() + if response_message is None: + # If the read operation failed, Core should explain why. + await self._raise_rpc_error_if_not_ok() + # If everything is okay, there is something wrong internally. + assert False, 'Read operation failed with StatusCode.OK' + else: + return response_message diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 78e6dac21ae..849b5f471c8 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -126,18 +126,23 @@ class TestUnaryUnaryCall(AioTestBase): self.assertTrue(call.cancel()) self.assertFalse(call.cancel()) - with self.assertRaises(asyncio.CancelledError) as exception_context: + with self.assertRaises(grpc.RpcError) as exception_context: await call + # The info in the RpcError should match the info in Call object. + rpc_error = exception_context.exception + self.assertEqual(rpc_error.code(), await call.code()) + self.assertEqual(rpc_error.details(), await call.details()) + self.assertEqual(rpc_error.trailing_metadata(), await + call.trailing_metadata()) + self.assertEqual(rpc_error.debug_error_string(), await + call.debug_error_string()) + self.assertTrue(call.cancelled()) self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) self.assertEqual(await call.details(), 'Locally cancelled by application!') - # NOTE(lidiz) The CancelledError is almost always re-created, - # so we might not want to use it to transmit data. - # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 - class TestUnaryStreamCall(AioTestBase): From 65e4f17a2c5afe4c0df12867b55270a345fbc742 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 17 Dec 2019 19:29:21 -0800 Subject: [PATCH 02/10] Remove unused code --- src/python/grpcio/grpc/experimental/aio/_call.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 9e32cbd51f0..1b0ace52376 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -149,14 +149,12 @@ class Call(_base_call.Call): _code: grpc.StatusCode _status: Awaitable[cygrpc.AioRpcStatus] _initial_metadata: Awaitable[MetadataType] - _cancellation: asyncio.Future def __init__(self) -> None: self._loop = asyncio.get_event_loop() self._code = None self._status = self._loop.create_future() self._initial_metadata = self._loop.create_future() - self._cancellation = self._loop.create_future() def cancel(self) -> bool: """Placeholder cancellation method. @@ -205,7 +203,6 @@ class Call(_base_call.Call): cancellation (by application) and Core receiving status from peer. We make no promise here which one will win. """ - logging.debug('Call._set_status, %s, %s', self._status.done(), status) # In case of the RPC finished without receiving metadata. if not self._initial_metadata.done(): self._initial_metadata.set_result(None) From e8283e4818edd1e31709447e612c54a4f38ce43e Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 17 Dec 2019 19:32:06 -0800 Subject: [PATCH 03/10] Reword the comment --- src/python/grpcio/grpc/experimental/aio/_call.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 1b0ace52376..bd720c159ac 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -417,7 +417,7 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): if response_message is None: # If the read operation failed, Core should explain why. await self._raise_rpc_error_if_not_ok() - # If everything is okay, there is something wrong internally. + # If no exception raised, there is something wrong internally. assert False, 'Read operation failed with StatusCode.OK' else: return response_message From d49b0849f05bf6d98897cb8a26ffee0fc4fef877 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 18 Dec 2019 13:35:25 -0800 Subject: [PATCH 04/10] Adding more catch clauses for CancelledError --- .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 6 ++ .../grpcio/grpc/experimental/aio/_call.py | 43 ++++++--- .../grpcio_tests/tests_aio/unit/call_test.py | 92 +++++++++++++++++++ 3 files changed, 129 insertions(+), 12 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index 0b5c9a7589a..a726084ec4d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -77,6 +77,11 @@ cdef class _AioCall: """Destroys the corresponding Core object for this RPC.""" grpc_call_unref(self._grpc_call_wrapper.call) + @property + def locally_cancelled(self): + """Grant Python layer access of the cancelled flag.""" + return self._is_locally_cancelled + def cancel(self, AioRpcStatus status): """Cancels the RPC in Core with given RPC status. @@ -145,6 +150,7 @@ cdef class _AioCall: receive_status_on_client_op) # Executes all operations in one batch. + # Might raise CancelledError, handling it in Python UnaryUnaryCall. await execute_batch(self._grpc_call_wrapper, ops, self._loop) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index bd720c159ac..c8fb5d18568 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -15,7 +15,6 @@ import asyncio from typing import AsyncIterable, Awaitable, Dict, Optional -import logging import grpc from grpc import _common @@ -42,6 +41,8 @@ _NON_OK_CALL_REPRESENTATION = ('<{} of RPC that terminated with:\n' '\tdebug_error_string = "{}"\n' '>') +_EMPTY_METADATA = tuple() + class AioRpcError(grpc.RpcError): """An implementation of RpcError to be used by the asynchronous API. @@ -205,7 +206,7 @@ class Call(_base_call.Call): """ # In case of the RPC finished without receiving metadata. if not self._initial_metadata.done(): - self._initial_metadata.set_result(None) + self._initial_metadata.set_result(_EMPTY_METADATA) # Sets final status self._status.set_result(status) @@ -283,10 +284,10 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): self._set_status, ) except asyncio.CancelledError: - # Only this class can inject the CancelledError into the RPC - # coroutine, so we are certain that this exception is due to local - # cancellation. - assert self._code == grpc.StatusCode.CANCELLED + if self._code != grpc.StatusCode.CANCELLED: + self.cancel() + + # Raises RpcError here if RPC failed or cancelled await self._raise_rpc_error_if_not_ok() return _common.deserialize(serialized_response, @@ -357,8 +358,16 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): async def _send_unary_request(self) -> ResponseType: serialized_request = _common.serialize(self._request, self._request_serializer) - await self._cython_call.unary_stream( - serialized_request, self._set_initial_metadata, self._set_status) + try: + await self._cython_call.unary_stream( + serialized_request, + self._set_initial_metadata, + self._set_status + ) + except asyncio.CancelledError: + if self._code != grpc.StatusCode.CANCELLED: + self.cancel() + await self._raise_rpc_error_if_not_ok() async def _fetch_stream_responses(self) -> ResponseType: await self._send_unary_request_task @@ -400,12 +409,21 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): return self._message_aiter async def _read(self) -> ResponseType: - serialized_response = await self._cython_call.receive_serialized_message( - ) - if serialized_response is None: + # Wait for the request being sent + await self._send_unary_request_task + + # Reads response message from Core + try: + raw_response = await self._cython_call.receive_serialized_message() + except asyncio.CancelledError: + if self._code != grpc.StatusCode.CANCELLED: + self.cancel() + await self._raise_rpc_error_if_not_ok() + + if raw_response is None: return None else: - return _common.deserialize(serialized_response, + return _common.deserialize(raw_response, self._response_deserializer) async def read(self) -> ResponseType: @@ -414,6 +432,7 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) response_message = await self._read() + if response_message is None: # If the read operation failed, Core should explain why. await self._raise_rpc_error_if_not_ok() diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 849b5f471c8..bbca4dd3996 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -33,6 +33,8 @@ _LOCAL_CANCEL_DETAILS_EXPECTATION = 'Locally cancelled by application!' _RESPONSE_INTERVAL_US = test_constants.SHORT_TIMEOUT * 1000 * 1000 _UNREACHABLE_TARGET = '0.1:1111' +_INFINITE_INTERVAL_US = 2**31-1 + class TestUnaryUnaryCall(AioTestBase): @@ -143,6 +145,29 @@ class TestUnaryUnaryCall(AioTestBase): self.assertEqual(await call.details(), 'Locally cancelled by application!') + async def test_cancel_unary_unary_in_task(self): + async with aio.insecure_channel(self._server_target) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + coro_started = asyncio.Event() + call = stub.EmptyCall(messages_pb2.SimpleRequest()) + + async def another_coro(): + coro_started.set() + await call + + task = self.loop.create_task(another_coro()) + await coro_started.wait() + + self.assertFalse(task.done()) + task.cancel() + + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + + with self.assertRaises(grpc.RpcError) as exception_context: + await task + self.assertEqual(grpc.StatusCode.CANCELLED, + exception_context.exception.code()) + class TestUnaryStreamCall(AioTestBase): @@ -328,6 +353,73 @@ class TestUnaryStreamCall(AioTestBase): self.assertEqual(await call.code(), grpc.StatusCode.OK) + async def test_cancel_unary_stream_in_task_using_read(self): + async with aio.insecure_channel(self._server_target) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + coro_started = asyncio.Event() + + # Configs the server method to block forever + request = messages_pb2.StreamingOutputCallRequest() + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_INFINITE_INTERVAL_US, + )) + + # Invokes the actual RPC + call = stub.StreamingOutputCall(request) + + async def another_coro(): + coro_started.set() + await call.read() + + task = self.loop.create_task(another_coro()) + await coro_started.wait() + + self.assertFalse(task.done()) + task.cancel() + + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + + with self.assertRaises(grpc.RpcError) as exception_context: + await task + self.assertEqual(grpc.StatusCode.CANCELLED, + exception_context.exception.code()) + + async def test_cancel_unary_stream_in_task_using_async_for(self): + async with aio.insecure_channel(self._server_target) as channel: + stub = test_pb2_grpc.TestServiceStub(channel) + coro_started = asyncio.Event() + + # Configs the server method to block forever + request = messages_pb2.StreamingOutputCallRequest() + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_INFINITE_INTERVAL_US, + )) + + # Invokes the actual RPC + call = stub.StreamingOutputCall(request) + + async def another_coro(): + coro_started.set() + async for _ in call: + pass + + task = self.loop.create_task(another_coro()) + await coro_started.wait() + + self.assertFalse(task.done()) + task.cancel() + + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + + with self.assertRaises(grpc.RpcError) as exception_context: + await task + self.assertEqual(grpc.StatusCode.CANCELLED, + exception_context.exception.code()) + if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) From 413d29218e06c1407fe6226cc3c41f50cd386ce7 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 18 Dec 2019 13:57:09 -0800 Subject: [PATCH 05/10] Make YAPF happy --- src/python/grpcio/grpc/experimental/aio/_call.py | 8 +++----- src/python/grpcio_tests/tests_aio/unit/call_test.py | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index c8fb5d18568..96415fa9521 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -359,11 +359,9 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): serialized_request = _common.serialize(self._request, self._request_serializer) try: - await self._cython_call.unary_stream( - serialized_request, - self._set_initial_metadata, - self._set_status - ) + await self._cython_call.unary_stream(serialized_request, + self._set_initial_metadata, + self._set_status) except asyncio.CancelledError: if self._code != grpc.StatusCode.CANCELLED: self.cancel() diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index bbca4dd3996..c0a7fa17017 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -33,7 +33,7 @@ _LOCAL_CANCEL_DETAILS_EXPECTATION = 'Locally cancelled by application!' _RESPONSE_INTERVAL_US = test_constants.SHORT_TIMEOUT * 1000 * 1000 _UNREACHABLE_TARGET = '0.1:1111' -_INFINITE_INTERVAL_US = 2**31-1 +_INFINITE_INTERVAL_US = 2**31 - 1 class TestUnaryUnaryCall(AioTestBase): From 6f0ffef2e94f24475b6197ad920ab287ce8050e8 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 18 Dec 2019 16:15:37 -0800 Subject: [PATCH 06/10] Resolve a TODO and handle one more cancellation corner case --- .../grpcio/grpc/experimental/aio/_call.py | 21 ++++++++++--------- .../grpcio_tests/tests_aio/unit/call_test.py | 4 ---- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 96415fa9521..7acb5494b4b 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -308,16 +308,17 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): _LOCAL_CANCELLATION_DETAILS, None, None)) def __await__(self) -> ResponseType: - """Wait till the ongoing RPC request finishes. - - Returns: - Response of the RPC call. - - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. - asyncio.CancelledError: Indicating that the RPC was canceled. - """ - response = yield from self._call + """Wait till the ongoing RPC request finishes.""" + try: + response = yield from self._call + except asyncio.CancelledError: + # Even if we converted all other CancelledError, there is still + # this corner case. If the application cancels immediately after + # the Call object is created, we will observe this + # `CancelledError`. + if not self.cancelled(): + self.cancel() + raise _create_rpc_error(_EMPTY_METADATA, self._status.result()) return response diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index c0a7fa17017..cecce1c79d5 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -121,10 +121,6 @@ class TestUnaryUnaryCall(AioTestBase): self.assertFalse(call.cancelled()) - # TODO(https://github.com/grpc/grpc/issues/20869) remove sleep. - # Force the loop to execute the RPC task. - await asyncio.sleep(0) - self.assertTrue(call.cancel()) self.assertFalse(call.cancel()) From a3d7733dd02054654c53e93e93781c96f8f371e1 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 19 Dec 2019 13:58:03 -0800 Subject: [PATCH 07/10] Passing cancel signal to Core for Unary Call as well --- .../grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi | 11 ----------- src/python/grpcio/grpc/experimental/aio/_call.py | 1 + 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index a726084ec4d..c10d79cb7d3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -77,11 +77,6 @@ cdef class _AioCall: """Destroys the corresponding Core object for this RPC.""" grpc_call_unref(self._grpc_call_wrapper.call) - @property - def locally_cancelled(self): - """Grant Python layer access of the cancelled flag.""" - return self._is_locally_cancelled - def cancel(self, AioRpcStatus status): """Cancels the RPC in Core with given RPC status. @@ -112,12 +107,6 @@ cdef class _AioCall: # By implementation, grpc_call_cancel always return OK error = grpc_call_cancel(self._grpc_call_wrapper.call, NULL) assert error == GRPC_CALL_OK - status = AioRpcStatus( - StatusCode.cancelled, - _UNKNOWN_CANCELLATION_DETAILS, - None, - None, - ) async def unary_unary(self, bytes request, diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 7acb5494b4b..a8969b06edb 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -297,6 +297,7 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): """Forwards the application cancellation reasoning.""" if not self._status.done(): self._set_status(status) + self._cython_call.cancel(status) self._call.cancel() return True else: From 4e3d980f7038f229a09121cf885e9fcf39221d1d Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 2 Jan 2020 10:23:06 -0800 Subject: [PATCH 08/10] Convert local cancellation exception into CancelledError --- .../grpcio/grpc/experimental/aio/_call.py | 24 +++++++++----- .../grpcio_tests/tests_aio/unit/call_test.py | 33 ++++--------------- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index a8969b06edb..1557b678d66 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -150,12 +150,14 @@ class Call(_base_call.Call): _code: grpc.StatusCode _status: Awaitable[cygrpc.AioRpcStatus] _initial_metadata: Awaitable[MetadataType] + _locally_cancelled: bool def __init__(self) -> None: self._loop = asyncio.get_event_loop() self._code = None self._status = self._loop.create_future() self._initial_metadata = self._loop.create_future() + self._locally_cancelled = False def cancel(self) -> bool: """Placeholder cancellation method. @@ -204,6 +206,10 @@ class Call(_base_call.Call): cancellation (by application) and Core receiving status from peer. We make no promise here which one will win. """ + # In case of local cancellation, flip the flag. + if status.details() is _LOCAL_CANCELLATION_DETAILS: + self._locally_cancelled = True + # In case of the RPC finished without receiving metadata. if not self._initial_metadata.done(): self._initial_metadata.set_result(_EMPTY_METADATA) @@ -212,7 +218,9 @@ class Call(_base_call.Call): self._status.set_result(status) self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()] - async def _raise_rpc_error_if_not_ok(self) -> None: + async def _raise_if_not_ok(self) -> None: + if self._locally_cancelled: + raise asyncio.CancelledError() await self._status if self._code != grpc.StatusCode.OK: raise _create_rpc_error(await self.initial_metadata(), @@ -287,8 +295,8 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): if self._code != grpc.StatusCode.CANCELLED: self.cancel() - # Raises RpcError here if RPC failed or cancelled - await self._raise_rpc_error_if_not_ok() + # Raises here if RPC failed or cancelled + await self._raise_if_not_ok() return _common.deserialize(serialized_response, self._response_deserializer) @@ -319,7 +327,7 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): # `CancelledError`. if not self.cancelled(): self.cancel() - raise _create_rpc_error(_EMPTY_METADATA, self._status.result()) + raise return response @@ -367,7 +375,7 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): except asyncio.CancelledError: if self._code != grpc.StatusCode.CANCELLED: self.cancel() - await self._raise_rpc_error_if_not_ok() + raise async def _fetch_stream_responses(self) -> ResponseType: await self._send_unary_request_task @@ -418,7 +426,7 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): except asyncio.CancelledError: if self._code != grpc.StatusCode.CANCELLED: self.cancel() - await self._raise_rpc_error_if_not_ok() + raise if raw_response is None: return None @@ -428,14 +436,14 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): async def read(self) -> ResponseType: if self._status.done(): - await self._raise_rpc_error_if_not_ok() + await self._raise_if_not_ok() raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) response_message = await self._read() if response_message is None: # If the read operation failed, Core should explain why. - await self._raise_rpc_error_if_not_ok() + await self._raise_if_not_ok() # If no exception raised, there is something wrong internally. assert False, 'Read operation failed with StatusCode.OK' else: diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index cecce1c79d5..bdf2fbfda6b 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -124,18 +124,10 @@ class TestUnaryUnaryCall(AioTestBase): self.assertTrue(call.cancel()) self.assertFalse(call.cancel()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await call # The info in the RpcError should match the info in Call object. - rpc_error = exception_context.exception - self.assertEqual(rpc_error.code(), await call.code()) - self.assertEqual(rpc_error.details(), await call.details()) - self.assertEqual(rpc_error.trailing_metadata(), await - call.trailing_metadata()) - self.assertEqual(rpc_error.debug_error_string(), await - call.debug_error_string()) - self.assertTrue(call.cancelled()) self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) self.assertEqual(await call.details(), @@ -159,10 +151,8 @@ class TestUnaryUnaryCall(AioTestBase): self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await task - self.assertEqual(grpc.StatusCode.CANCELLED, - exception_context.exception.code()) class TestUnaryStreamCall(AioTestBase): @@ -201,7 +191,7 @@ class TestUnaryStreamCall(AioTestBase): call.details()) self.assertFalse(call.cancel()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await call.read() self.assertTrue(call.cancelled()) @@ -232,7 +222,7 @@ class TestUnaryStreamCall(AioTestBase): self.assertFalse(call.cancel()) self.assertFalse(call.cancel()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await call.read() async def test_early_cancel_unary_stream(self): @@ -256,16 +246,11 @@ class TestUnaryStreamCall(AioTestBase): self.assertTrue(call.cancel()) self.assertFalse(call.cancel()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await call.read() self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, - exception_context.exception.code()) - self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, - exception_context.exception.details()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await call.details()) @@ -377,10 +362,8 @@ class TestUnaryStreamCall(AioTestBase): self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await task - self.assertEqual(grpc.StatusCode.CANCELLED, - exception_context.exception.code()) async def test_cancel_unary_stream_in_task_using_async_for(self): async with aio.insecure_channel(self._server_target) as channel: @@ -411,10 +394,8 @@ class TestUnaryStreamCall(AioTestBase): self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - with self.assertRaises(grpc.RpcError) as exception_context: + with self.assertRaises(asyncio.CancelledError): await task - self.assertEqual(grpc.StatusCode.CANCELLED, - exception_context.exception.code()) if __name__ == '__main__': From 9a3ddd8d76170ed9a0c040e3b0d343cd2e698a40 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 3 Jan 2020 10:28:40 -0800 Subject: [PATCH 09/10] Correct comment wording --- src/python/grpcio/grpc/experimental/aio/_call.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 1557b678d66..36f07ce4d92 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -321,7 +321,7 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): try: response = yield from self._call except asyncio.CancelledError: - # Even if we converted all other CancelledError, there is still + # Even if we caught all other CancelledError, there is still # this corner case. If the application cancels immediately after # the Call object is created, we will observe this # `CancelledError`. From 5c4e28583068dcceeb56917e4d86a2cf43c13b03 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 6 Jan 2020 11:54:57 -0800 Subject: [PATCH 10/10] Use "raise_for_status" --- src/python/grpcio/grpc/experimental/aio/_call.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 36f07ce4d92..f04c6cdd761 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -218,7 +218,7 @@ class Call(_base_call.Call): self._status.set_result(status) self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()] - async def _raise_if_not_ok(self) -> None: + async def _raise_for_status(self) -> None: if self._locally_cancelled: raise asyncio.CancelledError() await self._status @@ -296,7 +296,7 @@ class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): self.cancel() # Raises here if RPC failed or cancelled - await self._raise_if_not_ok() + await self._raise_for_status() return _common.deserialize(serialized_response, self._response_deserializer) @@ -436,14 +436,14 @@ class UnaryStreamCall(Call, _base_call.UnaryStreamCall): async def read(self) -> ResponseType: if self._status.done(): - await self._raise_if_not_ok() + await self._raise_for_status() raise asyncio.InvalidStateError(_RPC_ALREADY_FINISHED_DETAILS) response_message = await self._read() if response_message is None: # If the read operation failed, Core should explain why. - await self._raise_if_not_ok() + await self._raise_for_status() # If no exception raised, there is something wrong internally. assert False, 'Read operation failed with StatusCode.OK' else: