Merge pull request #22311 from lidizheng/aio-one-cq

[Aio] One completion queue & many-thread many-loop
pull/22368/head
Lidi Zheng 5 years ago committed by GitHub
commit da6f9a28bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
  2. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
  3. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  4. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
  5. 10
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  6. 13
      src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi
  7. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
  8. 34
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  9. 23
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi
  10. 196
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  11. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pxd.pxi
  12. 5
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  13. 9
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  14. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi
  15. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  16. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
  17. 14
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  18. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  19. 3
      src/python/grpcio/grpc/experimental/aio/__init__.py
  20. 2
      src/python/grpcio/grpc/experimental/aio/_channel.py
  21. 8
      src/python/grpcio/grpc/experimental/aio/_server.py
  22. 2
      src/python/grpcio_tests/commands.py
  23. 1
      src/python/grpcio_tests/tests_aio/benchmark/server.py
  24. 1
      src/python/grpcio_tests/tests_aio/benchmark/worker.py
  25. 1
      src/python/grpcio_tests/tests_aio/interop/client.py
  26. 3
      src/python/grpcio_tests/tests_aio/interop/server.py
  27. 1
      src/python/grpcio_tests/tests_aio/tests.json
  28. 6
      src/python/grpcio_tests/tests_aio/unit/BUILD.bazel
  29. 10
      src/python/grpcio_tests/tests_aio/unit/_test_base.py
  30. 208
      src/python/grpcio_tests/tests_aio/unit/compatibility_test.py

@ -115,7 +115,7 @@ cdef class _AioCall(GrpcCallWrapper):
self._channel.channel,
NULL,
_EMPTY_MASK,
self._channel.cq.c_ptr(),
global_completion_queue(),
method_slice,
NULL,
c_deadline,

@ -35,6 +35,7 @@ cdef struct CallbackContext:
# management.
grpc_experimental_completion_queue_functor functor
cpython.PyObject *waiter
cpython.PyObject *loop
cpython.PyObject *failure_handler
cpython.PyObject *callback_wrapper

@ -32,9 +32,10 @@ cdef class CallbackFailureHandler:
cdef class CallbackWrapper:
def __cinit__(self, object future, CallbackFailureHandler failure_handler):
def __cinit__(self, object future, object loop, CallbackFailureHandler failure_handler):
self.context.functor.functor_run = self.functor_run
self.context.waiter = <cpython.PyObject*>future
self.context.loop = <cpython.PyObject*>loop
self.context.failure_handler = <cpython.PyObject*>failure_handler
self.context.callback_wrapper = <cpython.PyObject*>self
# NOTE(lidiz) Not using a list here, because this class is critical in
@ -82,6 +83,7 @@ async def execute_batch(GrpcCallWrapper grpc_call_wrapper,
cdef object future = loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
loop,
CallbackFailureHandler('execute_batch', operations, ExecuteBatchError))
cdef grpc_call_error error = grpc_call_start_batch(
grpc_call_wrapper.call,

@ -21,7 +21,6 @@ cdef enum AioChannelStatus:
cdef class AioChannel:
cdef:
grpc_channel * channel
BaseCompletionQueue cq
object loop
bytes _target
AioChannelStatus _status

@ -27,11 +27,11 @@ cdef CallbackFailureHandler _WATCH_CONNECTIVITY_FAILURE_HANDLER = CallbackFailur
cdef class AioChannel:
def __cinit__(self, bytes target, tuple options, ChannelCredentials credentials, object loop):
init_grpc_aio()
if options is None:
options = ()
cdef _ChannelArgs channel_args = _ChannelArgs(options)
self._target = target
self.cq = create_completion_queue()
self.loop = loop
self._status = AIO_CHANNEL_STATUS_READY
@ -47,6 +47,9 @@ cdef class AioChannel:
channel_args.c_args(),
NULL)
def __dealloc__(self):
shutdown_grpc_aio()
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
@ -78,12 +81,13 @@ cdef class AioChannel:
cdef object future = self.loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
self.loop,
_WATCH_CONNECTIVITY_FAILURE_HANDLER)
grpc_channel_watch_connectivity_state(
self.channel,
last_observed_state,
c_deadline,
self.cq.c_ptr(),
global_completion_queue(),
wrapper.c_functor())
try:
@ -111,7 +115,7 @@ cdef class AioChannel:
"""Assembles a Cython Call object.
Returns:
The _AioCall object.
An _AioCall object.
"""
if self.closed():
raise UsageError('Channel is closed.')

