diff --git a/doc/python/sphinx/grpc_asyncio.rst b/doc/python/sphinx/grpc_asyncio.rst index 16267d94433..3513f16f036 100644 --- a/doc/python/sphinx/grpc_asyncio.rst +++ b/doc/python/sphinx/grpc_asyncio.rst @@ -24,27 +24,20 @@ gRPC Async API objects may only be used on the thread on which they were created. AsyncIO doesn't provide thread safety for most of its APIs. -Module Contents ---------------- - Enable AsyncIO in gRPC -^^^^^^^^^^^^^^^^^^^^^^ +---------------------- -.. function:: init_grpc_aio +Enable AsyncIO in gRPC Python is automatic when instantiating gRPC AsyncIO +objects (e.g., channels and servers). No additional function invocation is +required. - Enable AsyncIO for gRPC Python. +Making blocking function calls in coroutines or in the thread running event +loop will block the event loop, potentially starving all RPCs in the process. +Refer to the Python language documentation on AsyncIO for more details (`running-blocking-code `_). - This function is idempotent and it should be invoked before creation of - AsyncIO stack objects. Otherwise, the application might deadlock. - This function configurates the gRPC C-Core to invoke AsyncIO methods for IO - operations (e.g., socket read, write). The configuration applies to the - entire process. - - After invoking this function, making blocking function calls in coroutines - or in the thread running event loop will block the event loop, potentially - starving all RPCs in the process. Refer to the Python language - documentation on AsyncIO for more details (`running-blocking-code `_). +Module Contents +--------------- Create Channel diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi index f42929461a0..fbc6e999608 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi @@ -41,7 +41,11 @@ cdef class BaseCompletionQueue: cdef class PollerCompletionQueue(BaseCompletionQueue): def __cinit__(self): - self._loop = asyncio.get_event_loop() + # NOTE(lidiz) Due to a defect of asyncio.get_event_loop, its returned + # event loop might not be set as the default event loop for the main + # thread. So, asyncio.get_running_loop() is needed to ensure the poller + # is bound to a working event loop. + self._loop = asyncio.get_running_loop() self._cq = grpc_completion_queue_create_for_next(NULL) self._shutdown = False self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) @@ -119,7 +123,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue): cdef class CallbackCompletionQueue(BaseCompletionQueue): def __cinit__(self): - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() self._shutdown_completed = self._loop.create_future() self._wrapper = CallbackWrapper( self._shutdown_completed, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 65ec0c8f202..b845e4d1e8c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -216,7 +216,7 @@ def _auth_plugin_callback_wrapper(object cb, str service_url, str method_name, object callback): - asyncio.get_event_loop().call_soon(cb, service_url, method_name, callback) + asyncio.get_running_loop().call_soon(cb, service_url, method_name, callback) def install_asyncio_iomgr(): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi index 3e664245aac..812b5168d9f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi @@ -15,7 +15,7 @@ cdef class _AsyncioResolver: def __cinit__(self): - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() self._grpc_resolver = NULL self._task_resolve = None diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 7524e9da94b..ad8b3d93fc1 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -37,7 +37,7 @@ cdef class _AsyncioSocket: self._py_socket = None self._peername = None self._closed = False - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi index 286cd9a9d43..77769c91e87 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi @@ -18,7 +18,7 @@ cdef class _AsyncioTimer: self._grpc_timer = NULL self._timer_future = None self._active = False - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() cpython.Py_INCREF(self) @staticmethod diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index 1995db13bf5..5fcfcb637e2 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -269,7 +269,7 @@ class Channel(_base_channel.Channel): "{} or ".format(StreamUnaryClientInterceptor.__name__) + "{}. ".format(StreamStreamClientInterceptor.__name__)) - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() self._channel = cygrpc.AioChannel( _common.encode(target), _augment_channel_arguments(options, compression), credentials, diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 81438891268..2a2b846de20 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -41,7 +41,7 @@ class Server(_base_server.Server): options: ChannelArgumentType, maximum_concurrent_rpcs: Optional[int], compression: Optional[grpc.Compression]): - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() if interceptors: invalid_interceptors = [ interceptor for interceptor in interceptors diff --git a/src/python/grpcio_tests/tests_aio/unit/_test_base.py b/src/python/grpcio_tests/tests_aio/unit/_test_base.py index 82ec7b456ad..ec5f2112da0 100644 --- a/src/python/grpcio_tests/tests_aio/unit/_test_base.py +++ b/src/python/grpcio_tests/tests_aio/unit/_test_base.py @@ -64,6 +64,3 @@ class AioTestBase(unittest.TestCase): return _async_to_sync_decorator(attr, self._TEST_LOOP) # For other attributes, let them pass. return attr - - -aio.init_grpc_aio() diff --git a/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py b/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py new file mode 100644 index 00000000000..aa028fe9adb --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/outside_init_test.py @@ -0,0 +1,46 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests behavior around the metadata mechanism.""" + +import asyncio +import logging +import unittest +from grpc.experimental import aio +import grpc + + +class TestOutsideInit(unittest.TestCase): + + def test_behavior_outside_asyncio(self): + # Ensures non-AsyncIO object can be initiated + channel_creds = grpc.ssl_channel_credentials() + + # Ensures AsyncIO API NOT working outside of AsyncIO + with self.assertRaises(RuntimeError): + aio.insecure_channel('') + + with self.assertRaises(RuntimeError): + aio.secure_channel('', channel_creds) + + with self.assertRaises(RuntimeError): + aio.server('', None) + + # Ensures init_grpc_aio fail outside of AsyncIO + with self.assertRaises(RuntimeError): + aio.init_grpc_aio() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + unittest.main(verbosity=2)