Allow multiple asyncio engine

pull/22258/head
Lidi Zheng 5 years ago
parent efd483311c
commit 221a50bf87
  1. 8
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
  2. 21
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  3. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
  4. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  5. 13
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
  6. 41
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  7. 41
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  8. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
  9. 5
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  10. 2
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  11. 3
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  12. 2
      tools/bazel.rc
  13. 1
      tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh

@ -52,13 +52,5 @@ cdef class CallbackWrapper:
cdef grpc_experimental_completion_queue_functor *c_functor(self)
cdef class CallbackCompletionQueue:
cdef grpc_completion_queue *_cq
cdef object _shutdown_completed # asyncio.Future
cdef CallbackWrapper _wrapper
cdef grpc_completion_queue* c_ptr(self)
cdef class GrpcCallWrapper:
cdef grpc_call* call

@ -69,27 +69,6 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler
InternalError)
cdef class CallbackCompletionQueue:
def __cinit__(self):
self._shutdown_completed = grpc_aio_loop().create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
CQ_SHUTDOWN_FAILURE_HANDLER)
self._cq = grpc_completion_queue_create_for_callback(
self._wrapper.c_functor(),
NULL
)
cdef grpc_completion_queue* c_ptr(self):
return self._cq
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
class ExecuteBatchError(Exception): pass

@ -21,8 +21,7 @@ cdef enum AioChannelStatus:
cdef class AioChannel:
cdef:
grpc_channel * channel
# CallbackCompletionQueue cq
BackgroundCompletionQueue cq
BaseCompletionQueue cq
object loop
bytes _target
AioChannelStatus _status

@ -31,8 +31,7 @@ cdef class AioChannel:
options = ()
cdef _ChannelArgs channel_args = _ChannelArgs(options)
self._target = target
# self.cq = CallbackCompletionQueue()
self.cq = BackgroundCompletionQueue()
self.cq = create_completion_queue()
self.loop = loop
self._status = AIO_CHANNEL_STATUS_READY

@ -1,6 +1,10 @@
cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
cdef class BackgroundCompletionQueue:
cdef class BaseCompletionQueue:
cdef grpc_completion_queue* c_ptr(self)
cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef grpc_completion_queue *_cq
cdef bint _shutdown
cdef object _shutdown_completed
@ -8,4 +12,9 @@ cdef class BackgroundCompletionQueue:
cdef object _poller_running
cdef _polling(self)
cdef grpc_completion_queue* c_ptr(self)
cdef class CallbackCompletionQueue(BaseCompletionQueue):
cdef grpc_completion_queue *_cq
cdef object _shutdown_completed # asyncio.Future
cdef CallbackWrapper _wrapper

@ -19,7 +19,16 @@ def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
cdef class BackgroundCompletionQueue:
cdef class BaseCompletionQueue:
async def shutdown(self):
raise NotImplementedError()
cdef grpc_completion_queue* c_ptr(self):
raise NotImplementedError()
cdef class PollerCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._cq = grpc_completion_queue_create_for_next(NULL)
@ -65,3 +74,33 @@ cdef class BackgroundCompletionQueue:
cdef grpc_completion_queue* c_ptr(self):
return self._cq
cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._shutdown_completed = grpc_aio_loop().create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
CQ_SHUTDOWN_FAILURE_HANDLER)
self._cq = grpc_completion_queue_create_for_callback(
self._wrapper.c_functor(),
NULL
)
cdef grpc_completion_queue* c_ptr(self):
return self._cq
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
cdef BaseCompletionQueue create_completion_queue():
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
return CallbackCompletionQueue()
elif grpc_aio_engine is AsyncIOEngine.POLLER:
return PollerCompletionQueue()
else:
raise ValueError('Unexpected engine type [%s]' % grpc_aio_engine)