@ -99,3 +99,16 @@ class AbortError(BaseError):
class InternalError(BaseError):
"""Raised upon unexpected errors in native code."""
def schedule_coro_threadsafe(object coro, object loop):
try:
return loop.create_task(coro)
except RuntimeError as runtime_error:
if 'Non-thread-safe operation' in str(runtime_error):
return asyncio.run_coroutine_threadsafe(
coro,
loop,
)
else:
raise

@ -19,12 +19,14 @@ cdef class BaseCompletionQueue:
cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef bint _shutdown
cdef object _shutdown_completed
cdef object _poller_thread
cdef object _loop
cdef void _poll(self) except *
cdef void shutdown(self) nogil
cdef class CallbackCompletionQueue(BaseCompletionQueue):
cdef object _shutdown_completed # asyncio.Future
cdef CallbackWrapper _wrapper
cdef object _loop

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from libc.stdio cimport printf
cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
@ -21,9 +23,6 @@ def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
cdef class BaseCompletionQueue:
async def shutdown(self):
raise NotImplementedError()
cdef grpc_completion_queue* c_ptr(self):
return self._cq
@ -33,7 +32,6 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._cq = grpc_completion_queue_create_for_next(NULL)
self._shutdown = False
self._shutdown_completed = asyncio.get_event_loop().create_future()
self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
self._poller_thread.start()
@ -44,17 +42,17 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
while not self._shutdown:
with nogil:
event = grpc_completion_queue_next(self._cq,
_GPR_INF_FUTURE,
NULL)
_GPR_INF_FUTURE,
NULL)
if event.type == GRPC_QUEUE_TIMEOUT:
raise AssertionError("Core should not return timeout error!")
raise AssertionError("Core should not return GRPC_QUEUE_TIMEOUT!")
elif event.type == GRPC_QUEUE_SHUTDOWN:
self._shutdown = True
aio_loop_call_soon_threadsafe(self._shutdown_completed.set_result, None)
else:
context = <CallbackContext *>event.tag
aio_loop_call_soon_threadsafe(
loop = <object>context.loop
loop.call_soon_threadsafe(
_handle_callback_wrapper,
<CallbackWrapper>context.callback_wrapper,
event.success)
@ -62,19 +60,20 @@ cdef class PollerCompletionQueue(BaseCompletionQueue):
def _poll_wrapper(self):
self._poll()
async def shutdown(self):
cdef void shutdown(self) nogil:
# TODO(https://github.com/grpc/grpc/issues/22365) perform graceful shutdown
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
self._poller_thread.join()
cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._shutdown_completed = grpc_aio_loop().create_future()
self._loop = asyncio.get_event_loop()
self._shutdown_completed = self._loop.create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
self._loop,
CQ_SHUTDOWN_FAILURE_HANDLER)
self._cq = grpc_completion_queue_create_for_callback(
self._wrapper.c_functor(),
@ -85,12 +84,3 @@ cdef class CallbackCompletionQueue(BaseCompletionQueue):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
cdef BaseCompletionQueue create_completion_queue():
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
return CallbackCompletionQueue()
elif grpc_aio_engine is AsyncIOEngine.POLLER:
return PollerCompletionQueue()
else:
raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine)

@ -13,14 +13,31 @@
# limitations under the License.
# distutils: language=c++
cdef class _AioState:
cdef object lock # threading.RLock
cdef int refcount
cdef object engine # AsyncIOEngine
cdef BaseCompletionQueue cq
cdef grpc_completion_queue *global_completion_queue()
cdef init_grpc_aio()
cdef shutdown_grpc_aio()
cdef extern from "src/core/lib/iomgr/timer_manager.h":
void grpc_timer_manager_set_threading(bint enabled);
void grpc_timer_manager_set_threading(bint enabled)
cdef extern from "src/core/lib/iomgr/iomgr_internal.h":
void grpc_set_default_iomgr_platform();
void grpc_set_default_iomgr_platform()
cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core":
cdef cppclass Executor:
@staticmethod
void SetThreadingAll(bint enable);
void SetThreadingAll(bint enable)

