Add try_connect API to StreamUnaryCall

pull/22565/head
Lidi Zheng 5 years ago
parent c07520a350
commit 58beda2f73
  1. 41
      src/python/grpcio/grpc/experimental/aio/_base_call.py
  2. 10
      src/python/grpcio/grpc/experimental/aio/_call.py
  3. 28
      src/python/grpcio_tests/tests_aio/unit/try_connect_test.py

@ -164,13 +164,13 @@ class UnaryStreamCall(Generic[RequestType, ResponseType],
This is an EXPERIMENTAL method.
This method is available for RPCs with streaming responses. This method
enables the application to ensure if the RPC has been successfully
connected. Otherwise, an AioRpcError will be raised to explain the
reason of the connection failure.
This method is available for streaming RPCs. This method enables the
application to ensure if the RPC has been successfully connected.
Otherwise, an AioRpcError will be raised to explain the reason of the
connection failure.
For RPCs with unary response, the connectivity issue will be raised
once the application awaits the call.
For unary-unary RPCs, the connectivity issue will be raised once the
application awaits the call.
This method is recommended for building retry mechanisms.
"""
@ -204,6 +204,23 @@ class StreamUnaryCall(Generic[RequestType, ResponseType],
The response message of the stream.
"""
@abstractmethod
async def try_connect(self) -> None:
"""Tries to connect to peer and raise aio.AioRpcError if failed.
This is an EXPERIMENTAL method.
This method is available for streaming RPCs. This method enables the
application to ensure if the RPC has been successfully connected.
Otherwise, an AioRpcError will be raised to explain the reason of the
connection failure.
For unary-unary RPCs, the connectivity issue will be raised once the
application awaits the call.
This method is recommended for building retry mechanisms.
"""
class StreamStreamCall(Generic[RequestType, ResponseType],
Call,
@ -253,13 +270,13 @@ class StreamStreamCall(Generic[RequestType, ResponseType],
This is an EXPERIMENTAL method.
This method is available for RPCs with streaming responses. This method
enables the application to ensure if the RPC has been successfully
connected. Otherwise, an AioRpcError will be raised to explain the
reason of the connection failure.
This method is available for streaming RPCs. This method enables the
application to ensure if the RPC has been successfully connected.
Otherwise, an AioRpcError will be raised to explain the reason of the
connection failure.
For RPCs with unary response, the connectivity issue will be raised
once the application awaits the call.
For unary-unary RPCs, the connectivity issue will be raised once the
application awaits the call.
This method is recommended for building retry mechanisms.
"""

@ -458,6 +458,11 @@ class _StreamRequestMixin(Call):
self._raise_for_different_style(_APIStyle.READER_WRITER)
await self._done_writing()
async def try_connect(self) -> None:
await self._metadata_sent.wait()
if self.done():
await self._raise_for_status()
class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
"""Object for managing unary-unary RPC calls.
@ -615,8 +620,3 @@ class StreamStreamCall(_StreamRequestMixin, _StreamResponseMixin, Call,
if not self.cancelled():
self.cancel()
# No need to raise RpcError here, because no one will `await` this task.
async def try_connect(self) -> None:
await self._metadata_sent.wait()
if self.done():
await self._raise_for_status()

@ -71,6 +71,26 @@ class TestTryConnect(AioTestBase):
self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_stream_unary_ok(self):
call = self._stub.StreamingInputCall()
# No exception raised and no message swallowed.
await call.try_connect()
payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
request = messages_pb2.StreamingInputCallRequest(payload=payload)
for _ in range(_NUM_STREAM_RESPONSES):
await call.write(request)
await call.done_writing()
response = await call
self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse)
self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
response.aggregated_payload_size)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_stream_stream_ok(self):
call = self._stub.FullDuplexCall()
@ -100,6 +120,14 @@ class TestTryConnect(AioTestBase):
rpc_error = exception_context.exception
self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
async def test_stream_unary_error(self):
call = self._dummy_channel.stream_unary(_TEST_METHOD)(_REQUEST)
with self.assertRaises(aio.AioRpcError) as exception_context:
await call.try_connect()
rpc_error = exception_context.exception
self.assertEqual(grpc.StatusCode.UNAVAILABLE, rpc_error.code())
async def test_stream_stream_error(self):
call = self._dummy_channel.stream_stream(_TEST_METHOD)()

Loading…
Cancel
Save