diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 8330f64e4cb..30e8447c0e5 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -157,31 +157,33 @@ async def _handle_unary_stream_rpc(object method_handler, loop, ) + cdef object async_response_generator + cdef object response_message if inspect.iscoroutinefunction(method_handler.unary_stream): # The handler uses reader / writer API, returns None. await method_handler.unary_stream( request_message, servicer_context, ) - return - - # The handler uses async generator API - cdef object async_response_generator = method_handler.unary_stream( - request_message, - servicer_context, - ) + else: + # The handler uses async generator API + async_response_generator = method_handler.unary_stream( + request_message, + servicer_context, + ) - # Consumes messages from the generator - cdef object response_message - async for response_message in async_response_generator: - await servicer_context.write(response_message) + # Consumes messages from the generator + async for response_message in async_response_generator: + await servicer_context.write(response_message) + # Sends the final status of this RPC cdef SendStatusFromServerOperation op = SendStatusFromServerOperation( None, StatusCode.ok, b'', _EMPTY_FLAGS, ) + cdef tuple ops = (op,) await callback_start_batch(rpc_state, ops, loop) diff --git a/src/python/grpcio/grpc/experimental/aio/_base_call.py b/src/python/grpcio/grpc/experimental/aio/_base_call.py index dd96c5e1107..7f66520b146 100644 --- a/src/python/grpcio/grpc/experimental/aio/_base_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_base_call.py @@ -147,9 +147,10 @@ class UnaryStreamCall( async def read(self) -> ResponseType: """Reads one message from the RPC. - Concurrent reads in multiple coroutines are not allowed. If you want to - perform read in multiple coroutines, you needs synchronization. So, you - can start another read after current read is finished. + For each streaming RPC, concurrent reads in multiple coroutines are not + allowed. If you want to perform read in multiple coroutines, you needs + synchronization. So, you can start another read after current read is + finished. Returns: A response message of the RPC. diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index 09257e397a6..9c6780d5aaf 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -174,12 +174,9 @@ class Call(_base_call.Call): return self._status.done() def add_done_callback(self, unused_callback) -> None: - raise NotImplementedError() - - def is_active(self) -> bool: - return self.done() + pass - def time_remaining(self) -> float: + def time_remaining(self) -> Optional[float]: pass async def initial_metadata(self) -> MetadataType: diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_base.py b/src/python/grpcio_tests/tests_aio/unit/_test_base.py index 72bd8fec44e..d9284585c4b 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_base.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_base.py @@ -44,6 +44,7 @@ def _get_default_loop(debug=True): return loop +# NOTE(gnossen) this test class can also be implemented with metaclass. class AioTestBase(unittest.TestCase): @property diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_server.py b/src/python/grpcio_tests/tests_aio/unit/_test_server.py index 014a14aa7f3..41648da378c 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_server.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_server.py @@ -13,8 +13,8 @@ # limitations under the License. import asyncio -from time import sleep import logging +import datetime from grpc.experimental import aio from src.proto.grpc.testing import messages_pb2 @@ -40,7 +40,8 @@ class _TestServiceServicer(test_pb2_grpc.TestServiceServicer): for response_parameters in request.response_parameters: if response_parameters.interval_us != 0: await asyncio.sleep( - response_parameters.interval_us / _US_IN_A_SECOND) + datetime.timedelta(microseconds=response_parameters. + interval_us).total_seconds()) yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( type=request.response_type, diff --git a/src/python/grpcio_tests/tests_aio/unit/aio_rpc_error_test.py b/src/python/grpcio_tests/tests_aio/unit/aio_rpc_error_test.py index 60bc6ead06f..6ca3c534f29 100644 --- a/src/python/grpcio_tests/tests_aio/unit/aio_rpc_error_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/aio_rpc_error_test.py @@ -21,23 +21,28 @@ import grpc from grpc.experimental.aio._call import AioRpcError from tests_aio.unit._test_base import AioTestBase +_TEST_INITIAL_METADATA = ('initial metadata',) +_TEST_TRAILING_METADATA = ('trailing metadata',) +_TEST_DEBUG_ERROR_STRING = '{This is a debug string}' + class TestAioRpcError(unittest.TestCase): - _TEST_INITIAL_METADATA = ("initial metadata",) - _TEST_TRAILING_METADATA = ("trailing metadata",) def test_attributes(self): aio_rpc_error = AioRpcError( grpc.StatusCode.CANCELLED, - "details", - initial_metadata=self._TEST_INITIAL_METADATA, - trailing_metadata=self._TEST_TRAILING_METADATA) + 'details', + initial_metadata=_TEST_INITIAL_METADATA, + trailing_metadata=_TEST_TRAILING_METADATA, + debug_error_string=_TEST_DEBUG_ERROR_STRING) self.assertEqual(aio_rpc_error.code(), grpc.StatusCode.CANCELLED) - self.assertEqual(aio_rpc_error.details(), "details") + self.assertEqual(aio_rpc_error.details(), 'details') self.assertEqual(aio_rpc_error.initial_metadata(), - self._TEST_INITIAL_METADATA) + _TEST_INITIAL_METADATA) self.assertEqual(aio_rpc_error.trailing_metadata(), - self._TEST_TRAILING_METADATA) + _TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.debug_error_string(), + _TEST_DEBUG_ERROR_STRING) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 0ed296c6b7b..77d60500123 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -54,7 +54,7 @@ class TestUnaryUnaryCall(AioTestBase): response = await call self.assertTrue(call.done()) - self.assertEqual(type(response), messages_pb2.SimpleResponse) + self.assertIsInstance(response, messages_pb2.SimpleResponse) self.assertEqual(await call.code(), grpc.StatusCode.OK) # Response is cached at call object level, reentrance @@ -81,9 +81,12 @@ class TestUnaryUnaryCall(AioTestBase): with self.assertRaises(grpc.RpcError) as exception_context: await call + self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, + exception_context.exception.code()) + self.assertTrue(call.done()) - self.assertEqual(await call.code(), - grpc.StatusCode.DEADLINE_EXCEEDED) + self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, await + call.code()) # Exception is cached at call object level, reentrance # returns again the same exception @@ -138,7 +141,7 @@ class TestUnaryUnaryCall(AioTestBase): # NOTE(lidiz) The CancelledError is almost always re-created, # so we might not want to use it to transmit data. - # https://github.com/python/cpython/blob/master/Lib/asyncio/tasks.py#L785 + # https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785 class TestUnaryStreamCall(AioTestBase): @@ -272,6 +275,8 @@ class TestUnaryStreamCall(AioTestBase): # is received or on its way. It's basically a data race, so our # expectation here is do not crash :) call.cancel() + self.assertIn(await call.code(), + [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) async def test_too_many_reads_unary_stream(self): """Test cancellation after received all messages.""" @@ -309,9 +314,7 @@ class TestUnaryStreamCall(AioTestBase): for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) + size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = stub.StreamingOutputCall(request) diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index bf32d105f60..6ed57f6219e 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -59,7 +59,7 @@ class TestChannel(AioTestBase): response_deserializer=messages_pb2.SimpleResponse.FromString) response = await hi(messages_pb2.SimpleRequest()) - self.assertIs(type(response), messages_pb2.SimpleResponse) + self.assertIsInstance(response, messages_pb2.SimpleResponse) async def test_unary_call_times_out(self): async with aio.insecure_channel(self._server_target) as channel: @@ -96,7 +96,7 @@ class TestChannel(AioTestBase): response_deserializer=messages_pb2.SimpleResponse.FromString) response = await hi(messages_pb2.SimpleRequest()) - self.assertIs(type(response), messages_pb2.SimpleResponse) + self.assertIsInstance(response, messages_pb2.SimpleResponse) await channel.close() @@ -122,6 +122,7 @@ class TestChannel(AioTestBase): self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt) + self.assertEqual(await call.code(), grpc.StatusCode.OK) await channel.close() diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 1ba00c0312a..265744b1f54 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -28,6 +28,7 @@ _BLOCK_FOREVER = '/test/BlockForever' _BLOCK_BRIEFLY = '/test/BlockBriefly' _UNARY_STREAM_ASYNC_GEN = '/test/UnaryStreamAsyncGen' _UNARY_STREAM_READER_WRITER = '/test/UnaryStreamReaderWriter' +_UNARY_STREAM_EVILLY_MIXED = '/test/UnaryStreamEvillyMixed' _REQUEST = b'\x00\x00\x00' _RESPONSE = b'\x01\x01\x01' @@ -56,7 +57,12 @@ class _GenericHandler(grpc.GenericRpcHandler): async def _unary_stream_reader_writer(self, unused_request, context): for _ in range(_NUM_STREAM_RESPONSES): - context.write(_RESPONSE) + await context.write(_RESPONSE) + + async def _unary_stream_evilly_mixed(self, unused_request, context): + yield _RESPONSE + for _ in range(_NUM_STREAM_RESPONSES - 1): + await context.write(_RESPONSE) def service(self, handler_details): self._called.set_result(None) @@ -72,6 +78,9 @@ class _GenericHandler(grpc.GenericRpcHandler): if handler_details.method == _UNARY_STREAM_READER_WRITER: return grpc.unary_stream_rpc_method_handler( self._unary_stream_reader_writer) + if handler_details.method == _UNARY_STREAM_EVILLY_MIXED: + return grpc.unary_stream_rpc_method_handler( + self._unary_stream_evilly_mixed) async def wait_for_call(self): await self._called @@ -105,7 +114,6 @@ class TestServer(AioTestBase): async with aio.insecure_channel(self._server_target) as channel: unary_stream_call = channel.unary_stream(_UNARY_STREAM_ASYNC_GEN) call = unary_stream_call(_REQUEST) - await self._generic_handler.wait_for_call() # Expecting the request message to reach server before retriving # any responses. @@ -122,9 +130,9 @@ class TestServer(AioTestBase): async def test_unary_stream_reader_writer(self): async with aio.insecure_channel(self._server_target) as channel: - unary_stream_call = channel.unary_stream(_UNARY_STREAM_ASYNC_GEN) + unary_stream_call = channel.unary_stream( + _UNARY_STREAM_READER_WRITER) call = unary_stream_call(_REQUEST) - await self._generic_handler.wait_for_call() # Expecting the request message to reach server before retriving # any responses. @@ -137,6 +145,29 @@ class TestServer(AioTestBase): self.assertEqual(await call.code(), grpc.StatusCode.OK) + async def test_unary_stream_evilly_mixed(self): + async with aio.insecure_channel(self._server_target) as channel: + unary_stream_call = channel.unary_stream(_UNARY_STREAM_EVILLY_MIXED) + call = unary_stream_call(_REQUEST) + + # Expecting the request message to reach server before retriving + # any responses. + await asyncio.wait_for(self._generic_handler.wait_for_call(), + test_constants.SHORT_TIMEOUT) + + # Uses reader API + self.assertEqual(_RESPONSE, await call.read()) + + # Uses async generator API + response_cnt = 0 + async for response in call: + response_cnt += 1 + self.assertEqual(_RESPONSE, response) + + self.assertEqual(_NUM_STREAM_RESPONSES - 1, response_cnt) + + self.assertEqual(await call.code(), grpc.StatusCode.OK) + async def test_shutdown(self): await self._server.stop(None) # Ensures no SIGSEGV triggered, and ends within timeout. @@ -229,5 +260,5 @@ class TestServer(AioTestBase): if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 84b6160d279..0f287841da5 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -727,8 +727,9 @@ class PythonLanguage(object): self.args.iomgr_platform]) as tests_json_file: tests_json = json.load(tests_json_file) environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS) - # NOTE(lidiz) Fork handlers is not designed for non-native IO manager. - # It has a side-effect that overrides threading settings in C-Core. + # TODO(https://github.com/grpc/grpc/issues/21401) Fork handlers is not + # designed for non-native IO manager. It has a side-effect that + # overrides threading settings in C-Core. if args.iomgr_platform != 'native': environment['GRPC_ENABLE_FORK_SUPPORT'] = '0' return [