From 21670ed1a8c79726e7073e9622d262eb6c4eb07c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 16 Jan 2020 11:59:36 -0800 Subject: [PATCH] Add time remaining test case --- .../grpcio_tests/tests_aio/unit/call_test.py | 67 ++++++++++++------- .../tests_aio/unit/done_callback_test.py | 10 +-- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 7a71ddcd317..6fc5b43dec1 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -120,7 +120,7 @@ class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): self.assertTrue(call.cancelled()) self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) self.assertEqual(await call.details(), - 'Locally cancelled by application!') + 'Locally cancelled by application!') async def test_cancel_unary_unary_in_task(self): coro_started = asyncio.Event() @@ -159,14 +159,13 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): self.assertFalse(call.cancelled()) response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) + self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertTrue(call.cancel()) self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) + call.details()) self.assertFalse(call.cancel()) with self.assertRaises(asyncio.CancelledError): @@ -188,8 +187,7 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): self.assertFalse(call.cancelled()) response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) + self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertTrue(call.cancel()) @@ -225,7 +223,7 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) + call.details()) async def test_late_cancel_unary_stream(self): """Test cancellation after received all messages.""" @@ -233,8 +231,7 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): request = messages_pb2.StreamingOutputCallRequest() for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = self._stub.StreamingOutputCall(request) @@ -242,16 +239,15 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): for _ in range(_NUM_STREAM_RESPONSES): response = await call.read() self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) # After all messages received, it is possible that the final state # is received or on its way. It's basically a data race, so our # expectation here is do not crash :) call.cancel() self.assertIn(await call.code(), - [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) + [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) async def test_too_many_reads_unary_stream(self): """Test calling read after received all messages fails.""" @@ -259,8 +255,7 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): request = messages_pb2.StreamingOutputCallRequest() for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = self._stub.StreamingOutputCall(request) @@ -268,9 +263,8 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): for _ in range(_NUM_STREAM_RESPONSES): response = await call.read() self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertIs(await call.read(), aio.EOF) # After the RPC is finished, further reads will lead to exception. @@ -283,8 +277,7 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): request = messages_pb2.StreamingOutputCallRequest() for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = self._stub.StreamingOutputCall(request) @@ -292,9 +285,8 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): async for response in call: self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertEqual(await call.code(), grpc.StatusCode.OK) @@ -384,6 +376,35 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): self.loop.run_until_complete(coro()) + async def test_time_remaining(self): + request = messages_pb2.StreamingOutputCallRequest() + # First message comes back immediately + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_INFINITE_INTERVAL_US, + )) + # Second message comes back after a unit of wait time + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_RESPONSE_INTERVAL_US, + )) + + call = self._stub.StreamingOutputCall( + request, timeout=test_constants.SHORT_TIMEOUT*2) + + response = await call.read() + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + + remained_time = call.time_remaining() + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT//2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*3//2) + + response = await call.read() + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + + self.assertEqual(grpc.StatusCode.OK, await call.code()) class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): diff --git a/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py b/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py index f5f6a75974a..93eddcf0917 100644 --- a/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Testing the done callbacks mechanism.""" - # Copyright 2019 The gRPC Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -41,7 +40,6 @@ from tests.unit.framework.common import test_constants from src.proto.grpc.testing import messages_pb2, test_pb2_grpc from tests_aio.unit._test_server import start_test_server - _NUM_STREAM_RESPONSES = 5 _REQUEST_PAYLOAD_SIZE = 7 _RESPONSE_PAYLOAD_SIZE = 42 @@ -49,10 +47,12 @@ _RESPONSE_PAYLOAD_SIZE = 42 def _inject_callbacks(call): first_callback_ran = asyncio.Event() + def first_callback(unused_call): first_callback_ran.set() second_callback_ran = asyncio.Event() + def second_callback(unused_call): second_callback_ran.set() @@ -61,9 +61,9 @@ def _inject_callbacks(call): async def validation(): await asyncio.wait_for( - asyncio.gather(first_callback_ran.wait(), second_callback_ran.wait()), - test_constants.SHORT_TIMEOUT - ) + asyncio.gather(first_callback_ran.wait(), + second_callback_ran.wait()), + test_constants.SHORT_TIMEOUT) return validation()