Merge pull request #22258 from lidizheng/aio-cq

[Aio] Add an alternative mode for the AsyncIO Stack
pull/22349/head
Lidi Zheng 5 years ago committed by GitHub
commit fda2111af7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pxd.pxi
  2. 21
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  3. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
  4. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
  5. 30
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
  6. 96
      src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
  7. 99
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  8. 1
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  9. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
  10. 14
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  11. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
  12. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  13. 1
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  14. 2
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  15. 2
      src/python/grpcio/grpc/experimental/aio/_server.py
  16. 14
      src/python/grpcio_tests/tests_aio/unit/server_test.py
  17. 2
      tools/bazel.rc
  18. 1
      tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh

@ -52,13 +52,5 @@ cdef class CallbackWrapper:
cdef grpc_experimental_completion_queue_functor *c_functor(self)
cdef class CallbackCompletionQueue:
cdef grpc_completion_queue *_cq
cdef object _shutdown_completed # asyncio.Future
cdef CallbackWrapper _wrapper
cdef grpc_completion_queue* c_ptr(self)
cdef class GrpcCallWrapper:
cdef grpc_call* call

@ -69,27 +69,6 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler
InternalError)
cdef class CallbackCompletionQueue:
def __cinit__(self):
self._shutdown_completed = grpc_aio_loop().create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
CQ_SHUTDOWN_FAILURE_HANDLER)
self._cq = grpc_completion_queue_create_for_callback(
self._wrapper.c_functor(),
NULL
)
cdef grpc_completion_queue* c_ptr(self):
return self._cq
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
class ExecuteBatchError(Exception): pass

@ -21,7 +21,7 @@ cdef enum AioChannelStatus:
cdef class AioChannel:
cdef:
grpc_channel * channel
CallbackCompletionQueue cq
BaseCompletionQueue cq
object loop
bytes _target
AioChannelStatus _status

@ -31,7 +31,7 @@ cdef class AioChannel:
options = ()
cdef _ChannelArgs channel_args = _ChannelArgs(options)
self._target = target
self.cq = CallbackCompletionQueue()
self.cq = create_completion_queue()
self.loop = loop
self._status = AIO_CHANNEL_STATUS_READY

@ -0,0 +1,30 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class BaseCompletionQueue:
cdef grpc_completion_queue *_cq
cdef grpc_completion_queue* c_ptr(self)
cdef class PollerCompletionQueue(BaseCompletionQueue):
cdef bint _shutdown
cdef object _shutdown_completed
cdef object _poller_thread
cdef void _poll(self) except *
cdef class CallbackCompletionQueue(BaseCompletionQueue):
cdef object _shutdown_completed # asyncio.Future
cdef CallbackWrapper _wrapper

@ -0,0 +1,96 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef gpr_timespec _GPR_INF_FUTURE = gpr_inf_future(GPR_CLOCK_REALTIME)
def _handle_callback_wrapper(CallbackWrapper callback_wrapper, int success):
CallbackWrapper.functor_run(callback_wrapper.c_functor(), success)
cdef class BaseCompletionQueue:
async def shutdown(self):
raise NotImplementedError()
cdef grpc_completion_queue* c_ptr(self):
return self._cq
cdef class PollerCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._cq = grpc_completion_queue_create_for_next(NULL)
self._shutdown = False
self._shutdown_completed = asyncio.get_event_loop().create_future()
self._poller_thread = threading.Thread(target=self._poll_wrapper, daemon=True)
self._poller_thread.start()
cdef void _poll(self) except *:
cdef grpc_event event
cdef CallbackContext *context
while not self._shutdown:
with nogil:
event = grpc_completion_queue_next(self._cq,
_GPR_INF_FUTURE,
NULL)
if event.type == GRPC_QUEUE_TIMEOUT:
raise AssertionError("Core should not return timeout error!")
elif event.type == GRPC_QUEUE_SHUTDOWN:
self._shutdown = True
aio_loop_call_soon_threadsafe(self._shutdown_completed.set_result, None)
else:
context = <CallbackContext *>event.tag
aio_loop_call_soon_threadsafe(
_handle_callback_wrapper,
<CallbackWrapper>context.callback_wrapper,
event.success)
def _poll_wrapper(self):
self._poll()
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
self._poller_thread.join()
cdef class CallbackCompletionQueue(BaseCompletionQueue):
def __cinit__(self):
self._shutdown_completed = grpc_aio_loop().create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
CQ_SHUTDOWN_FAILURE_HANDLER)
self._cq = grpc_completion_queue_create_for_callback(
self._wrapper.c_functor(),
NULL
)
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
await self._shutdown_completed
grpc_completion_queue_destroy(self._cq)
cdef BaseCompletionQueue create_completion_queue():
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
return CallbackCompletionQueue()
elif grpc_aio_engine is AsyncIOEngine.POLLER:
return PollerCompletionQueue()
else:
raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine)