@ -12,98 +12,128 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
cdef bint _grpc_aio_initialized = False
# NOTE(lidiz) Theoretically, applications can run in multiple event loops as
# long as they are in the same thread with same magic. This is not a supported
# use case. So, the gRPC Python Async Stack should use a single event loop
# picked by "init_grpc_aio".
cdef object _grpc_aio_loop # asyncio.AbstractEventLoop
cdef int64_t _event_loop_thread_ident
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
grpc_aio_engine = None
cdef object _grpc_initialization_lock = threading.Lock()
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').upper()
cdef _AioState _global_aio_state = _AioState()
class AsyncIOEngine(enum.Enum):
DEFAULT = 'default'
CUSTOM_IO_MANAGER = 'custom'
CUSTOM_IO_MANAGER = 'custom_io_manager'
POLLER = 'poller'
def init_grpc_aio():
global _grpc_aio_initialized
global _grpc_aio_loop
global _event_loop_thread_ident
global grpc_aio_engine
with _grpc_initialization_lock:
# Marks this function as called
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
# Picks the engine for gRPC AsyncIO Stack
for engine_type in AsyncIOEngine:
if engine_type.value == _GRPC_ASYNCIO_ENGINE:
grpc_aio_engine = engine_type
break
if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
_event_loop_thread_ident = threading.current_thread().ident
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
else:
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
def grpc_aio_loop():
"""Returns the one-and-only gRPC Aio event loop."""
return _grpc_aio_loop
def aio_loop_schedule_coroutine(object coro):
"""Thread-safely schedules coroutine to gRPC Aio event loop.
If invoked within the same thread as the event loop, return an
Asyncio.Task. Otherwise, return a concurrent.futures.Future (the sync
Future). For non-asyncio threads, sync Future objects are probably easier
to handle (without worrying other thread-safety stuff).
cdef _default_asyncio_engine():
return AsyncIOEngine.CUSTOM_IO_MANAGER
cdef grpc_completion_queue *global_completion_queue():
return _global_aio_state.cq.c_ptr()
cdef class _AioState:
def __cinit__(self):
self.lock = threading.RLock()
self.refcount = 0
self.engine = None
self.cq = None
cdef _initialize_custom_io_manager():
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# Initializes gRPC Core, must be called before other Core API
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
# Creates the only completion queue
_global_aio_state.cq = CallbackCompletionQueue()
cdef _initialize_poller():
# Initializes gRPC Core, must be called before other Core API
grpc_init()
# Creates the only completion queue
_global_aio_state.cq = PollerCompletionQueue()
cdef _actual_aio_initialization():
# Picks the engine for gRPC AsyncIO Stack
_global_aio_state.engine = AsyncIOEngine.__members__.get(
_GRPC_ASYNCIO_ENGINE,
AsyncIOEngine.DEFAULT,
)
if _global_aio_state.engine is AsyncIOEngine.DEFAULT:
_global_aio_state.engine = _default_asyncio_engine()
_LOGGER.info('Using %s as I/O engine', _global_aio_state.engine)
# Initializes the process-level state accordingly
if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
_initialize_custom_io_manager()
elif _global_aio_state.engine is AsyncIOEngine.POLLER:
_initialize_poller()
else:
raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine)
def _grpc_shutdown_wrapper(_):
"""A thin Python wrapper of Core's shutdown function.
Define functions are not allowed in "cdef" functions, and Cython complains
about a simple lambda with a C function.
"""
if _event_loop_thread_ident != threading.current_thread().ident:
return asyncio.run_coroutine_threadsafe(coro, _grpc_aio_loop)
grpc_shutdown_blocking()
cdef _actual_aio_shutdown():
if _global_aio_state.engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
future = schedule_coro_threadsafe(
_global_aio_state.cq.shutdown(),
(<CallbackCompletionQueue>_global_aio_state.cq)._loop
)
future.add_done_callback(_grpc_shutdown_wrapper)
elif _global_aio_state.engine is AsyncIOEngine.POLLER:
(<PollerCompletionQueue>_global_aio_state.cq).shutdown()
grpc_shutdown_blocking()
else:
return _grpc_aio_loop.create_task(coro)
raise ValueError('Unsupported engine type [%s]' % _global_aio_state.engine)
cdef init_grpc_aio():
"""Initialis the gRPC AsyncIO module.
Expected to be invoked on critical class constructors.
E.g., AioChannel, AioServer.
"""
with _global_aio_state.lock:
_global_aio_state.refcount += 1
if _global_aio_state.refcount == 1:
_actual_aio_initialization()
cdef shutdown_grpc_aio():
"""Shuts down the gRPC AsyncIO module.
def aio_loop_call_soon_threadsafe(object func, *args):
# TODO(lidiz) After we are confident, we can drop this assert. Otherwsie,
# we should limit this function to non-grpc-event-loop thread.
assert _event_loop_thread_ident != threading.current_thread().ident
return _grpc_aio_loop.call_soon_threadsafe(func, *args)
Expected to be invoked on critical class destructors.
E.g., AioChannel, AioServer.
"""
with _global_aio_state.lock:
assert _global_aio_state.refcount > 0
_global_aio_state.refcount -= 1
if not _global_aio_state.refcount:
_actual_aio_shutdown()

@ -14,6 +14,7 @@
cdef class _AsyncioResolver:
cdef:
object _loop
grpc_custom_resolver* _grpc_resolver
object _task_resolve

@ -15,6 +15,7 @@
cdef class _AsyncioResolver:
def __cinit__(self):
self._loop = asyncio.get_event_loop()
self._grpc_resolver = NULL
self._task_resolve = None
@ -32,7 +33,7 @@ cdef class _AsyncioResolver:
async def _async_resolve(self, bytes host, bytes port):
self._task_resolve = None
try:
resolved = await grpc_aio_loop().getaddrinfo(host, port)
resolved = await self._loop.getaddrinfo(host, port)
except Exception as e:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
@ -50,6 +51,6 @@ cdef class _AsyncioResolver:
cdef void resolve(self, char* host, char* port):
assert not self._task_resolve
self._task_resolve = grpc_aio_loop().create_task(
self._task_resolve = self._loop.create_task(
self._async_resolve(host, port)
)

@ -37,6 +37,7 @@ cdef class _AsyncioSocket:
self._py_socket = None
self._peername = None
self._closed = False
self._loop = asyncio.get_event_loop()
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
@ -90,7 +91,7 @@ cdef class _AsyncioSocket:
assert not self._reader
assert not self._task_connect
self._task_connect = grpc_aio_loop().create_task(
self._task_connect = self._loop.create_task(
self._async_connect(host, port)
)
self._grpc_connect_cb = grpc_connect_cb
@ -122,7 +123,7 @@ cdef class _AsyncioSocket:
self._grpc_read_cb = grpc_read_cb
self._read_buffer = buffer_
self._task_read = grpc_aio_loop().create_task(self._async_read(length))
self._task_read = self._loop.create_task(self._async_read(length))
async def _async_write(self, bytearray outbound_buffer):
self._writer.write(outbound_buffer)
@ -155,7 +156,7 @@ cdef class _AsyncioSocket:
outbound_buffer.extend(<bytes>start[:length])
self._grpc_write_cb = grpc_write_cb
self._task_write = grpc_aio_loop().create_task(self._async_write(outbound_buffer))
self._task_write = self._loop.create_task(self._async_write(outbound_buffer))
cdef bint is_connected(self):
return self._reader and not self._reader._transport.is_closing()
@ -209,7 +210,7 @@ cdef class _AsyncioSocket:
sock=self._py_socket,
)
self._task_listen = grpc_aio_loop().create_task(create_asyncio_server())
self._task_listen = self._loop.create_task(create_asyncio_server())
cdef accept(self,
grpc_custom_socket* grpc_socket_client,

@ -17,6 +17,7 @@ cdef class _AsyncioTimer:
grpc_custom_timer * _grpc_timer
object _timer_future
bint _active
object _loop
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout)

@ -18,13 +18,14 @@ cdef class _AsyncioTimer:
self._grpc_timer = NULL
self._timer_future = None
self._active = False
self._loop = asyncio.get_event_loop()
cpython.Py_INCREF(self)
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout):
timer = _AsyncioTimer()
timer._grpc_timer = grpc_timer
timer._timer_future = grpc_aio_loop().call_later(timeout, timer.on_time_up)
timer._timer_future = timer._loop.call_later(timeout, timer.on_time_up)
timer._active = True
return timer

@ -51,7 +51,6 @@ cdef enum AioServerStatus:
cdef class AioServer:
cdef Server _server
cdef BaseCompletionQueue _cq
cdef list _generic_handlers
cdef AioServerStatus _status
cdef object _loop # asyncio.EventLoop

@ -610,13 +610,13 @@ cdef class AioServer:
def __init__(self, loop, thread_pool, generic_handlers, interceptors,
options, maximum_concurrent_rpcs):
init_grpc_aio()
# NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak.
self._server = Server(options)
self._cq = create_completion_queue()
grpc_server_register_completion_queue(
self._server.c_server,
self._cq.c_ptr(),
global_completion_queue(),
NULL
)
@ -631,6 +631,7 @@ cdef class AioServer:
self._shutdown_completed = self._loop.create_future()
self._shutdown_callback_wrapper = CallbackWrapper(
self._shutdown_completed,
self._loop,
SERVER_SHUTDOWN_FAILURE_HANDLER)
self._crash_exception = None
@ -659,11 +660,12 @@ cdef class AioServer:
cdef object future = self._loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
self._loop,
REQUEST_CALL_FAILURE_HANDLER)
error = grpc_server_request_call(
self._server.c_server, &rpc_state.call, &rpc_state.details,
&rpc_state.request_metadata,
self._cq.c_ptr(), self._cq.c_ptr(),
global_completion_queue(), global_completion_queue(),
wrapper.c_functor()
)
if error != GRPC_CALL_OK:
@ -736,7 +738,7 @@ cdef class AioServer:
# The shutdown callback won't be called until there is no live RPC.
grpc_server_shutdown_and_notify(
self._server.c_server,
self._cq.c_ptr(),
global_completion_queue(),
self._shutdown_callback_wrapper.c_functor())
# Ensures the serving task (coroutine) exits.
@ -789,9 +791,6 @@ cdef class AioServer:
self._server.is_shutdown = True
self._status = AIO_SERVER_STATUS_STOPPED
# Shuts down the completion queue
await self._cq.shutdown()
async def wait_for_termination(self, object timeout):
if timeout is None:
await self._shutdown_completed
@ -823,3 +822,4 @@ cdef class AioServer:
self,
self._status
)
shutdown_grpc_aio()

@ -20,7 +20,6 @@ import os
import sys
import threading
import time
import enum
import grpc

@ -21,7 +21,7 @@ from typing import Any, Optional, Sequence, Tuple
import grpc
from grpc._cython.cygrpc import (EOF, AbortError, BaseError, InternalError,
UsageError, init_grpc_aio)
UsageError)
from ._base_call import (Call, RpcContext, StreamStreamCall, StreamUnaryCall,
UnaryStreamCall, UnaryUnaryCall)
@ -46,7 +46,6 @@ __all__ = (
'UnaryStreamCall',
'StreamUnaryCall',
'StreamStreamCall',
'init_grpc_aio',
'Channel',
'UnaryUnaryMultiCallable',
'UnaryStreamMultiCallable',

@ -228,7 +228,7 @@ class Channel(_base_channel.Channel):
"UnaryUnaryClientInterceptors, the following are invalid: {}"\
.format(invalid_interceptors))
self._loop = cygrpc.grpc_aio_loop()
self._loop = asyncio.get_event_loop()
self._channel = cygrpc.AioChannel(
_common.encode(target),
_augment_channel_arguments(options, compression), credentials,

@ -13,6 +13,7 @@
# limitations under the License.
"""Server-side implementation of gRPC Asyncio Python."""
import asyncio
from concurrent.futures import Executor
from typing import Any, Optional, Sequence
@ -40,7 +41,7 @@ class Server(_base_server.Server):
options: ChannelArgumentType,
maximum_concurrent_rpcs: Optional[int],
compression: Optional[grpc.Compression]):
self._loop = cygrpc.grpc_aio_loop()
self._loop = asyncio.get_event_loop()
if interceptors:
invalid_interceptors = [
interceptor for interceptor in interceptors
@ -162,7 +163,10 @@ class Server(_base_server.Server):
be safe to slightly extend the underlying Cython object's life span.
"""
if hasattr(self, '_server'):
cygrpc.aio_loop_schedule_coroutine(self._server.shutdown(None))
cygrpc.schedule_coro_threadsafe(
self._server.shutdown(None),
self._loop,
)
def server(migration_thread_pool: Optional[Executor] = None,

@ -151,8 +151,6 @@ class TestAio(setuptools.Command):
def run(self):
self._add_eggs_to_path()
from grpc.experimental.aio import init_grpc_aio
init_grpc_aio()
import tests
loader = tests.Loader()

@ -36,7 +36,6 @@ async def _start_async_server():
def main():
aio.init_grpc_aio()
loop = asyncio.get_event_loop()
loop.create_task(_start_async_server())
loop.run_forever()

@ -23,7 +23,6 @@ from tests_aio.benchmark import worker_servicer
async def run_worker_server(port: int) -> None:
aio.init_grpc_aio()
server = aio.server()
servicer = worker_servicer.WorkerServicer()

@ -47,7 +47,6 @@ def _test_case_from_arg(test_case_arg):
async def test_interoperability():
aio.init_grpc_aio()
args = interop_client_lib.parse_interop_client_args()
channel = _create_channel(args)

@ -18,7 +18,6 @@ import argparse
import logging
import grpc
from grpc.experimental.aio import init_grpc_aio
from tests.interop import server as interop_server_lib
from tests_aio.unit import _test_server
@ -29,8 +28,6 @@ _LOGGER.setLevel(logging.DEBUG)
async def serve():
init_grpc_aio()
args = interop_server_lib.parse_interop_server_arguments()
if args.use_tls:

@ -15,6 +15,7 @@
"unit.client_interceptor_test.TestInterceptedUnaryUnaryCall",
"unit.client_interceptor_test.TestUnaryUnaryClientInterceptor",
"unit.close_channel_test.TestCloseChannel",
"unit.compatibility_test.TestCompatibility",
"unit.compression_test.TestCompression",
"unit.connectivity_test.TestConnectivityState",
"unit.done_callback_test.TestDoneCallback",

@ -50,6 +50,11 @@ py_library(
srcs_version = "PY3",
)
_FLAKY_TESTS = [
# TODO(https://github.com/grpc/grpc/issues/22347) remove from this list.
"channel_argument_test.py",
]
[
py_test(
name = test_file_name[:-3],
@ -58,6 +63,7 @@ py_library(
data = [
"//src/python/grpcio_tests/tests/unit/credentials",
],
flaky = test_file_name in _FLAKY_TESTS,
imports = ["../../"],
main = test_file_name,
python_version = "PY3",

@ -46,10 +46,13 @@ def _get_default_loop(debug=True):
# NOTE(gnossen) this test class can also be implemented with metaclass.
class AioTestBase(unittest.TestCase):
# NOTE(lidi) We need to pick a loop for entire testing phase, otherwise it
# will trigger create new loops in new threads, leads to deadlock.
_TEST_LOOP = _get_default_loop()
@property
def loop(self):
return _get_default_loop()
return self._TEST_LOOP
def __getattribute__(self, name):
"""Overrides the loading logic to support coroutine functions."""
@ -58,9 +61,6 @@ class AioTestBase(unittest.TestCase):
# If possible, converts the coroutine into a sync function.
if name.startswith('test_') or name in _COROUTINE_FUNCTION_ALLOWLIST:
if asyncio.iscoroutinefunction(attr):
return _async_to_sync_decorator(attr, _get_default_loop())
return _async_to_sync_decorator(attr, self._TEST_LOOP)
# For other attributes, let them pass.
return attr
aio.init_grpc_aio()

@ -0,0 +1,208 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing the compatibility between AsyncIO stack and the old stack."""
import asyncio
import logging
import os
import random
import threading
import unittest
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence, Tuple
import grpc
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
from tests.unit.framework.common import test_constants
from tests_aio.unit._test_base import AioTestBase
from tests_aio.unit._test_server import start_test_server
_NUM_STREAM_RESPONSES = 5
_REQUEST_PAYLOAD_SIZE = 7
_RESPONSE_PAYLOAD_SIZE = 42
def _unique_options() -> Sequence[Tuple[str, float]]:
return (('iv', random.random()),)
@unittest.skipIf(
os.environ.get('GRPC_ASYNCIO_ENGINE', '').lower() != 'poller',
'Compatible mode needs POLLER completion queue.')
class TestCompatibility(AioTestBase):
async def setUp(self):
address, self._async_server = await start_test_server()
# Create async stub
self._async_channel = aio.insecure_channel(address,
options=_unique_options())
self._async_stub = test_pb2_grpc.TestServiceStub(self._async_channel)
# Create sync stub
self._sync_channel = grpc.insecure_channel(address,
options=_unique_options())
self._sync_stub = test_pb2_grpc.TestServiceStub(self._sync_channel)
async def tearDown(self):
self._sync_channel.close()
await self._async_channel.close()
await self._async_server.stop(None)
async def _run_in_another_thread(self, func: Callable[[], None]):
work_done = asyncio.Event(loop=self.loop)
def thread_work():
func()
self.loop.call_soon_threadsafe(work_done.set)
thread = threading.Thread(target=thread_work, daemon=True)
thread.start()
await work_done.wait()
thread.join()
async def test_unary_unary(self):
# Calling async API in this thread
await self._async_stub.UnaryCall(messages_pb2.SimpleRequest(),
timeout=test_constants.LONG_TIMEOUT)
# Calling sync API in a different thread
def sync_work() -> None:
response, call = self._sync_stub.UnaryCall.with_call(
messages_pb2.SimpleRequest(),
timeout=test_constants.LONG_TIMEOUT)
self.assertIsInstance(response, messages_pb2.SimpleResponse)
self.assertEqual(grpc.StatusCode.OK, call.code())
await self._run_in_another_thread(sync_work)
async def test_unary_stream(self):
request = messages_pb2.StreamingOutputCallRequest()
for _ in range(_NUM_STREAM_RESPONSES):
request.response_parameters.append(
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
# Calling async API in this thread
call = self._async_stub.StreamingOutputCall(request)
for _ in range(_NUM_STREAM_RESPONSES):
await call.read()
self.assertEqual(grpc.StatusCode.OK, await call.code())
# Calling sync API in a different thread
def sync_work() -> None:
response_iterator = self._sync_stub.StreamingOutputCall(request)
for response in response_iterator:
assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)
self.assertEqual(grpc.StatusCode.OK, response_iterator.code())
await self._run_in_another_thread(sync_work)
async def test_stream_unary(self):
payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE)
request = messages_pb2.StreamingInputCallRequest(payload=payload)
# Calling async API in this thread
async def gen():
for _ in range(_NUM_STREAM_RESPONSES):
yield request
response = await self._async_stub.StreamingInputCall(gen())
self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
response.aggregated_payload_size)
# Calling sync API in a different thread
def sync_work() -> None:
response = self._sync_stub.StreamingInputCall(
iter([request] * _NUM_STREAM_RESPONSES))
self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE,
response.aggregated_payload_size)
await self._run_in_another_thread(sync_work)
async def test_stream_stream(self):
request = messages_pb2.StreamingOutputCallRequest()
request.response_parameters.append(
messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE))
# Calling async API in this thread
call = self._async_stub.FullDuplexCall()
for _ in range(_NUM_STREAM_RESPONSES):
await call.write(request)
response = await call.read()
assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)
await call.done_writing()
assert await call.code() == grpc.StatusCode.OK
# Calling sync API in a different thread
def sync_work() -> None:
response_iterator = self._sync_stub.FullDuplexCall(iter([request]))
for response in response_iterator:
assert _RESPONSE_PAYLOAD_SIZE == len(response.payload.body)
self.assertEqual(grpc.StatusCode.OK, response_iterator.code())
await self._run_in_another_thread(sync_work)
async def test_server(self):
class GenericHandlers(grpc.GenericRpcHandler):
def service(self, handler_call_details):
return grpc.unary_unary_rpc_method_handler(lambda x, _: x)
# It's fine to instantiate server object in the event loop thread.
# The server will spawn its own serving thread.
server = grpc.server(ThreadPoolExecutor(),
handlers=(GenericHandlers(),))
port = server.add_insecure_port('localhost:0')
server.start()
def sync_work() -> None:
for _ in range(100):
with grpc.insecure_channel('localhost:%d' % port) as channel:
response = channel.unary_unary('/test/test')(b'\x07\x08')
self.assertEqual(response, b'\x07\x08')
await self._run_in_another_thread(sync_work)
async def test_many_loop(self):
address, server = await start_test_server()
# Run another loop in another thread
def sync_work():
async def async_work():
# Create async stub
async_channel = aio.insecure_channel(address,
options=_unique_options())
async_stub = test_pb2_grpc.TestServiceStub(async_channel)
call = async_stub.UnaryCall(messages_pb2.SimpleRequest())
response = await call
self.assertIsInstance(response, messages_pb2.SimpleResponse)
self.assertEqual(grpc.StatusCode.OK, await call.code())
loop = asyncio.new_event_loop()
loop.run_until_complete(async_work())
await self._run_in_another_thread(sync_work)
await server.stop(None)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main(verbosity=2)
Loading…
Cancel
Save