Simplify call_test using mixin class

pull/21681/head
Lidi Zheng 5 years ago
parent bd9865f6cb
commit adab340647
  1. 508
      src/python/grpcio_tests/tests_aio/unit/call_test.py

@ -14,19 +14,17 @@
"""Tests behavior of the grpc.aio.UnaryUnaryCall class."""
import asyncio
import datetime
import logging
import unittest
import datetime
import grpc
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
from tests.unit.framework.common import test_constants
from tests_aio.unit._test_server import start_test_server
from tests_aio.unit._test_base import AioTestBase
from src.proto.grpc.testing import messages_pb2
from tests_aio.unit._test_server import start_test_server
_NUM_STREAM_RESPONSES = 5
_RESPONSE_PAYLOAD_SIZE = 42
@ -37,44 +35,41 @@ _UNREACHABLE_TARGET = '0.1:1111'
_INFINITE_INTERVAL_US = 2**31 - 1
class TestUnaryUnaryCall(AioTestBase):
class _MulticallableTestMixin():
async def setUp(self):
self._server_target, self._server = await start_test_server()
address, self._server = await start_test_server()
self._channel = aio.insecure_channel(address)
self._stub = test_pb2_grpc.TestServiceStub(self._channel)
async def tearDown(self):
await self._channel.close()
await self._server.stop(None)
class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase):
async def test_call_ok(self):
async with aio.insecure_channel(self._server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest())
call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
self.assertFalse(call.done())
self.assertFalse(call.done())
response = await call
response = await call
self.assertTrue(call.done())
self.assertIsInstance(response, messages_pb2.SimpleResponse)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
self.assertTrue(call.done())
self.assertIsInstance(response, messages_pb2.SimpleResponse)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
# Response is cached at call object level, reentrance
# returns again the same response
response_retry = await call
self.assertIs(response, response_retry)
# Response is cached at call object level, reentrance
# returns again the same response
response_retry = await call
self.assertIs(response, response_retry)
async def test_call_rpc_error(self):
async with aio.insecure_channel(_UNREACHABLE_TARGET) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString,
)
stub = test_pb2_grpc.TestServiceStub(channel)
call = hi(messages_pb2.SimpleRequest(), timeout=0.1)
call = stub.UnaryCall(messages_pb2.SimpleRequest(), timeout=0.1)
with self.assertRaises(grpc.RpcError) as exception_context:
await call
@ -95,327 +90,272 @@ class TestUnaryUnaryCall(AioTestBase):
exception_context_retry.exception)
async def test_call_code_awaitable(self):
async with aio.insecure_channel(self._server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest())
self.assertEqual(await call.code(), grpc.StatusCode.OK)
call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_call_details_awaitable(self):
async with aio.insecure_channel(self._server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest())
self.assertEqual('', await call.details())
call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
self.assertEqual('', await call.details())
async def test_call_initial_metadata_awaitable(self):
async with aio.insecure_channel(self._server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest())
self.assertEqual((), await call.initial_metadata())
call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
self.assertEqual((), await call.initial_metadata())
async def test_call_trailing_metadata_awaitable(self):
async with aio.insecure_channel(self._server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest())
self.assertEqual((), await call.trailing_metadata())
call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
self.assertEqual((), await call.trailing_metadata())
async def test_cancel_unary_unary(self):
async with aio.insecure_channel(self._server_target) as channel:
hi = channel.unary_unary(
'/grpc.testing.TestService/UnaryCall',
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest())
call = self._stub.UnaryCall(messages_pb2.SimpleRequest())
self.assertFalse(call.cancelled())
self.assertFalse(call.cancelled())
self.assertTrue(call.cancel())
self.assertFalse(call.cancel())
self.assertTrue(call.cancel())
self.assertFalse(call.cancel())
with self.assertRaises(asyncio.CancelledError):
await call
with self.assertRaises(asyncio.CancelledError):
await call
# The info in the RpcError should match the info in Call object.
self.assertTrue(call.cancelled())
self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
self.assertEqual(await call.details(),
'Locally cancelled by application!')
# The info in the RpcError should match the info in Call object.
self.assertTrue(call.cancelled())
self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED)
self.assertEqual(await call.details(),
'Locally cancelled by application!')
async def test_cancel_unary_unary_in_task(self):
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
coro_started = asyncio.Event()
call = stub.EmptyCall(messages_pb2.SimpleRequest())
async def another_coro():
coro_started.set()
await call
coro_started = asyncio.Event()
call = self._stub.EmptyCall(messages_pb2.SimpleRequest())
task = self.loop.create_task(another_coro())
await coro_started.wait()
async def another_coro():
coro_started.set()
await call
self.assertFalse(task.done())
task.cancel()
task = self.loop.create_task(another_coro())
await coro_started.wait()
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertFalse(task.done())
task.cancel()
with self.assertRaises(asyncio.CancelledError):
await task
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
with self.assertRaises(asyncio.CancelledError):
await task
class TestUnaryStreamCall(AioTestBase):
async def setUp(self):
self._server_target, self._server = await start_test_server()
async def tearDown(self):
await self._server.stop(None)
class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase):
async def test_cancel_unary_stream(self):
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_RESPONSE_INTERVAL_US,
))
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_RESPONSE_INTERVAL_US,
))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
self.assertTrue(call.cancel())
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await
call.details())
self.assertFalse(call.cancel())
self.assertTrue(call.cancel())
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await
call.details())
self.assertFalse(call.cancel())
with self.assertRaises(asyncio.CancelledError):
await call.read()
self.assertTrue(call.cancelled())
with self.assertRaises(asyncio.CancelledError):
await call.read()
self.assertTrue(call.cancelled())
async def test_multiple_cancel_unary_stream(self):
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_RESPONSE_INTERVAL_US,
))
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_RESPONSE_INTERVAL_US,
))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body))
self.assertTrue(call.cancel())
self.assertFalse(call.cancel())
self.assertFalse(call.cancel())
self.assertFalse(call.cancel())
self.assertTrue(call.cancel())
self.assertFalse(call.cancel())
self.assertFalse(call.cancel())
self.assertFalse(call.cancel())
with self.assertRaises(asyncio.CancelledError):
await call.read()
with self.assertRaises(asyncio.CancelledError):
await call.read()
async def test_early_cancel_unary_stream(self):
"""Test cancellation before receiving messages."""
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_RESPONSE_INTERVAL_US,
))
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_RESPONSE_INTERVAL_US,
))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
self.assertTrue(call.cancel())
self.assertFalse(call.cancel())
self.assertFalse(call.cancelled())
self.assertTrue(call.cancel())
self.assertFalse(call.cancel())
with self.assertRaises(asyncio.CancelledError):
await call.read()
with self.assertRaises(asyncio.CancelledError):
await call.read()
self.assertTrue(call.cancelled())
self.assertTrue(call.cancelled())
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await
call.details())
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await
call.details())
async def test_late_cancel_unary_stream(self):
"""Test cancellation after received all messages."""
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,))
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,))
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
for _ in range(_NUM_STREAM_RESPONSES):
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
for _ in range(_NUM_STREAM_RESPONSES):
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
# After all messages received, it is possible that the final state
# is received or on its way. It's basically a data race, so our
# expectation here is do not crash :)
call.cancel()
self.assertIn(await call.code(),
[grpc.StatusCode.OK, grpc.StatusCode.CANCELLED])
# After all messages received, it is possible that the final state
# is received or on its way. It's basically a data race, so our
# expectation here is do not crash :)
call.cancel()
self.assertIn(await call.code(),
[grpc.StatusCode.OK, grpc.StatusCode.CANCELLED])
async def test_too_many_reads_unary_stream(self):
"""Test calling read after received all messages fails."""
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,))
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
for _ in range(_NUM_STREAM_RESPONSES):
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
self.assertIs(await call.read(), aio.EOF)
for _ in range(_NUM_STREAM_RESPONSES):
response = await call.read()
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
self.assertIs(await call.read(), aio.EOF)
# After the RPC is finished, further reads will lead to exception.
self.assertEqual(await call.code(), grpc.StatusCode.OK)
self.assertIs(await call.read(), aio.EOF)
# After the RPC is finished, further reads will lead to exception.
self.assertEqual(await call.code(), grpc.StatusCode.OK)
self.assertIs(await call.read(), aio.EOF)
async def test_unary_stream_async_generator(self):
"""Sunny day test case for unary_stream."""
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,))
# Prepares the request
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
self.assertFalse(call.cancelled())
async for response in call:
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
async for response in call:
self.assertIs(type(response),
messages_pb2.StreamingOutputCallResponse)
self.assertEqual(_RESPONSE_PAYLOAD_SIZE,
len(response.payload.body))
self.assertEqual(await call.code(), grpc.StatusCode.OK)
self.assertEqual(await call.code(), grpc.StatusCode.OK)
async def test_cancel_unary_stream_in_task_using_read(self):
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
coro_started = asyncio.Event()
coro_started = asyncio.Event()
# Configs the server method to block forever
request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_INFINITE_INTERVAL_US,
))
# Configs the server method to block forever
request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_INFINITE_INTERVAL_US,
))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
async def another_coro():
coro_started.set()
await call.read()
async def another_coro():
coro_started.set()
await call.read()
task = self.loop.create_task(another_coro())
await coro_started.wait()
task = self.loop.create_task(another_coro())
await coro_started.wait()
self.assertFalse(task.done())
task.cancel()
self.assertFalse(task.done())
task.cancel()
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
with self.assertRaises(asyncio.CancelledError):
await task
with self.assertRaises(asyncio.CancelledError):
await task
async def test_cancel_unary_stream_in_task_using_async_for(self):
async with aio.insecure_channel(self._server_target) as channel:
stub = test_pb2_grpc.TestServiceStub(channel)
coro_started = asyncio.Event()
coro_started = asyncio.Event()
# Configs the server method to block forever
request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_INFINITE_INTERVAL_US,
))
# Configs the server method to block forever
request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.append(
messages_pb2.ResponseParameters(
size=_RESPONSE_PAYLOAD_SIZE,
interval_us=_INFINITE_INTERVAL_US,
))
# Invokes the actual RPC
call = stub.StreamingOutputCall(request)
# Invokes the actual RPC
call = self._stub.StreamingOutputCall(request)
async def another_coro():
coro_started.set()
async for _ in call:
pass
async def another_coro():
coro_started.set()
async for _ in call:
pass
task = self.loop.create_task(another_coro())
await coro_started.wait()
task = self.loop.create_task(another_coro())
await coro_started.wait()
self.assertFalse(task.done())
task.cancel()
self.assertFalse(task.done())
task.cancel()
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
self.assertEqual(grpc.StatusCode.CANCELLED, await call.code())
with self.assertRaises(asyncio.CancelledError):
await task
with self.assertRaises(asyncio.CancelledError):
await task
def test_call_credentials(self):
@ -445,16 +385,7 @@ class TestUnaryStreamCall(AioTestBase):
self.loop.run_until_complete(coro())
class TestStreamUnaryCall(AioTestBase):
async def setUp(self):
self._server_target, self._server = await start_test_server()
self._channel = aio.insecure_channel(self._server_target)
self._stub = test_pb2_grpc.TestServiceStub(self._channel)
async def tearDown(self):
await self._channel.close()
await self._server.stop(None)
class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase):
async def test_cancel_stream_unary(self):
call = self._stub.StreamingInputCall()
@ -564,16 +495,7 @@ _STREAM_OUTPUT_REQUEST_ONE_RESPONSE.response_parameters.append(
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
class TestStreamStreamCall(AioTestBase):
async def setUp(self):
self._server_target, self._server = await start_test_server()
self._channel = aio.insecure_channel(self._server_target)
self._stub = test_pb2_grpc.TestServiceStub(self._channel)
async def tearDown(self):
await self._channel.close()
await self._server.stop(None)
class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase):
async def test_cancel(self):
# Invokes the actual RPC

Loading…
Cancel
Save