From 58beda2f73801c920d0b274348bf5fe106fdab3f Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 3 Apr 2020 12:08:17 -0700 Subject: [PATCH] Add try_connect API to StreamUnaryCall --- .../grpc/experimental/aio/_base_call.py | 41 +++++++++++++------ .../grpcio/grpc/experimental/aio/_call.py | 10 ++--- .../tests_aio/unit/try_connect_test.py | 28 +++++++++++++ 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_base_call.py b/src/python/grpcio/grpc/experimental/aio/_base_call.py index 703e4ca621d..e71076fe3d5 100644 --- a/src/python/grpcio/grpc/experimental/aio/_base_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_base_call.py @@ -164,13 +164,13 @@ class UnaryStreamCall(Generic[RequestType, ResponseType], This is an EXPERIMENTAL method. - This method is available for RPCs with streaming responses. This method - enables the application to ensure if the RPC has been successfully - connected. Otherwise, an AioRpcError will be raised to explain the - reason of the connection failure. + This method is available for streaming RPCs. This method enables the + application to ensure if the RPC has been successfully connected. + Otherwise, an AioRpcError will be raised to explain the reason of the + connection failure. - For RPCs with unary response, the connectivity issue will be raised - once the application awaits the call. + For unary-unary RPCs, the connectivity issue will be raised once the + application awaits the call. This method is recommended for building retry mechanisms. """ @@ -204,6 +204,23 @@ class StreamUnaryCall(Generic[RequestType, ResponseType], The response message of the stream. """ + @abstractmethod + async def try_connect(self) -> None: + """Tries to connect to peer and raise aio.AioRpcError if failed. + + This is an EXPERIMENTAL method. + + This method is available for streaming RPCs. This method enables the + application to ensure if the RPC has been successfully connected. + Otherwise, an AioRpcError will be raised to explain the reason of the + connection failure. + + For unary-unary RPCs, the connectivity issue will be raised once the + application awaits the call. + + This method is recommended for building retry mechanisms. + """ + class StreamStreamCall(Generic[RequestType, ResponseType], Call, @@ -253,13 +270,13 @@ class StreamStreamCall(Generic[RequestType, ResponseType], This is an EXPERIMENTAL method. - This method is available for RPCs with streaming responses. This method - enables the application to ensure if the RPC has been successfully - connected. Otherwise, an AioRpcError will be raised to explain the - reason of the connection failure. + This method is available for streaming RPCs. This method enables the + application to ensure if the RPC has been successfully connected. + Otherwise, an AioRpcError will be raised to explain the reason of the + connection failure. - For RPCs with unary response, the connectivity issue will be raised - once the application awaits the call. + For unary-unary RPCs, the connectivity issue will be raised once the + application awaits the call. This method is recommended for building retry mechanisms. """ diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 07c0d302141..dc9dbfe481b 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -458,6 +458,11 @@ class _StreamRequestMixin(Call): self._raise_for_different_style(_APIStyle.READER_WRITER) await self._done_writing() + async def try_connect(self) -> None: + await self._metadata_sent.wait() + if self.done(): + await self._raise_for_status() + class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall): """Object for managing unary-unary RPC calls. @@ -615,8 +620,3 @@ class StreamStreamCall(_StreamRequestMixin, _StreamResponseMixin, Call, if not self.cancelled(): self.cancel() # No need to raise RpcError here, because no one will `await` this task. - - async def try_connect(self) -> None: - await self._metadata_sent.wait() - if self.done(): - await self._raise_for_status() diff --git a/src/python/grpcio_tests/tests_aio/unit/try_connect_test.py b/src/python/grpcio_tests/tests_aio/unit/try_connect_test.py index 61d740e93b7..af4cfa303ff 100644 --- a/src/python/grpcio_tests/tests_aio/unit/try_connect_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/try_connect_test.py @@ -71,6 +71,26 @@ class TestTryConnect(AioTestBase): self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt) self.assertEqual(await call.code(), grpc.StatusCode.OK) + async def test_stream_unary_ok(self): + call = self._stub.StreamingInputCall() + + # No exception raised and no message swallowed. + await call.try_connect() + + payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) + request = messages_pb2.StreamingInputCallRequest(payload=payload) + + for _ in range(_NUM_STREAM_RESPONSES): + await call.write(request) + await call.done_writing() + + response = await call + self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse) + self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, + response.aggregated_payload_size) + + self.assertEqual(await call.code(), grpc.StatusCode.OK) + async def test_stream_stream_ok(self): call = self._stub.FullDuplexCall() @@ -100,6 +120,14 @@ class TestTryConnect(AioTestBase): rpc_error = exception_context.exception self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code()) + async def test_stream_unary_error(self): + call = self._dummy_channel.stream_unary(_TEST_METHOD)(_REQUEST) + + with self.assertRaises(aio.AioRpcError) as exception_context: + await call.try_connect() + rpc_error = exception_context.exception + self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code()) + async def test_stream_stream_error(self): call = self._dummy_channel.stream_stream(_TEST_METHOD)()