@ -20,38 +20,57 @@ cdef bint _grpc_aio_initialized = False
# a single event loop picked by "init_grpc_aio".
cdef object _grpc_aio_loop
cdef object _event_loop_thread_ident
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
grpc_aio_engine = None
class AsyncIOEngine(enum.Enum):
DEFAULT = 'default'
CUSTOM_IO_MANAGER = 'custom'
CQ_POLLER = 'poller'
def init_grpc_aio():
global _grpc_aio_initialized
global _grpc_aio_loop
global _event_loop_thread_ident
global grpc_aio_engine
# Marks this function as called
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
_event_loop_thread_ident = threading.current_thread().ident
# Picks the engine for gRPC AsyncIO Stack
for engine_type in AsyncIOEngine:
if engine_type.value == _GRPC_ASYNCIO_ENGINE:
grpc_aio_engine = engine_type
break
if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
# Activates asyncio IO manager
# install_asyncio_iomgr()
_event_loop_thread_ident = threading.current_thread().ident
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
# grpc_timer_manager_set_threading(False)
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
# Activates asyncio IO manager
install_asyncio_iomgr()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
# Executor.SetThreadingAll(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
_grpc_aio_initialized = False

@ -51,8 +51,7 @@ cdef enum AioServerStatus:
cdef class AioServer:
cdef Server _server
# cdef CallbackCompletionQueue _cq
cdef BackgroundCompletionQueue _cq
cdef BaseCompletionQueue _cq
cdef list _generic_handlers
cdef AioServerStatus _status
cdef object _loop # asyncio.EventLoop

@ -613,8 +613,7 @@ cdef class AioServer:
# NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak.
self._server = Server(options)
# self._cq = CallbackCompletionQueue()
self._cq = BackgroundCompletionQueue()
self._cq = create_completion_queue()
grpc_server_register_completion_queue(
self._server.c_server,
self._cq.c_ptr(),
@ -737,7 +736,7 @@ cdef class AioServer:
# The shutdown callback won't be called until there is no live RPC.
grpc_server_shutdown_and_notify(
self._server.c_server,
self._cq._cq,
self._cq.c_ptr(),
self._shutdown_callback_wrapper.c_functor())
# Ensures the serving task (coroutine) exits.

@ -45,7 +45,7 @@ IF UNAME_SYSNAME != "Windows":
include "_cygrpc/aio/iomgr/socket.pxd.pxi"
include "_cygrpc/aio/iomgr/timer.pxd.pxi"
include "_cygrpc/aio/iomgr/resolver.pxd.pxi"
include "_cygrpc/aio/poller.pxd.pxi"
include "_cygrpc/aio/completion_queue.pxd.pxi"
include "_cygrpc/aio/rpc_status.pxd.pxi"
include "_cygrpc/aio/grpc_aio.pxd.pxi"
include "_cygrpc/aio/callback_common.pxd.pxi"

@ -20,6 +20,7 @@ import os
import sys
import threading
import time
import enum
import grpc
@ -71,8 +72,8 @@ include "_cygrpc/aio/iomgr/timer.pyx.pxi"
include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
include "_cygrpc/aio/common.pyx.pxi"
include "_cygrpc/aio/rpc_status.pyx.pxi"
include "_cygrpc/aio/completion_queue.pyx.pxi"
include "_cygrpc/aio/callback_common.pyx.pxi"
include "_cygrpc/aio/poller.pyx.pxi"
include "_cygrpc/aio/grpc_aio.pyx.pxi"
include "_cygrpc/aio/call.pyx.pxi"
include "_cygrpc/aio/channel.pyx.pxi"

@ -89,3 +89,5 @@ build:basicprof --copt=-DGRPC_BASIC_PROFILER
build:basicprof --copt=-DGRPC_TIMERS_RDTSC
build:python_single_threaded_unary_stream --test_env="GRPC_SINGLE_THREADED_UNARY_STREAM=true"
build:python_poller_engine --test_env="GRPC_ASYNCIO_ENGINE=poller"

@ -28,6 +28,7 @@ TEST_TARGETS="//src/python/... //examples/python/..."
BAZEL_FLAGS="--spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors"
bazel test ${BAZEL_FLAGS} ${TEST_TARGETS}
bazel test --config=python_single_threaded_unary_stream ${BAZEL_FLAGS} ${TEST_TARGETS}
bazel test --config=python_poller_engine ${BAZEL_FLAGS} ${TEST_TARGETS}
# TODO(https://github.com/grpc/grpc/issues/19854): Move this to a new Kokoro

Loading…
Cancel
Save