@ -15,44 +15,95 @@
cdef bint _grpc_aio_initialized = False
# NOTE(lidiz) Theoretically, applications can run in multiple event loops as
# long as they are in the same thread with same magic. However, I don't think
# we should support this use case. So, the gRPC Python Async Stack should use
# a single event loop picked by "init_grpc_aio".
cdef object _grpc_aio_loop
# long as they are in the same thread with same magic. This is not a supported
# use case. So, the gRPC Python Async Stack should use a single event loop
# picked by "init_grpc_aio".
cdef object _grpc_aio_loop # asyncio.AbstractEventLoop
cdef int64_t _event_loop_thread_ident
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
grpc_aio_engine = None
cdef object _grpc_initialization_lock = threading.Lock()
class AsyncIOEngine(enum.Enum):
DEFAULT = 'default'
CUSTOM_IO_MANAGER = 'custom'
POLLER = 'poller'
def init_grpc_aio():
global _grpc_aio_initialized
global _grpc_aio_loop
global _event_loop_thread_ident
global grpc_aio_engine
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
with _grpc_initialization_lock:
# Marks this function as called
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
# Picks the engine for gRPC AsyncIO Stack
for engine_type in AsyncIOEngine:
if engine_type.value == _GRPC_ASYNCIO_ENGINE:
grpc_aio_engine = engine_type
break
if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
# Activates asyncio IO manager
install_asyncio_iomgr()
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
_event_loop_thread_ident = threading.current_thread().ident
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
_grpc_aio_initialized = False
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
else:
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
def grpc_aio_loop():
"""Returns the one-and-only gRPC Aio event loop."""
return _grpc_aio_loop
def aio_loop_schedule_coroutine(object coro):
"""Thread-safely schedules coroutine to gRPC Aio event loop.
If invoked within the same thread as the event loop, return an
Asyncio.Task. Otherwise, return a concurrent.futures.Future (the sync
Future). For non-asyncio threads, sync Future objects are probably easier
to handle (without worrying other thread-safety stuff).
"""
if _event_loop_thread_ident != threading.current_thread().ident:
return asyncio.run_coroutine_threadsafe(coro, _grpc_aio_loop)
else:
return _grpc_aio_loop.create_task(coro)
def aio_loop_call_soon_threadsafe(object func, *args):
# TODO(lidiz) After we are confident, we can drop this assert. Otherwsie,
# we should limit this function to non-grpc-event-loop thread.
assert _event_loop_thread_ident != threading.current_thread().ident
return _grpc_aio_loop.call_soon_threadsafe(func, *args)

@ -188,6 +188,7 @@ cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
# TODO(https://github.com/grpc/grpc/issues/22278) remove this if condition
if grpc_timer.timer == NULL:
return
else:

@ -24,10 +24,14 @@ cdef class _AsyncioSocket:
object _task_read
object _task_write
object _task_connect
object _task_listen
char * _read_buffer
# Caches the picked event loop, so we can avoid the 30ns overhead each
# time we need access to the event loop.
object _loop
# TODO(lidiz) Drop after 3.6 deprecation. Python 3.7 introduces methods
# like `is_closing()` to help graceful shutdown.
bint _closed
# Client-side attributes
grpc_custom_connect_callback _grpc_connect_cb

