diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 37ba5f0d346..1a3dd46dcf4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -170,6 +170,7 @@ cdef grpc_error* asyncio_resolve( char* host, char* port, grpc_resolved_addresses** res) with gil: + _LOGGER.debug('asyncio_resolve') result = native_socket.getaddrinfo(host, port) res[0] = tuples_to_resolvaddr(result) @@ -178,6 +179,7 @@ cdef void asyncio_resolve_async( grpc_custom_resolver* grpc_resolver, char* host, char* port) with gil: + _LOGGER.debug('asyncio_resolve_async') resolver = _AsyncioResolver.create(grpc_resolver) resolver.resolve(host, port) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi index 7d47fa77b00..7897368bc83 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi @@ -32,7 +32,9 @@ cdef class _AsyncioResolver: async def _async_resolve(self, bytes host, bytes port): self._task_resolve = None try: + _LOGGER.debug('_AsyncioResolver before') resolved = await grpc_aio_loop().getaddrinfo(host, port) + _LOGGER.debug('_AsyncioResolver after') except Exception as e: grpc_custom_resolve_callback( self._grpc_resolver, @@ -50,6 +52,7 @@ cdef class _AsyncioResolver: cdef void resolve(self, char* host, char* port): assert not self._task_resolve + _LOGGER.debug('_AsyncioResolver resolve') self._task_resolve = grpc_aio_loop().create_task( self._async_resolve(host, port) ) diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index e47c00a62ab..a9ff5f5dca8 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -49,32 +49,32 @@ class _MulticallableTestMixin(): class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): - async def test_call_to_string(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + # async def test_call_to_string(self): + # call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertTrue(str(call) is not None) - self.assertTrue(repr(call) is not None) + # self.assertTrue(str(call) is not None) + # self.assertTrue(repr(call) is not None) - response = await call + # response = await call - self.assertTrue(str(call) is not None) - self.assertTrue(repr(call) is not None) + # self.assertTrue(str(call) is not None) + # self.assertTrue(repr(call) is not None) - async def test_call_ok(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + # async def test_call_ok(self): + # call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertFalse(call.done()) + # self.assertFalse(call.done()) - response = await call + # response = await call - self.assertTrue(call.done()) - self.assertIsInstance(response, messages_pb2.SimpleResponse) - self.assertEqual(await call.code(), grpc.StatusCode.OK) + # self.assertTrue(call.done()) + # self.assertIsInstance(response, messages_pb2.SimpleResponse) + # self.assertEqual(await call.code(), grpc.StatusCode.OK) - # Response is cached at call object level, reentrance - # returns again the same response - response_retry = await call - self.assertIs(response, response_retry) + # # Response is cached at call object level, reentrance + # # returns again the same response + # response_retry = await call + # self.assertIs(response, response_retry) async def test_call_rpc_error(self): async with aio.insecure_channel(_UNREACHABLE_TARGET) as channel: @@ -91,668 +91,664 @@ class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): self.assertTrue(call.done()) self.assertEqual(grpc.StatusCode.UNAVAILABLE, await call.code()) - async def test_call_code_awaitable(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertEqual(await call.code(), grpc.StatusCode.OK) - async def test_call_details_awaitable(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertEqual('', await call.details()) +# async def test_call_code_awaitable(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# self.assertEqual(await call.code(), grpc.StatusCode.OK) - async def test_call_initial_metadata_awaitable(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertEqual((), await call.initial_metadata()) +# async def test_call_details_awaitable(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# self.assertEqual('', await call.details()) - async def test_call_trailing_metadata_awaitable(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertEqual((), await call.trailing_metadata()) +# async def test_call_initial_metadata_awaitable(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# self.assertEqual((), await call.initial_metadata()) - async def test_call_initial_metadata_cancelable(self): - coro_started = asyncio.Event() - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# async def test_call_trailing_metadata_awaitable(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# self.assertEqual((), await call.trailing_metadata()) - async def coro(): - coro_started.set() - await call.initial_metadata() +# async def test_call_initial_metadata_cancelable(self): +# coro_started = asyncio.Event() +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - task = self.loop.create_task(coro()) - await coro_started.wait() - task.cancel() +# async def coro(): +# coro_started.set() +# await call.initial_metadata() - # Test that initial metadata can still be asked thought - # a cancellation happened with the previous task - self.assertEqual((), await call.initial_metadata()) +# task = self.loop.create_task(coro()) +# await coro_started.wait() +# task.cancel() - async def test_call_initial_metadata_multiple_waiters(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# # Test that initial metadata can still be asked thought +# # a cancellation happened with the previous task +# self.assertEqual((), await call.initial_metadata()) - async def coro(): - return await call.initial_metadata() +# async def test_call_initial_metadata_multiple_waiters(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - task1 = self.loop.create_task(coro()) - task2 = self.loop.create_task(coro()) +# async def coro(): +# return await call.initial_metadata() - await call +# task1 = self.loop.create_task(coro()) +# task2 = self.loop.create_task(coro()) - self.assertEqual([(), ()], await asyncio.gather(*[task1, task2])) +# await call - async def test_call_code_cancelable(self): - coro_started = asyncio.Event() - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# self.assertEqual([(), ()], await asyncio.gather(*[task1, task2])) - async def coro(): - coro_started.set() - await call.code() +# async def test_call_code_cancelable(self): +# coro_started = asyncio.Event() +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - task = self.loop.create_task(coro()) - await coro_started.wait() - task.cancel() +# async def coro(): +# coro_started.set() +# await call.code() - # Test that code can still be asked thought - # a cancellation happened with the previous task - self.assertEqual(grpc.StatusCode.OK, await call.code()) +# task = self.loop.create_task(coro()) +# await coro_started.wait() +# task.cancel() - async def test_call_code_multiple_waiters(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# # Test that code can still be asked thought +# # a cancellation happened with the previous task +# self.assertEqual(grpc.StatusCode.OK, await call.code()) - async def coro(): - return await call.code() +# async def test_call_code_multiple_waiters(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - task1 = self.loop.create_task(coro()) - task2 = self.loop.create_task(coro()) +# async def coro(): +# return await call.code() - await call +# task1 = self.loop.create_task(coro()) +# task2 = self.loop.create_task(coro()) - self.assertEqual([grpc.StatusCode.OK, grpc.StatusCode.OK], await - asyncio.gather(task1, task2)) +# await call - async def test_cancel_unary_unary(self): - call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) +# self.assertEqual([grpc.StatusCode.OK, grpc.StatusCode.OK], await +# asyncio.gather(task1, task2)) - self.assertFalse(call.cancelled()) +# async def test_cancel_unary_unary(self): +# call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertTrue(call.cancel()) - self.assertFalse(call.cancel()) +# self.assertFalse(call.cancelled()) - with self.assertRaises(asyncio.CancelledError): - await call +# self.assertTrue(call.cancel()) +# self.assertFalse(call.cancel()) - # The info in the RpcError should match the info in Call object. - self.assertTrue(call.cancelled()) - self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) - self.assertEqual(await call.details(), - 'Locally cancelled by application!') +# with self.assertRaises(asyncio.CancelledError): +# await call - async def test_cancel_unary_unary_in_task(self): - coro_started = asyncio.Event() - call = self._stub.EmptyCall(messages_pb2.SimpleRequest()) +# # The info in the RpcError should match the info in Call object. +# self.assertTrue(call.cancelled()) +# self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) +# self.assertEqual(await call.details(), +# 'Locally cancelled by application!') - async def another_coro(): - coro_started.set() - await call +# async def test_cancel_unary_unary_in_task(self): +# coro_started = asyncio.Event() +# call = self._stub.EmptyCall(messages_pb2.SimpleRequest()) - task = self.loop.create_task(another_coro()) - await coro_started.wait() +# async def another_coro(): +# coro_started.set() +# await call - self.assertFalse(task.done()) - task.cancel() +# task = self.loop.create_task(another_coro()) +# await coro_started.wait() - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) +# self.assertFalse(task.done()) +# task.cancel() - with self.assertRaises(asyncio.CancelledError): - await task +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) +# with self.assertRaises(asyncio.CancelledError): +# await task -class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): +# class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): - async def test_cancel_unary_stream(self): - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) +# async def test_cancel_unary_stream(self): +# # Prepares the request +# request = messages_pb2.StreamingOutputCallRequest() +# for _ in range(_NUM_STREAM_RESPONSES): +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_RESPONSE_INTERVAL_US, +# )) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) +# self.assertFalse(call.cancelled()) + +# response = await call.read() +# self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# self.assertTrue(call.cancel()) +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) +# self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await +# call.details()) +# self.assertFalse(call.cancel()) + +# with self.assertRaises(asyncio.CancelledError): +# await call.read() +# self.assertTrue(call.cancelled()) + +# async def test_multiple_cancel_unary_stream(self): +# # Prepares the request +# request = messages_pb2.StreamingOutputCallRequest() +# for _ in range(_NUM_STREAM_RESPONSES): +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_RESPONSE_INTERVAL_US, +# )) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) +# self.assertFalse(call.cancelled()) + +# response = await call.read() +# self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# self.assertTrue(call.cancel()) +# self.assertFalse(call.cancel()) +# self.assertFalse(call.cancel()) +# self.assertFalse(call.cancel()) + +# with self.assertRaises(asyncio.CancelledError): +# await call.read() + +# async def test_early_cancel_unary_stream(self): +# """Test cancellation before receiving messages.""" +# # Prepares the request +# request = messages_pb2.StreamingOutputCallRequest() +# for _ in range(_NUM_STREAM_RESPONSES): +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_RESPONSE_INTERVAL_US, +# )) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) + +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertFalse(call.cancel()) + +# with self.assertRaises(asyncio.CancelledError): +# await call.read() + +# self.assertTrue(call.cancelled()) + +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) +# self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await +# call.details()) + +# async def test_late_cancel_unary_stream(self): +# """Test cancellation after received all messages.""" +# # Prepares the request +# request = messages_pb2.StreamingOutputCallRequest() +# for _ in range(_NUM_STREAM_RESPONSES): +# request.response_parameters.append( +# messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) + +# for _ in range(_NUM_STREAM_RESPONSES): +# response = await call.read() +# self.assertIs(type(response), +# messages_pb2.StreamingOutputCallResponse) +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# # After all messages received, it is possible that the final state +# # is received or on its way. It's basically a data race, so our +# # expectation here is do not crash :) +# call.cancel() +# self.assertIn(await call.code(), +# [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) + +# async def test_too_many_reads_unary_stream(self): +# """Test calling read after received all messages fails.""" +# # Prepares the request +# request = messages_pb2.StreamingOutputCallRequest() +# for _ in range(_NUM_STREAM_RESPONSES): +# request.response_parameters.append( +# messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) + +# for _ in range(_NUM_STREAM_RESPONSES): +# response = await call.read() +# self.assertIs(type(response), +# messages_pb2.StreamingOutputCallResponse) +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) +# self.assertIs(await call.read(), aio.EOF) + +# # After the RPC is finished, further reads will lead to exception. +# self.assertEqual(await call.code(), grpc.StatusCode.OK) +# self.assertIs(await call.read(), aio.EOF) + +# async def test_unary_stream_async_generator(self): +# """Sunny day test case for unary_stream.""" +# # Prepares the request +# request = messages_pb2.StreamingOutputCallRequest() +# for _ in range(_NUM_STREAM_RESPONSES): +# request.response_parameters.append( +# messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) +# self.assertFalse(call.cancelled()) + +# async for response in call: +# self.assertIs(type(response), +# messages_pb2.StreamingOutputCallResponse) +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# self.assertEqual(await call.code(), grpc.StatusCode.OK) + +# async def test_cancel_unary_stream_in_task_using_read(self): +# coro_started = asyncio.Event() + +# # Configs the server method to block forever +# request = messages_pb2.StreamingOutputCallRequest() +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_INFINITE_INTERVAL_US, +# )) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) + +# async def another_coro(): +# coro_started.set() +# await call.read() + +# task = self.loop.create_task(another_coro()) +# await coro_started.wait() + +# self.assertFalse(task.done()) +# task.cancel() + +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# with self.assertRaises(asyncio.CancelledError): +# await task + +# async def test_cancel_unary_stream_in_task_using_async_for(self): +# coro_started = asyncio.Event() + +# # Configs the server method to block forever +# request = messages_pb2.StreamingOutputCallRequest() +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_INFINITE_INTERVAL_US, +# )) + +# # Invokes the actual RPC +# call = self._stub.StreamingOutputCall(request) + +# async def another_coro(): +# coro_started.set() +# async for _ in call: +# pass + +# task = self.loop.create_task(another_coro()) +# await coro_started.wait() + +# self.assertFalse(task.done()) +# task.cancel() + +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# with self.assertRaises(asyncio.CancelledError): +# await task - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) +# def test_call_credentials(self): - response = await call.read() - self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) +# class DummyAuth(grpc.AuthMetadataPlugin): - self.assertTrue(call.cancel()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) - self.assertFalse(call.cancel()) - - with self.assertRaises(asyncio.CancelledError): - await call.read() - self.assertTrue(call.cancelled()) - - async def test_multiple_cancel_unary_stream(self): - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) - - response = await call.read() - self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - self.assertTrue(call.cancel()) - self.assertFalse(call.cancel()) - self.assertFalse(call.cancel()) - self.assertFalse(call.cancel()) - - with self.assertRaises(asyncio.CancelledError): - await call.read() - - async def test_early_cancel_unary_stream(self): - """Test cancellation before receiving messages.""" - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertFalse(call.cancel()) - - with self.assertRaises(asyncio.CancelledError): - await call.read() - - self.assertTrue(call.cancelled()) - - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) - - async def test_late_cancel_unary_stream(self): - """Test cancellation after received all messages.""" - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - - for _ in range(_NUM_STREAM_RESPONSES): - response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - # After all messages received, it is possible that the final state - # is received or on its way. It's basically a data race, so our - # expectation here is do not crash :) - call.cancel() - self.assertIn(await call.code(), - [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) - - async def test_too_many_reads_unary_stream(self): - """Test calling read after received all messages fails.""" - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - - for _ in range(_NUM_STREAM_RESPONSES): - response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - self.assertIs(await call.read(), aio.EOF) - - # After the RPC is finished, further reads will lead to exception. - self.assertEqual(await call.code(), grpc.StatusCode.OK) - self.assertIs(await call.read(), aio.EOF) - - async def test_unary_stream_async_generator(self): - """Sunny day test case for unary_stream.""" - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) - - async for response in call: - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - self.assertEqual(await call.code(), grpc.StatusCode.OK) - - async def test_cancel_unary_stream_in_task_using_read(self): - coro_started = asyncio.Event() - - # Configs the server method to block forever - request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_INFINITE_INTERVAL_US, - )) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - - async def another_coro(): - coro_started.set() - await call.read() - - task = self.loop.create_task(another_coro()) - await coro_started.wait() - - self.assertFalse(task.done()) - task.cancel() - - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - - with self.assertRaises(asyncio.CancelledError): - await task - - async def test_cancel_unary_stream_in_task_using_async_for(self): - coro_started = asyncio.Event() - - # Configs the server method to block forever - request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_INFINITE_INTERVAL_US, - )) - - # Invokes the actual RPC - call = self._stub.StreamingOutputCall(request) - - async def another_coro(): - coro_started.set() - async for _ in call: - pass - - task = self.loop.create_task(another_coro()) - await coro_started.wait() - - self.assertFalse(task.done()) - task.cancel() - - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) +# def __call__(self, context, callback): +# signature = context.method_name[::-1] +# callback((("test", signature),), None) - with self.assertRaises(asyncio.CancelledError): - await task +# async def coro(): +# server_target, _ = await start_test_server(secure=False) # pylint: disable=unused-variable - def test_call_credentials(self): +# async with aio.insecure_channel(server_target) as channel: +# hi = channel.unary_unary('/grpc.testing.TestService/UnaryCall', +# request_serializer=messages_pb2. +# SimpleRequest.SerializeToString, +# response_deserializer=messages_pb2. +# SimpleResponse.FromString) +# call_credentials = grpc.metadata_call_credentials(DummyAuth()) +# call = hi(messages_pb2.SimpleRequest(), +# credentials=call_credentials) +# response = await call - class DummyAuth(grpc.AuthMetadataPlugin): +# self.assertIsInstance(response, messages_pb2.SimpleResponse) +# self.assertEqual(await call.code(), grpc.StatusCode.OK) - def __call__(self, context, callback): - signature = context.method_name[::-1] - callback((("test", signature),), None) +# self.loop.run_until_complete(coro()) - async def coro(): - server_target, _ = await start_test_server(secure=False) # pylint: disable=unused-variable +# async def test_time_remaining(self): +# request = messages_pb2.StreamingOutputCallRequest() +# # First message comes back immediately +# request.response_parameters.append( +# messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) +# # Second message comes back after a unit of wait time +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_RESPONSE_INTERVAL_US, +# )) - async with aio.insecure_channel(server_target) as channel: - hi = channel.unary_unary('/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2. - SimpleRequest.SerializeToString, - response_deserializer=messages_pb2. - SimpleResponse.FromString) - call_credentials = grpc.metadata_call_credentials(DummyAuth()) - call = hi(messages_pb2.SimpleRequest(), - credentials=call_credentials) - response = await call +# call = self._stub.StreamingOutputCall( +# request, timeout=test_constants.SHORT_TIMEOUT * 2) - self.assertIsInstance(response, messages_pb2.SimpleResponse) - self.assertEqual(await call.code(), grpc.StatusCode.OK) +# response = await call.read() +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - self.loop.run_until_complete(coro()) +# # Should be around the same as the timeout +# remained_time = call.time_remaining() +# self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) +# self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 2) - async def test_time_remaining(self): - request = messages_pb2.StreamingOutputCallRequest() - # First message comes back immediately - request.response_parameters.append( - messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) - # Second message comes back after a unit of wait time - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) +# response = await call.read() +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - call = self._stub.StreamingOutputCall( - request, timeout=test_constants.SHORT_TIMEOUT * 2) +# # Should be around the timeout minus a unit of wait time +# remained_time = call.time_remaining() +# self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT // 2) +# self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) - response = await call.read() - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) +# self.assertEqual(grpc.StatusCode.OK, await call.code()) - # Should be around the same as the timeout - remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 2) +# class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): - response = await call.read() - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) +# async def test_cancel_stream_unary(self): +# call = self._stub.StreamingInputCall() - # Should be around the timeout minus a unit of wait time - remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT // 2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) +# # Prepares the request +# payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) +# request = messages_pb2.StreamingInputCallRequest(payload=payload) - self.assertEqual(grpc.StatusCode.OK, await call.code()) +# # Sends out requests +# for _ in range(_NUM_STREAM_RESPONSES): +# await call.write(request) +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) -class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): +# await call.done_writing() - async def test_cancel_stream_unary(self): - call = self._stub.StreamingInputCall() +# with self.assertRaises(asyncio.CancelledError): +# await call - # Prepares the request - payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) - request = messages_pb2.StreamingInputCallRequest(payload=payload) +# async def test_early_cancel_stream_unary(self): +# call = self._stub.StreamingInputCall() - # Sends out requests - for _ in range(_NUM_STREAM_RESPONSES): - await call.write(request) +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) +# with self.assertRaises(asyncio.InvalidStateError): +# await call.write(messages_pb2.StreamingInputCallRequest()) - await call.done_writing() +# # Should be no-op +# await call.done_writing() - with self.assertRaises(asyncio.CancelledError): - await call +# with self.assertRaises(asyncio.CancelledError): +# await call - async def test_early_cancel_stream_unary(self): - call = self._stub.StreamingInputCall() +# async def test_write_after_done_writing(self): +# call = self._stub.StreamingInputCall() + +# # Prepares the request +# payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) +# request = messages_pb2.StreamingInputCallRequest(payload=payload) + +# # Sends out requests +# for _ in range(_NUM_STREAM_RESPONSES): +# await call.write(request) + +# # Should be no-op +# await call.done_writing() + +# with self.assertRaises(asyncio.InvalidStateError): +# await call.write(messages_pb2.StreamingInputCallRequest()) - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) +# response = await call +# self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse) +# self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, +# response.aggregated_payload_size) - with self.assertRaises(asyncio.InvalidStateError): - await call.write(messages_pb2.StreamingInputCallRequest()) +# self.assertEqual(await call.code(), grpc.StatusCode.OK) + +# async def test_error_in_async_generator(self): +# # Server will pause between responses +# request = messages_pb2.StreamingOutputCallRequest() +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_RESPONSE_INTERVAL_US, +# )) - # Should be no-op - await call.done_writing() +# # We expect the request iterator to receive the exception +# request_iterator_received_the_exception = asyncio.Event() - with self.assertRaises(asyncio.CancelledError): - await call +# async def request_iterator(): +# with self.assertRaises(asyncio.CancelledError): +# for _ in range(_NUM_STREAM_RESPONSES): +# yield request +# await asyncio.sleep(test_constants.SHORT_TIMEOUT) +# request_iterator_received_the_exception.set() - async def test_write_after_done_writing(self): - call = self._stub.StreamingInputCall() +# call = self._stub.StreamingInputCall(request_iterator()) + +# # Cancel the RPC after at least one response +# async def cancel_later(): +# await asyncio.sleep(test_constants.SHORT_TIMEOUT * 2) +# call.cancel() + +# cancel_later_task = self.loop.create_task(cancel_later()) + +# # No exceptions here +# with self.assertRaises(asyncio.CancelledError): +# await call - # Prepares the request - payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) - request = messages_pb2.StreamingInputCallRequest(payload=payload) +# await request_iterator_received_the_exception.wait() - # Sends out requests - for _ in range(_NUM_STREAM_RESPONSES): - await call.write(request) - - # Should be no-op - await call.done_writing() - - with self.assertRaises(asyncio.InvalidStateError): - await call.write(messages_pb2.StreamingInputCallRequest()) - - response = await call - self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse) - self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, - response.aggregated_payload_size) - - self.assertEqual(await call.code(), grpc.StatusCode.OK) - - async def test_error_in_async_generator(self): - # Server will pause between responses - request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) - - # We expect the request iterator to receive the exception - request_iterator_received_the_exception = asyncio.Event() - - async def request_iterator(): - with self.assertRaises(asyncio.CancelledError): - for _ in range(_NUM_STREAM_RESPONSES): - yield request - await asyncio.sleep(test_constants.SHORT_TIMEOUT) - request_iterator_received_the_exception.set() - - call = self._stub.StreamingInputCall(request_iterator()) - - # Cancel the RPC after at least one response - async def cancel_later(): - await asyncio.sleep(test_constants.SHORT_TIMEOUT * 2) - call.cancel() - - cancel_later_task = self.loop.create_task(cancel_later()) - - # No exceptions here - with self.assertRaises(asyncio.CancelledError): - await call - - await request_iterator_received_the_exception.wait() - - # No failures in the cancel later task! - await cancel_later_task - - -# Prepares the request that stream in a ping-pong manner. -_STREAM_OUTPUT_REQUEST_ONE_RESPONSE = messages_pb2.StreamingOutputCallRequest() -_STREAM_OUTPUT_REQUEST_ONE_RESPONSE.response_parameters.append( - messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) - - -class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase): - - async def test_cancel(self): - # Invokes the actual RPC - call = self._stub.FullDuplexCall() - - for _ in range(_NUM_STREAM_RESPONSES): - await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) - response = await call.read() - self.assertIsInstance(response, - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - - async def test_cancel_with_pending_read(self): - call = self._stub.FullDuplexCall() - - await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) - - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - - async def test_cancel_with_ongoing_read(self): - call = self._stub.FullDuplexCall() - coro_started = asyncio.Event() - - async def read_coro(): - coro_started.set() - await call.read() - - read_task = self.loop.create_task(read_coro()) - await coro_started.wait() - self.assertFalse(read_task.done()) - - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - - async def test_early_cancel(self): - call = self._stub.FullDuplexCall() - - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - - async def test_cancel_after_done_writing(self): - call = self._stub.FullDuplexCall() - await call.done_writing() - - # Cancels the RPC - self.assertFalse(call.done()) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - - async def test_late_cancel(self): - call = self._stub.FullDuplexCall() - await call.done_writing() - self.assertEqual(grpc.StatusCode.OK, await call.code()) - - # Cancels the RPC - self.assertTrue(call.done()) - self.assertFalse(call.cancelled()) - self.assertFalse(call.cancel()) - self.assertFalse(call.cancelled()) - - # Status is still OK - self.assertEqual(grpc.StatusCode.OK, await call.code()) - - async def test_async_generator(self): - - async def request_generator(): - yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE - yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE - - call = self._stub.FullDuplexCall(request_generator()) - async for response in call: - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - self.assertEqual(await call.code(), grpc.StatusCode.OK) - - async def test_too_many_reads(self): - - async def request_generator(): - for _ in range(_NUM_STREAM_RESPONSES): - yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE - - call = self._stub.FullDuplexCall(request_generator()) - for _ in range(_NUM_STREAM_RESPONSES): - response = await call.read() - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - self.assertIs(await call.read(), aio.EOF) - - self.assertEqual(await call.code(), grpc.StatusCode.OK) - # After the RPC finished, the read should also produce EOF - self.assertIs(await call.read(), aio.EOF) - - async def test_read_write_after_done_writing(self): - call = self._stub.FullDuplexCall() - - # Writes two requests, and pending two requests - await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) - await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) - await call.done_writing() - - # Further write should fail - with self.assertRaises(asyncio.InvalidStateError): - await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) - - # But read should be unaffected - response = await call.read() - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - response = await call.read() - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - self.assertEqual(await call.code(), grpc.StatusCode.OK) - - async def test_error_in_async_generator(self): - # Server will pause between responses - request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) - - # We expect the request iterator to receive the exception - request_iterator_received_the_exception = asyncio.Event() - - async def request_iterator(): - with self.assertRaises(asyncio.CancelledError): - for _ in range(_NUM_STREAM_RESPONSES): - yield request - await asyncio.sleep(test_constants.SHORT_TIMEOUT) - request_iterator_received_the_exception.set() - - call = self._stub.FullDuplexCall(request_iterator()) - - # Cancel the RPC after at least one response - async def cancel_later(): - await asyncio.sleep(test_constants.SHORT_TIMEOUT * 2) - call.cancel() - - cancel_later_task = self.loop.create_task(cancel_later()) - - # No exceptions here - async for response in call: - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - - await request_iterator_received_the_exception.wait() - - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - # No failures in the cancel later task! - await cancel_later_task +# # No failures in the cancel later task! +# await cancel_later_task + +# # Prepares the request that stream in a ping-pong manner. +# _STREAM_OUTPUT_REQUEST_ONE_RESPONSE = messages_pb2.StreamingOutputCallRequest() +# _STREAM_OUTPUT_REQUEST_ONE_RESPONSE.response_parameters.append( +# messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) +# class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase): + +# async def test_cancel(self): +# # Invokes the actual RPC +# call = self._stub.FullDuplexCall() + +# for _ in range(_NUM_STREAM_RESPONSES): +# await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) +# response = await call.read() +# self.assertIsInstance(response, +# messages_pb2.StreamingOutputCallResponse) +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# async def test_cancel_with_pending_read(self): +# call = self._stub.FullDuplexCall() + +# await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) + +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# async def test_cancel_with_ongoing_read(self): +# call = self._stub.FullDuplexCall() +# coro_started = asyncio.Event() + +# async def read_coro(): +# coro_started.set() +# await call.read() + +# read_task = self.loop.create_task(read_coro()) +# await coro_started.wait() +# self.assertFalse(read_task.done()) + +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# async def test_early_cancel(self): +# call = self._stub.FullDuplexCall() + +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# async def test_cancel_after_done_writing(self): +# call = self._stub.FullDuplexCall() +# await call.done_writing() + +# # Cancels the RPC +# self.assertFalse(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertTrue(call.cancel()) +# self.assertTrue(call.cancelled()) +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + +# async def test_late_cancel(self): +# call = self._stub.FullDuplexCall() +# await call.done_writing() +# self.assertEqual(grpc.StatusCode.OK, await call.code()) + +# # Cancels the RPC +# self.assertTrue(call.done()) +# self.assertFalse(call.cancelled()) +# self.assertFalse(call.cancel()) +# self.assertFalse(call.cancelled()) + +# # Status is still OK +# self.assertEqual(grpc.StatusCode.OK, await call.code()) + +# async def test_async_generator(self): + +# async def request_generator(): +# yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE +# yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE + +# call = self._stub.FullDuplexCall(request_generator()) +# async for response in call: +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# self.assertEqual(await call.code(), grpc.StatusCode.OK) + +# async def test_too_many_reads(self): + +# async def request_generator(): +# for _ in range(_NUM_STREAM_RESPONSES): +# yield _STREAM_OUTPUT_REQUEST_ONE_RESPONSE + +# call = self._stub.FullDuplexCall(request_generator()) +# for _ in range(_NUM_STREAM_RESPONSES): +# response = await call.read() +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) +# self.assertIs(await call.read(), aio.EOF) + +# self.assertEqual(await call.code(), grpc.StatusCode.OK) +# # After the RPC finished, the read should also produce EOF +# self.assertIs(await call.read(), aio.EOF) + +# async def test_read_write_after_done_writing(self): +# call = self._stub.FullDuplexCall() + +# # Writes two requests, and pending two requests +# await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) +# await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) +# await call.done_writing() + +# # Further write should fail +# with self.assertRaises(asyncio.InvalidStateError): +# await call.write(_STREAM_OUTPUT_REQUEST_ONE_RESPONSE) + +# # But read should be unaffected +# response = await call.read() +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) +# response = await call.read() +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# self.assertEqual(await call.code(), grpc.StatusCode.OK) + +# async def test_error_in_async_generator(self): +# # Server will pause between responses +# request = messages_pb2.StreamingOutputCallRequest() +# request.response_parameters.append( +# messages_pb2.ResponseParameters( +# size=_RESPONSE_PAYLOAD_SIZE, +# interval_us=_RESPONSE_INTERVAL_US, +# )) + +# # We expect the request iterator to receive the exception +# request_iterator_received_the_exception = asyncio.Event() + +# async def request_iterator(): +# with self.assertRaises(asyncio.CancelledError): +# for _ in range(_NUM_STREAM_RESPONSES): +# yield request +# await asyncio.sleep(test_constants.SHORT_TIMEOUT) +# request_iterator_received_the_exception.set() + +# call = self._stub.FullDuplexCall(request_iterator()) + +# # Cancel the RPC after at least one response +# async def cancel_later(): +# await asyncio.sleep(test_constants.SHORT_TIMEOUT * 2) +# call.cancel() + +# cancel_later_task = self.loop.create_task(cancel_later()) + +# # No exceptions here +# async for response in call: +# self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + +# await request_iterator_received_the_exception.wait() + +# self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) +# # No failures in the cancel later task! +# await cancel_later_task if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG)