Hide init_grpc_aio and guard async API outside of AsyncIO context

pull/23280/head
Lidi Zheng 4 years ago
parent 4b78ae524e
commit 8bcffaa5ca
  1. 25
      doc/python/sphinx/grpc_asyncio.rst
  2. 8
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  3. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  4. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  5. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  6. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  7. 2
      src/python/grpcio/grpc/experimental/aio/_channel.py
  8. 2
      src/python/grpcio/grpc/experimental/aio/_server.py
  9. 3
      src/python/grpcio_tests/tests_aio/unit/_test_base.py
  10. 46
      src/python/grpcio_tests/tests_aio/unit/outside_init_test.py

@ -24,27 +24,20 @@ gRPC Async API objects may only be used on the thread on which they were
created. AsyncIO doesn't provide thread safety for most of its APIs. created. AsyncIO doesn't provide thread safety for most of its APIs.
Module Contents
---------------
Enable AsyncIO in gRPC Enable AsyncIO in gRPC
^^^^^^^^^^^^^^^^^^^^^^ ----------------------
.. function:: init_grpc_aio Enable AsyncIO in gRPC Python is automatic when instantiating gRPC AsyncIO
objects (e.g., channels and servers). No additional function invocation is
required.
Enable AsyncIO for gRPC Python. Making blocking function calls in coroutines or in the thread running event
loop will block the event loop, potentially starving all RPCs in the process.
Refer to the Python language documentation on AsyncIO for more details (`running-blocking-code <https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code>`_).
This function is idempotent and it should be invoked before creation of
AsyncIO stack objects. Otherwise, the application might deadlock.
This function configurates the gRPC C-Core to invoke AsyncIO methods for IO Module Contents
operations (e.g., socket read, write). The configuration applies to the ---------------
entire process.
After invoking this function, making blocking function calls in coroutines
or in the thread running event loop will block the event loop, potentially
starving all RPCs in the process. Refer to the Python language
documentation on AsyncIO for more details (`running-blocking-code <https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code>`_).
Create Channel Create Channel

@ -41,7 +41,11 @@ cdef class BaseCompletionQueue:
cdef class PollerCompletionQueue(BaseCompletionQueue): cdef class PollerCompletionQueue(BaseCompletionQueue):
def __cinit__(self): def __cinit__(self):
self._loop = asyncio.get_event_loop() # NOTE(lidiz) Due to a defect of asyncio.get_event_loop, its returned
# event loop might not be set as the default event loop for the main
# thread. So, asyncio.get_running_loop() is needed to ensure the poller
# is bound to a working event loop.
self._loop = asyncio.get_running_loop()
self._cq = grpc_completion_queue_create_for_next(NULL) self._cq = grpc_completion_queue_create_for_next(NULL)
self._shutdown = False self._shutdown = False
self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True) self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
@ -119,7 +123,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef class CallbackCompletionQueue(BaseCompletionQueue): cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self): def __cinit__(self):
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
self._shutdown_completed = self._loop.create_future() self._shutdown_completed = self._loop.create_future()
self._wrapper = CallbackWrapper( self._wrapper = CallbackWrapper(
self._shutdown_completed, self._shutdown_completed,

@ -216,7 +216,7 @@ def _auth_plugin_callback_wrapper(object cb,
str service_url, str service_url,
str method_name, str method_name,
object callback): object callback):
asyncio.get_event_loop().call_soon(cb, service_url, method_name, callback) asyncio.get_running_loop().call_soon(cb, service_url, method_name, callback)
def install_asyncio_iomgr(): def install_asyncio_iomgr():

@ -15,7 +15,7 @@
cdef class _AsyncioResolver: cdef class _AsyncioResolver:
def __cinit__(self): def __cinit__(self):
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
self._grpc_resolver = NULL self._grpc_resolver = NULL
self._task_resolve = None self._task_resolve = None

@ -37,7 +37,7 @@ cdef class _AsyncioSocket:
self._py_socket = None self._py_socket = None
self._peername = None self._peername = None
self._closed = False self._closed = False
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
@staticmethod @staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,

@ -18,7 +18,7 @@ cdef class _AsyncioTimer:
self._grpc_timer = NULL self._grpc_timer = NULL
self._timer_future = None self._timer_future = None
self._active = False self._active = False
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
cpython.Py_INCREF(self) cpython.Py_INCREF(self)
@staticmethod @staticmethod

@ -269,7 +269,7 @@ class Channel(_base_channel.Channel):
"{} or ".format(StreamUnaryClientInterceptor.__name__) + "{} or ".format(StreamUnaryClientInterceptor.__name__) +
"{}. ".format(StreamStreamClientInterceptor.__name__)) "{}. ".format(StreamStreamClientInterceptor.__name__))
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
self._channel = cygrpc.AioChannel( self._channel = cygrpc.AioChannel(
_common.encode(target), _common.encode(target),
_augment_channel_arguments(options, compression), credentials, _augment_channel_arguments(options, compression), credentials,

@ -41,7 +41,7 @@ class Server(_base_server.Server):
options: ChannelArgumentType, options: ChannelArgumentType,
maximum_concurrent_rpcs: Optional[int], maximum_concurrent_rpcs: Optional[int],
compression: Optional[grpc.Compression]): compression: Optional[grpc.Compression]):
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
if interceptors: if interceptors:
invalid_interceptors = [ invalid_interceptors = [
interceptor for interceptor in interceptors interceptor for interceptor in interceptors

@ -64,6 +64,3 @@ class AioTestBase(unittest.TestCase):
return _async_to_sync_decorator(attr, self._TEST_LOOP) return _async_to_sync_decorator(attr, self._TEST_LOOP)
# For other attributes, let them pass. # For other attributes, let them pass.
return attr return attr
aio.init_grpc_aio()

@ -0,0 +1,46 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior around the metadata mechanism."""
import asyncio
import logging
import unittest
from grpc.experimental import aio
import grpc
class TestOutsideInit(unittest.TestCase):
def test_behavior_outside_asyncio(self):
# Ensures non-AsyncIO object can be initiated
channel_creds = grpc.ssl_channel_credentials()
# Ensures AsyncIO API NOT working outside of AsyncIO
with self.assertRaises(RuntimeError):
aio.insecure_channel('')
with self.assertRaises(RuntimeError):
aio.secure_channel('', channel_creds)
with self.assertRaises(RuntimeError):
aio.server('', None)
# Ensures init_grpc_aio fail outside of AsyncIO
with self.assertRaises(RuntimeError):
aio.init_grpc_aio()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)
Loading…
Cancel
Save