@ -31,10 +31,12 @@ cdef class _AsyncioSocket:
self._task_connect = None
self._task_read = None
self._task_write = None
self._task_listen = None
self._read_buffer = NULL
self._server = None
self._py_socket = None
self._peername = None
self._closed = False
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
@ -159,8 +161,14 @@ cdef class _AsyncioSocket:
return self._reader and not self._reader._transport.is_closing()
cdef void close(self):
if self._closed:
return
else:
self._closed = True
if self.is_connected():
self._writer.close()
if self._task_listen and not self._task_listen.done():
self._task_listen.close()
if self._server:
self._server.close()
# NOTE(lidiz) If the asyncio.Server is created from a Python socket,
@ -170,6 +178,10 @@ cdef class _AsyncioSocket:
self._py_socket.close()
def _new_connection_callback(self, object reader, object writer):
# If the socket is closed, stop.
if self._closed:
return
# Close the connection if server is not started yet.
if self._grpc_accept_cb == NULL:
writer.close()
@ -197,7 +209,7 @@ cdef class _AsyncioSocket:
sock=self._py_socket,
)
grpc_aio_loop().create_task(create_asyncio_server())
self._task_listen = grpc_aio_loop().create_task(create_asyncio_server())
cdef accept(self,
grpc_custom_socket* grpc_socket_client,

@ -51,7 +51,7 @@ cdef enum AioServerStatus:
cdef class AioServer:
cdef Server _server
cdef CallbackCompletionQueue _cq
cdef BaseCompletionQueue _cq
cdef list _generic_handlers
cdef AioServerStatus _status
cdef object _loop # asyncio.EventLoop

@ -613,7 +613,7 @@ cdef class AioServer:
# NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak.
self._server = Server(options)
self._cq = CallbackCompletionQueue()
self._cq = create_completion_queue()
grpc_server_register_completion_queue(
self._server.c_server,
self._cq.c_ptr(),
@ -736,7 +736,7 @@ cdef class AioServer:
# The shutdown callback won't be called until there is no live RPC.
grpc_server_shutdown_and_notify(
self._server.c_server,
self._cq._cq,
self._cq.c_ptr(),
self._shutdown_callback_wrapper.c_functor())
# Ensures the serving task (coroutine) exits.

@ -45,6 +45,7 @@ IF UNAME_SYSNAME != "Windows":
include "_cygrpc/aio/iomgr/socket.pxd.pxi"
include "_cygrpc/aio/iomgr/timer.pxd.pxi"
include "_cygrpc/aio/iomgr/resolver.pxd.pxi"
include "_cygrpc/aio/completion_queue.pxd.pxi"
include "_cygrpc/aio/rpc_status.pxd.pxi"
include "_cygrpc/aio/grpc_aio.pxd.pxi"
include "_cygrpc/aio/callback_common.pxd.pxi"

@ -20,6 +20,7 @@ import os
import sys
import threading
import time
import enum
import grpc
@ -71,6 +72,7 @@ include "_cygrpc/aio/iomgr/timer.pyx.pxi"
include "_cygrpc/aio/iomgr/resolver.pyx.pxi"
include "_cygrpc/aio/common.pyx.pxi"
include "_cygrpc/aio/rpc_status.pyx.pxi"
include "_cygrpc/aio/completion_queue.pyx.pxi"
include "_cygrpc/aio/callback_common.pyx.pxi"
include "_cygrpc/aio/grpc_aio.pyx.pxi"
include "_cygrpc/aio/call.pyx.pxi"

@ -162,7 +162,7 @@ class Server(_base_server.Server):
be safe to slightly extend the underlying Cython object's life span.
"""
if hasattr(self, '_server'):
self._loop.create_task(self._server.shutdown(None))
cygrpc.aio_loop_schedule_coroutine(self._server.shutdown(None))
def server(migration_thread_pool: Optional[Executor] = None,

@ -348,11 +348,10 @@ class TestServer(AioTestBase):
await self._server.stop(test_constants.SHORT_TIMEOUT)
with self.assertRaises(grpc.RpcError) as exception_context:
with self.assertRaises(aio.AioRpcError) as exception_context:
await call
self.assertEqual(grpc.StatusCode.UNAVAILABLE,
exception_context.exception.code())
self.assertIn('GOAWAY', exception_context.exception.details())
async def test_concurrent_graceful_shutdown(self):
call = self._channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)
@ -384,21 +383,18 @@ class TestServer(AioTestBase):
self._server.stop(test_constants.LONG_TIMEOUT),
)
with self.assertRaises(grpc.RpcError) as exception_context:
with self.assertRaises(aio.AioRpcError) as exception_context:
await call
self.assertEqual(grpc.StatusCode.UNAVAILABLE,
exception_context.exception.code())
self.assertIn('GOAWAY', exception_context.exception.details())
@unittest.skip('https://github.com/grpc/grpc/issues/20818')
async def test_shutdown_before_call(self):
server_target, server, _ = _start_test_server()
await server.stop(None)
await self._server.stop(None)
# Ensures the server is cleaned up at this point.
# Some proper exception should be raised.
async with aio.insecure_channel('localhost:%d' % port) as channel:
await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST)
with self.assertRaises(aio.AioRpcError):
await self._channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST)
async def test_unimplemented(self):
call = self._channel.unary_unary(_UNIMPLEMENTED_METHOD)

@ -89,3 +89,5 @@ build:basicprof --copt=-DGRPC_BASIC_PROFILER
build:basicprof --copt=-DGRPC_TIMERS_RDTSC
build:python_single_threaded_unary_stream --test_env="GRPC_SINGLE_THREADED_UNARY_STREAM=true"
build:python_poller_engine --test_env="GRPC_ASYNCIO_ENGINE=poller"

@ -28,6 +28,7 @@ TEST_TARGETS="//src/python/... //examples/python/..."
BAZEL_FLAGS="--spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors"
bazel test ${BAZEL_FLAGS} ${TEST_TARGETS}
bazel test --config=python_single_threaded_unary_stream ${BAZEL_FLAGS} ${TEST_TARGETS}
bazel test --config=python_poller_engine ${BAZEL_FLAGS} ${TEST_TARGETS}
# TODO(https://github.com/grpc/grpc/issues/19854): Move this to a new Kokoro

Loading…
Cancel
Save