Merge pull request #23280 from lidizheng/aio-universal-init

[Aio] Hide init_grpc_aio and guard async API outside of AsyncIO context
pull/23301/head
Lidi Zheng 5 years ago committed by GitHub
commit 0e8190a3c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      doc/python/sphinx/grpc_asyncio.rst
  2. 22
      src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi
  3. 5
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  4. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  5. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  6. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  7. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  8. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  9. 2
      src/python/grpcio/grpc/experimental/aio/_channel.py
  10. 3
      src/python/grpcio/grpc/experimental/aio/_server.py
  11. 1
      src/python/grpcio_tests/tests_aio/tests.json
  12. 3
      src/python/grpcio_tests/tests_aio/unit/_test_base.py
  13. 46
      src/python/grpcio_tests/tests_aio/unit/outside_init_test.py

@ -24,27 +24,16 @@ gRPC Async API objects may only be used on the thread on which they were
created. AsyncIO doesn't provide thread safety for most of its APIs.
Module Contents
---------------
Enable AsyncIO in gRPC
^^^^^^^^^^^^^^^^^^^^^^
.. function:: init_grpc_aio
Blocking Code in AsyncIO
------------------------
Enable AsyncIO for gRPC Python.
Making blocking function calls in coroutines or in the thread running event
loop will block the event loop, potentially starving all RPCs in the process.
Refer to the Python language documentation on AsyncIO for more details (`running-blocking-code <https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code>`_).
This function is idempotent and it should be invoked before creation of
AsyncIO stack objects. Otherwise, the application might deadlock.
This function configurates the gRPC C-Core to invoke AsyncIO methods for IO
operations (e.g., socket read, write). The configuration applies to the
entire process.
After invoking this function, making blocking function calls in coroutines
or in the thread running event loop will block the event loop, potentially
starving all RPCs in the process. Refer to the Python language
documentation on AsyncIO for more details (`running-blocking-code <https://docs.python.org/3/library/asyncio-dev.html#running-blocking-code>`_).
Module Contents
---------------
Create Channel

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
cdef grpc_status_code get_status_code(object code) except *:
if isinstance(code, int):
@ -165,3 +167,23 @@ async def generator_to_async_generator(object gen, object loop, object thread_po
# Port the exception if there is any
await future
if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7:
def get_working_loop():
"""Returns a running event loop."""
return asyncio.get_running_loop()
else:
def get_working_loop():
"""Returns a running event loop.
Due to a defect of asyncio.get_event_loop, its returned event loop might
not be set as the default event loop for the main thread. So, we will
raise RuntimeError if the returned event loop is not running.
"""
loop = asyncio.get_event_loop()
if loop.is_running():
return loop
else:
raise RuntimeError('No running event loop detected. This function '
+ 'must be called from inside of a running event loop.')

@ -41,7 +41,8 @@ cdef class BaseCompletionQueue:
cdef class PollerCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._loop = asyncio.get_event_loop()
self._loop = get_working_loop()
self._cq = grpc_completion_queue_create_for_next(NULL)
self._shutdown = False
self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
@ -119,7 +120,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._loop = asyncio.get_event_loop()
self._loop = get_working_loop()
self._shutdown_completed = self._loop.create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,

@ -216,7 +216,7 @@ def _auth_plugin_callback_wrapper(object cb,
str service_url,
str method_name,
object callback):
asyncio.get_event_loop().call_soon(cb, service_url, method_name, callback)
get_working_loop().call_soon(cb, service_url, method_name, callback)
def install_asyncio_iomgr():

@ -15,7 +15,7 @@
cdef class _AsyncioResolver:
def __cinit__(self):
self._loop = asyncio.get_event_loop()
self._loop = get_working_loop()
self._grpc_resolver = NULL
self._task_resolve = None

@ -37,7 +37,7 @@ cdef class _AsyncioSocket:
self._py_socket = None
self._peername = None
self._closed = False
self._loop = asyncio.get_event_loop()
self._loop = get_working_loop()
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,

@ -18,7 +18,7 @@ cdef class _AsyncioTimer:
self._grpc_timer = NULL
self._timer_future = None
self._active = False
self._loop = asyncio.get_event_loop()
self._loop = get_working_loop()
cpython.Py_INCREF(self)
@staticmethod

@ -990,7 +990,7 @@ cdef class AioServer:
# TODO(lidiz) if users create server, and then dealloc it immediately.
# There is a potential memory leak of created Core server.
if self._status != AIO_SERVER_STATUS_STOPPED:
_LOGGER.warning(
_LOGGER.debug(
'__dealloc__ called on running server %s with status %d',
self,
self._status

@ -269,7 +269,7 @@ class Channel(_base_channel.Channel):
"{} or ".format(StreamUnaryClientInterceptor.__name__) +
"{}. ".format(StreamStreamClientInterceptor.__name__))
self._loop = asyncio.get_event_loop()
self._loop = cygrpc.get_working_loop()
self._channel = cygrpc.AioChannel(
_common.encode(target),
_augment_channel_arguments(options, compression), credentials,

@ -13,7 +13,6 @@
# limitations under the License.
"""Server-side implementation of gRPC Asyncio Python."""
import asyncio
from concurrent.futures import Executor
from typing import Any, Optional, Sequence
@ -41,7 +40,7 @@ class Server(_base_server.Server):
options: ChannelArgumentType,
maximum_concurrent_rpcs: Optional[int],
compression: Optional[grpc.Compression]):
self._loop = asyncio.get_event_loop()
self._loop = cygrpc.get_working_loop()
if interceptors:
invalid_interceptors = [
interceptor for interceptor in interceptors

@ -30,6 +30,7 @@
"unit.done_callback_test.TestDoneCallback",
"unit.init_test.TestChannel",
"unit.metadata_test.TestMetadata",
"unit.outside_init_test.TestOutsideInit",
"unit.secure_call_test.TestStreamStreamSecureCall",
"unit.secure_call_test.TestUnaryStreamSecureCall",
"unit.secure_call_test.TestUnaryUnarySecureCall",

@ -64,6 +64,3 @@ class AioTestBase(unittest.TestCase):
return _async_to_sync_decorator(attr, self._TEST_LOOP)
# For other attributes, let them pass.
return attr
aio.init_grpc_aio()

@ -0,0 +1,46 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior around the metadata mechanism."""
import asyncio
import logging
import unittest
from grpc.experimental import aio
import grpc
class TestOutsideInit(unittest.TestCase):
def test_behavior_outside_asyncio(self):
# Ensures non-AsyncIO object can be initiated
channel_creds = grpc.ssl_channel_credentials()
# Ensures AsyncIO API NOT working outside of AsyncIO
with self.assertRaises(RuntimeError):
aio.insecure_channel('')
with self.assertRaises(RuntimeError):
aio.secure_channel('', channel_creds)
with self.assertRaises(RuntimeError):
aio.server()
# Ensures init_grpc_aio fail outside of AsyncIO
with self.assertRaises(RuntimeError):
aio.init_grpc_aio()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)
Loading…
Cancel
Save