diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 7c150c33054..b8f04615297 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -35,7 +35,6 @@ cdef class CallbackWrapper: def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler): self.context.functor.functor_run = self.functor_run self.context.waiter = future - # TODO(lidiz) switch to future.get_loop() which is available 3.7+. self.context.loop = loop self.context.failure_handler = failure_handler self.context.callback_wrapper = self diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 5233d0b3bb7..bc2650696f4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -14,13 +14,13 @@ import enum -cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower() +cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').upper() cdef _AioState _global_aio_state = _AioState() class AsyncIOEngine(enum.Enum): DEFAULT = 'default' - CUSTOM_IO_MANAGER = 'custom' + CUSTOM_IO_MANAGER = 'custom_io_manager' POLLER = 'poller' @@ -28,11 +28,6 @@ cdef _default_asyncio_engine(): return AsyncIOEngine.CUSTOM_IO_MANAGER -def grpc_aio_engine(): - """Read-only access to the picked engine type.""" - return _global_aio_state.engine - - cdef grpc_completion_queue *global_completion_queue(): return _global_aio_state.cq.c_ptr() @@ -85,6 +80,7 @@ cdef _actual_aio_initialization(): ) if _global_aio_state.engine is AsyncIOEngine.DEFAULT: _global_aio_state.engine = _default_asyncio_engine() + _LOGGER.info('Using %s as I/O engine', _global_aio_state.engine) # Initializes the process-level state accordingly if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER: diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel index 8db8f23e3c9..ab475bcf97c 100644 --- a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -51,7 +51,7 @@ py_library( ) _FLAKY_TESTS = [ - # NOTE(lidiz) this tests use many tcp ports; flaky under parallel runs. + # TODO(https://github.com/grpc/grpc/issues/22347) remove from this list. "channel_argument_test.py", ] diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_base.py b/src/python/grpcio_tests/tests_aio/unit/_test_base.py index 06563e08166..ec5f2112da0 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_base.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_base.py @@ -46,10 +46,13 @@ def _get_default_loop(debug=True): # NOTE(gnossen) this test class can also be implemented with metaclass. class AioTestBase(unittest.TestCase): + # NOTE(lidi) We need to pick a loop for entire testing phase, otherwise it + # will trigger create new loops in new threads, leads to deadlock. + _TEST_LOOP = _get_default_loop() @property def loop(self): - return _get_default_loop() + return self._TEST_LOOP def __getattribute__(self, name): """Overrides the loading logic to support coroutine functions.""" @@ -58,6 +61,6 @@ class AioTestBase(unittest.TestCase): # If possible, converts the coroutine into a sync function. if name.startswith('test_') or name in _COROUTINE_FUNCTION_ALLOWLIST: if asyncio.iscoroutinefunction(attr): - return _async_to_sync_decorator(attr, _get_default_loop()) + return _async_to_sync_decorator(attr, self._TEST_LOOP) # For other attributes, let them pass. return attr diff --git a/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py b/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py index acf404ed51b..066ff6402c5 100644 --- a/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/compatibility_test.py @@ -43,8 +43,9 @@ def _unique_options() -> Sequence[Tuple[str, float]]: return (('iv', random.random()),) -@unittest.skipIf(cygrpc.grpc_aio_engine() != cygrpc.AsyncIOEngine.POLLER, - 'Compatible mode needs POLLER completion queue.') +@unittest.skipIf( + os.environ.get('GRPC_ASYNCIO_ENGINE', '').lower() != 'poller', + 'Compatible mode needs POLLER completion queue.') class TestCompatibility(AioTestBase): async def setUp(self): @@ -65,7 +66,7 @@ class TestCompatibility(AioTestBase): await self._async_server.stop(None) async def _run_in_another_thread(self, func: Callable[[], None]): - work_done = asyncio.Event() + work_done = asyncio.Event(loop=self.loop) def thread_work(): func() @@ -162,17 +163,14 @@ class TestCompatibility(AioTestBase): async def test_server(self): - def echo(a, b): - return a - class GenericHandlers(grpc.GenericRpcHandler): def service(self, handler_call_details): - return grpc.unary_unary_rpc_method_handler(echo) + return grpc.unary_unary_rpc_method_handler(lambda x, _: x) # It's fine to instantiate server object in the event loop thread. # The server will spawn its own serving thread. - server = grpc.server(ThreadPoolExecutor(max_workers=10), + server = grpc.server(ThreadPoolExecutor(), handlers=(GenericHandlers(),)) port = server.add_insecure_port('0') server.start() @@ -200,7 +198,7 @@ class TestCompatibility(AioTestBase): call = async_stub.UnaryCall(messages_pb2.SimpleRequest()) response = await call self.assertIsInstance(response, messages_pb2.SimpleResponse) - self.assertEqual(grpc.StatusCode.OK, call.code()) + self.assertEqual(grpc.StatusCode.OK, await call.code()) loop = asyncio.new_event_loop() loop.run_until_complete(async_work())