Resolve the compatibility issue that get_runnint_loop is 3.7+

pull/23280/head
Lidi Zheng 4 years ago
parent 6717ecb3cb
commit 286a80dad5
  1. 21
      src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi
  2. 9
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  3. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  4. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  5. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  6. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  7. 2
      src/python/grpcio/grpc/experimental/aio/_channel.py
  8. 2
      src/python/grpcio/grpc/experimental/aio/_server.py
  9. 2
      src/python/grpcio_tests/tests_aio/unit/outside_init_test.py

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
cdef grpc_status_code get_status_code(object code) except *:
if isinstance(code, int):
@ -165,3 +167,22 @@ async def generator_to_async_generator(object gen, object loop, object thread_po
# Port the exception if there is any
await future
if PY_MAJOR_VERSION >=3 and PY_MINOR_VERSION >=7:
def get_working_loop():
"""Returns a running event loop."""
return asyncio.get_running_loop()
else:
def get_working_loop():
"""Returns a running event loop.
Due to a defect of asyncio.get_event_loop, its returned event loop might
not be set as the default event loop for the main thread. So, we will
raise RuntimeError if the returned event loop is not running.
"""
loop = asyncio.get_event_loop()
if loop.is_running():
return loop
else:
raise RuntimeError('no running event loop')

@ -41,11 +41,8 @@ cdef class BaseCompletionQueue:
cdef class PollerCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
# NOTE(lidiz) Due to a defect of asyncio.get_event_loop, its returned
# event loop might not be set as the default event loop for the main
# thread. So, asyncio.get_running_loop() is needed to ensure the poller
# is bound to a working event loop.
self._loop = asyncio.get_running_loop()
self._loop = get_working_loop()
self._cq = grpc_completion_queue_create_for_next(NULL)
self._shutdown = False
self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
@ -123,7 +120,7 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._loop = asyncio.get_running_loop()
self._loop = get_working_loop()
self._shutdown_completed = self._loop.create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,

@ -216,7 +216,7 @@ def _auth_plugin_callback_wrapper(object cb,
str service_url,
str method_name,
object callback):
asyncio.get_running_loop().call_soon(cb, service_url, method_name, callback)
get_working_loop().call_soon(cb, service_url, method_name, callback)
def install_asyncio_iomgr():

@ -15,7 +15,7 @@
cdef class _AsyncioResolver:
def __cinit__(self):
self._loop = asyncio.get_running_loop()
self._loop = get_working_loop()
self._grpc_resolver = NULL
self._task_resolve = None

@ -37,7 +37,7 @@ cdef class _AsyncioSocket:
self._py_socket = None
self._peername = None
self._closed = False
self._loop = asyncio.get_running_loop()
self._loop = get_working_loop()
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,

@ -18,7 +18,7 @@ cdef class _AsyncioTimer:
self._grpc_timer = NULL
self._timer_future = None
self._active = False
self._loop = asyncio.get_running_loop()
self._loop = get_working_loop()
cpython.Py_INCREF(self)
@staticmethod

@ -269,7 +269,7 @@ class Channel(_base_channel.Channel):
"{} or ".format(StreamUnaryClientInterceptor.__name__) +
"{}. ".format(StreamStreamClientInterceptor.__name__))
self._loop = asyncio.get_running_loop()
self._loop = cygrpc.get_working_loop()
self._channel = cygrpc.AioChannel(
_common.encode(target),
_augment_channel_arguments(options, compression), credentials,

@ -41,7 +41,7 @@ class Server(_base_server.Server):
options: ChannelArgumentType,
maximum_concurrent_rpcs: Optional[int],
compression: Optional[grpc.Compression]):
self._loop = asyncio.get_running_loop()
self._loop = cygrpc.get_working_loop()
if interceptors:
invalid_interceptors = [
interceptor for interceptor in interceptors

@ -34,7 +34,7 @@ class TestOutsideInit(unittest.TestCase):
aio.secure_channel('', channel_creds)
with self.assertRaises(RuntimeError):
aio.server('', None)
aio.server()
# Ensures init_grpc_aio fail outside of AsyncIO
with self.assertRaises(RuntimeError):

Loading…
Cancel
Save