Promote the channel_ready & simplify test logic

pull/21885/head
Lidi Zheng 5 years ago
parent a5503cc64a
commit daeaf8eff4
  1. 4
      src/python/grpcio/grpc/experimental/aio/__init__.py
  2. 19
      src/python/grpcio/grpc/experimental/aio/_channel.py
  3. 7
      src/python/grpcio_tests/tests_aio/unit/channel_ready_test.py

@ -26,7 +26,7 @@ from grpc._cython.cygrpc import EOF, AbortError, init_grpc_aio
from ._base_call import Call, RpcContext, UnaryStreamCall, UnaryUnaryCall
from ._call import AioRpcError
from ._channel import Channel, UnaryUnaryMultiCallable, channel_ready
from ._channel import Channel, UnaryUnaryMultiCallable
from ._interceptor import (ClientCallDetails, InterceptedUnaryUnaryCall,
UnaryUnaryClientInterceptor)
from ._server import Server, server
@ -88,4 +88,4 @@ __all__ = ('AioRpcError', 'RpcContext', 'Call', 'UnaryUnaryCall',
'UnaryUnaryMultiCallable', 'ClientCallDetails',
'UnaryUnaryClientInterceptor', 'InterceptedUnaryUnaryCall',
'insecure_channel', 'server', 'Server', 'EOF', 'secure_channel',
'AbortError', 'channel_ready')
'AbortError')

@ -453,6 +453,13 @@ class Channel:
assert await self._channel.watch_connectivity_state(
last_observed_state.value[0], None)
async def channel_ready(self) -> None:
"""Creates a coroutine that ends when a Channel is ready."""
state = self.get_state(try_to_connect=True)
while state != grpc.ChannelConnectivity.READY:
await self.wait_for_state_change(state)
state = self.get_state(try_to_connect=True)
def unary_unary(
self,
method: Text,
@ -512,15 +519,3 @@ class Channel:
request_serializer,
response_deserializer, None,
self._loop)
async def channel_ready(channel: Channel) -> None:
"""Creates a coroutine that ends when a Channel is ready.
Args:
channel: A grpc.aio.Channel object.
"""
state = channel.get_state(try_to_connect=True)
while state != grpc.ChannelConnectivity.READY:
await channel.wait_for_state_change(state)
state = channel.get_state(try_to_connect=True)

@ -41,7 +41,7 @@ class TestChannelReady(AioTestBase):
async def test_channel_ready_success(self):
# Start `channel_ready` as another Task
channel_ready_task = self.loop.create_task(
aio.channel_ready(self._channel))
self._channel.channel_ready())
# Wait for TRANSIENT_FAILURE
await _common.block_until_certain_state(
@ -54,12 +54,11 @@ class TestChannelReady(AioTestBase):
# The RPC should recover itself
await channel_ready_task
finally:
if server is not None:
await server.stop(None)
await server.stop(None)
async def test_channel_ready_blocked(self):
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(aio.channel_ready(self._channel),
await asyncio.wait_for(self._channel.channel_ready(),
test_constants.SHORT_TIMEOUT)

Loading…
Cancel
Save