diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index b8f04615297..5bd4f90c07b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -70,7 +70,8 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler InternalError) -class ExecuteBatchError(Exception): pass +class ExecuteBatchError(InternalError): + """Raised upon execute batch returns a fail from Core.""" async def execute_batch(GrpcCallWrapper grpc_call_wrapper, @@ -128,7 +129,7 @@ async def _receive_message(GrpcCallWrapper grpc_call_wrapper, # the callback (e.g. cancelled). # # Since they all indicates finish, they are better be merged. - _LOGGER.debug(e) + _LOGGER.debug('Failed to received message from Core') return receive_op.message() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 3923244748a..c2c08161248 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -41,6 +41,18 @@ cdef class RPCState(GrpcCallWrapper): cdef Operation create_send_initial_metadata_op_if_not_sent(self) +cdef class _ServicerContext: + cdef RPCState _rpc_state + cdef object _loop + cdef object _request_deserializer + cdef object _response_serializer + + +cdef class _MessageReceiver: + cdef _ServicerContext _servicer_context + cdef object _agen + + cdef enum AioServerStatus: AIO_SERVER_STATUS_UNKNOWN AIO_SERVER_STATUS_READY diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 668ecab0a85..b0c62f9b9f2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -109,10 +109,6 @@ cdef class RPCState: cdef class _ServicerContext: - cdef RPCState _rpc_state - cdef object _loop - cdef object _request_deserializer - cdef object _response_serializer def __cinit__(self, RPCState rpc_state, @@ -128,9 +124,9 @@ cdef class _ServicerContext: cdef bytes raw_message self._rpc_state.raise_for_termination() - if self._rpc_state.client_closed: - return EOF raw_message = await _receive_message(self._rpc_state, self._loop) + self._rpc_state.raise_for_termination() + if raw_message is None: return EOF else: @@ -414,15 +410,28 @@ async def _handle_unary_stream_rpc(object method_handler, ) -async def _message_receiver(_ServicerContext servicer_context): +cdef class _MessageReceiver: """Bridge between the async generator API and the reader-writer API.""" - cdef object message - while True: - message = await servicer_context.read() - if message is not EOF: - yield message - else: - break + + def __cinit__(self, _ServicerContext servicer_context): + self._servicer_context = servicer_context + self._agen = None + + async def _async_message_receiver(self): + """An async generator that receives messages.""" + cdef object message + while True: + message = await self._servicer_context.read() + if message is not EOF: + yield message + else: + break + + def __aiter__(self): + # Prevents never awaited warning if application never used the async generator + if self._agen is None: + self._agen = self._async_message_receiver() + return self._agen async def _handle_stream_unary_rpc(object method_handler, @@ -437,7 +446,7 @@ async def _handle_stream_unary_rpc(object method_handler, ) # Prepares the request generator - cdef object request_async_iterator = _message_receiver(servicer_context) + cdef object request_async_iterator = _MessageReceiver(servicer_context) # Finishes the application handler await _finish_handler_with_unary_response( @@ -462,7 +471,7 @@ async def _handle_stream_stream_rpc(object method_handler, ) # Prepares the request generator - cdef object request_async_iterator = _message_receiver(servicer_context) + cdef object request_async_iterator = _MessageReceiver(servicer_context) # Finishes the application handler await _finish_handler_with_stream_responses( @@ -495,6 +504,12 @@ async def _handle_exceptions(RPCState rpc_state, object rpc_coro, object loop): _LOGGER.debug('RPC cancelled for servicer method [%s]', _decode(rpc_state.method())) except _ServerStoppedError: _LOGGER.warning('Aborting method [%s] due to server stop.', _decode(rpc_state.method())) + except ExecuteBatchError: + # If client closed (aka. cancelled), ignore the failed batch operations. + if rpc_state.client_closed: + return + else: + raise except Exception as e: _LOGGER.exception('Unexpected [%s] raised by servicer method [%s]' % ( type(e).__name__, diff --git a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py index c8b6083ae39..0db15be3a94 100644 --- a/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py +++ b/src/python/grpcio_tests/tests_aio/interop/local_interop_test.py @@ -64,7 +64,6 @@ class InteropTestCaseMixin: await methods.test_interoperability( methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE, self._stub, None) - @unittest.skip('TODO(https://github.com/grpc/grpc/issues/21707)') async def test_timeout_on_sleeping_server(self): await methods.test_interoperability( methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER, self._stub, None) diff --git a/src/python/grpcio_tests/tests_aio/interop/methods.py b/src/python/grpcio_tests/tests_aio/interop/methods.py index 7c5c1edfd2b..019b3bca894 100644 --- a/src/python/grpcio_tests/tests_aio/interop/methods.py +++ b/src/python/grpcio_tests/tests_aio/interop/methods.py @@ -15,8 +15,9 @@ import argparse import asyncio -import enum import collections +import datetime +import enum import inspect import json import os @@ -220,12 +221,15 @@ async def _cancel_after_first_response(stub: test_pb2_grpc.TestServiceStub): async def _timeout_on_sleeping_server(stub: test_pb2_grpc.TestServiceStub): request_payload_size = 27182 + time_limit = datetime.timedelta(seconds=1) - call = stub.FullDuplexCall(timeout=0.001) + call = stub.FullDuplexCall(timeout=time_limit.total_seconds()) request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE, - payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) + payload=messages_pb2.Payload(body=b'\x00' * request_payload_size), + response_parameters=(messages_pb2.ResponseParameters( + interval_us=int(time_limit.total_seconds() * 2 * 10**6)),)) await call.write(request) await call.done_writing() try: diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index e23548eed71..f64f4e44802 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -740,5 +740,5 @@ class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase): if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 10949ac180c..58cd555491d 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -226,5 +226,5 @@ class TestChannel(AioTestBase): if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/client_interceptor_test.py b/src/python/grpcio_tests/tests_aio/unit/client_interceptor_test.py index 9fa08a78806..8f5a356ca4a 100644 --- a/src/python/grpcio_tests/tests_aio/unit/client_interceptor_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/client_interceptor_test.py @@ -686,5 +686,5 @@ class TestInterceptedUnaryUnaryCall(AioTestBase): if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/close_channel_test.py b/src/python/grpcio_tests/tests_aio/unit/close_channel_test.py index f05c74392d9..1e10074c47c 100644 --- a/src/python/grpcio_tests/tests_aio/unit/close_channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/close_channel_test.py @@ -134,5 +134,5 @@ class TestCloseChannel(AioTestBase): if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index 9104a0368c5..2582857751d 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -35,7 +35,7 @@ class TestChannel(AioTestBase): channel = aio.insecure_channel(server_target) self.assertIsInstance(channel, aio.Channel) - async def tests_secure_channel(self): + async def test_secure_channel(self): server_target, _ = await start_test_server(secure=True) # pylint: disable=unused-variable credentials = grpc.ssl_channel_credentials( root_certificates=_TEST_ROOT_CERTIFICATES, @@ -48,5 +48,5 @@ class TestChannel(AioTestBase): if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/server_interceptor_test.py b/src/python/grpcio_tests/tests_aio/unit/server_interceptor_test.py index 0c443389967..dabf005591f 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_interceptor_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_interceptor_test.py @@ -164,5 +164,5 @@ class TestServerInterceptor(AioTestBase): if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2)