From 9f5dbf70dcf089fe1bdd7a246e64e217f8f29ec0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 24 Oct 2019 18:38:20 -0700 Subject: [PATCH 01/14] Add shutdown test --- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 2 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 82 +++++++++++++++---- .../grpc/_cython/_cygrpc/server.pyx.pxi | 32 +++++--- .../grpcio/grpc/experimental/aio/_server.py | 64 ++++++++------- .../tests_aio/unit/server_test.py | 10 +++ 5 files changed, 137 insertions(+), 53 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 1906463d088..99a1af09792 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -30,6 +30,7 @@ cdef enum AioServerStatus: AIO_SERVER_STATUS_READY AIO_SERVER_STATUS_RUNNING AIO_SERVER_STATUS_STOPPED + AIO_SERVER_STATUS_STOPPING cdef class _CallbackCompletionQueue: @@ -42,3 +43,4 @@ cdef class AioServer: cdef _CallbackCompletionQueue _cq cdef list _generic_handlers cdef AioServerStatus _status + cdef object _loop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index dd6ff8b29d8..4a1c5aec48c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +_LOGGER = logging.getLogger(__name__) + cdef class _HandlerCallDetails: def __cinit__(self, str method, tuple invocation_metadata): self.method = method @@ -186,10 +188,10 @@ async def _server_call_request_call(Server server, return rpc_state -async def _server_main_loop(Server server, +async def _server_main_loop(object loop, + Server server, _CallbackCompletionQueue cq, list generic_handlers): - cdef object loop = asyncio.get_event_loop() cdef RPCState rpc_state while True: @@ -201,11 +203,12 @@ async def _server_main_loop(Server server, loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop)) -async def _server_start(Server server, +async def _server_start(object loop, + Server server, _CallbackCompletionQueue cq, list generic_handlers): - server.start() - await _server_main_loop(server, cq, generic_handlers) + server.start(backup_queue=False) + await _server_main_loop(loop, server, cq, generic_handlers) cdef class _CallbackCompletionQueue: @@ -222,17 +225,18 @@ cdef class _CallbackCompletionQueue: cdef class AioServer: - def __init__(self, thread_pool, generic_handlers, interceptors, options, - maximum_concurrent_rpcs, compression): + def __init__(self, loop, thread_pool, generic_handlers, interceptors, + options, maximum_concurrent_rpcs, compression): + self._loop = loop self._server = Server(options) self._cq = _CallbackCompletionQueue() - self._status = AIO_SERVER_STATUS_READY - self._generic_handlers = [] grpc_server_register_completion_queue( self._server.c_server, self._cq.c_ptr(), NULL ) + self._status = AIO_SERVER_STATUS_READY + self._generic_handlers = [] self.add_generic_rpc_handlers(generic_handlers) if interceptors: @@ -262,14 +266,62 @@ cdef class AioServer: raise RuntimeError('Server not in ready state') self._status = AIO_SERVER_STATUS_RUNNING - loop = asyncio.get_event_loop() - loop.create_task(_server_start( + self._loop.create_task(_server_start( + self._loop, self._server, self._cq, self._generic_handlers, )) - # TODO(https://github.com/grpc/grpc/issues/20668) - # Implement Destruction Methods for AsyncIO Server - def stop(self, unused_grace): - pass + async def shutdown(self, grace): + """Gracefully shutdown the C-Core server. + + Application should only call shutdown once. + + Args: + grace: An optional float indicates the length of grace period in + seconds. + """ + if self._status != AIO_SERVER_STATUS_RUNNING: + # The server either is shutting down, or not started. + return + cdef object shutdown_completed = self._loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper(shutdown_completed) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + + # Starts the shutdown process. + # The shutdown callback won't be called unless there is no live RPC. + grpc_server_shutdown_and_notify( + self._server.c_server, + self._cq._cq, + wrapper.c_functor()) + self._server.is_shutting_down = True + self._status = AIO_SERVER_STATUS_STOPPING + + if grace is None: + # Directly cancels all calls + grpc_server_cancel_all_calls(self._server.c_server) + await shutdown_completed + else: + try: + await asyncio.wait_for(shutdown_completed, grace) + except asyncio.TimeoutError: + # Cancels all ongoing calls by the end of grace period. + grpc_server_cancel_all_calls(self._server.c_server) + await shutdown_completed + + # Keeps wrapper object alive until now. + cpython.Py_DECREF(wrapper) + grpc_server_destroy(self._server.c_server) + self._server.c_server = NULL + self._server.is_shutdown = True + self._status = AIO_SERVER_STATUS_STOPPED + + def __dealloc__(self): + if self._status == AIO_SERVER_STATUS_STOPPED: + grpc_completion_queue_shutdown(self._cq._cq) + grpc_completion_queue_destroy(self._cq._cq) + else: + _LOGGER.error('Server is not stopped while deallocation: %d', self._status) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 67b2e9d4e88..7ca332050b5 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -61,16 +61,25 @@ cdef class Server: self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) - def start(self): + def start(self, backup_queue=True): + """Start the Cython gRPC Server. + + Args: + backup_queue: a bool indicates whether to spawn a backup completion + queue. In case of no CQ is bound to the server, and the shutdown + process of server becomes un-observable. + """ if self.is_started: raise ValueError("the server has already started") - self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) - self.register_completion_queue(self.backup_shutdown_queue) + if backup_queue: + self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) + self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True with nogil: grpc_server_start(self.c_server) - # Ensure the core has gotten a chance to do the start-up work - self.backup_shutdown_queue.poll(deadline=time.time()) + if backup_queue: + # Ensure the core has gotten a chance to do the start-up work + self.backup_shutdown_queue.poll(deadline=time.time()) def add_http2_port(self, bytes address, ServerCredentials server_credentials=None): @@ -134,11 +143,14 @@ cdef class Server: elif self.is_shutdown: pass elif not self.is_shutting_down: - # the user didn't call shutdown - use our backup queue - self._c_shutdown(self.backup_shutdown_queue, None) - # and now we wait - while not self.is_shutdown: - self.backup_shutdown_queue.poll() + if self.backup_shutdown_queue is None: + raise RuntimeError('Server shutdown failed: no completion queue.') + else: + # the user didn't call shutdown - use our backup queue + self._c_shutdown(self.backup_shutdown_queue, None) + # and now we wait + while not self.is_shutdown: + self.backup_shutdown_queue.poll() else: # We're in the process of shutting down, but have not shutdown; can't do # much but repeatedly release the GIL and wait diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 6bc3d210aed..ca39cef905f 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -25,9 +25,17 @@ class Server: def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): - self._server = cygrpc.AioServer(thread_pool, generic_handlers, - interceptors, options, - maximum_concurrent_rpcs, compression) + self._loop = asyncio.get_event_loop() + self._server = cygrpc.AioServer( + self._loop, + thread_pool, + generic_handlers, + interceptors, + options, + maximum_concurrent_rpcs, + compression) + self._shutdown_started = False + self._shutdown_future = self._loop.create_future() def add_generic_rpc_handlers( self, @@ -83,35 +91,32 @@ class Server: """ await self._server.start() - def stop(self, grace: Optional[float]) -> asyncio.Event: + async def stop(self, grace: Optional[float]) -> None: """Stops this Server. - "This method immediately stops the server from servicing new RPCs in + This method immediately stops the server from servicing new RPCs in all cases. - If a grace period is specified, this method returns immediately - and all RPCs active at the end of the grace period are aborted. - If a grace period is not specified (by passing None for `grace`), - all existing RPCs are aborted immediately and this method - blocks until the last RPC handler terminates. + If a grace period is specified, all RPCs active at the end of the grace + period are aborted. + + If a grace period is not specified (by passing None for `grace`), all + existing RPCs are aborted immediately and this method blocks until the + last RPC handler terminates. - This method is idempotent and may be called at any time. - Passing a smaller grace value in a subsequent call will have - the effect of stopping the Server sooner (passing None will - have the effect of stopping the server immediately). Passing - a larger grace value in a subsequent call *will not* have the - effect of stopping the server later (i.e. the most restrictive - grace value is used). + Only the first call to "stop" sets the length of grace period. + Additional calls is allowed and will block until the termination of + the server. Args: grace: A duration of time in seconds or None. - - Returns: - A threading.Event that will be set when this Server has completely - stopped, i.e. when running RPCs either complete or are aborted and - all handlers have terminated. """ - raise NotImplementedError() + if self._shutdown_started: + await self._shutdown_future + else: + self._shutdown_started = True + await self._server.shutdown(grace) + self._shutdown_future.set_result(None) async def wait_for_termination(self, timeout: Optional[float] = None) -> bool: @@ -135,11 +140,14 @@ class Server: Returns: A bool indicates if the operation times out. """ - if timeout: - raise NotImplementedError() - # TODO(lidiz) replace this wait forever logic - future = asyncio.get_event_loop().create_future() - await future + if timeout == None: + await self._shutdown_future + else: + try: + await asyncio.wait_for(self._shutdown_future, timeout) + except asyncio.TimeoutError: + return False + return True def server(migration_thread_pool=None, diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 937cce9eebb..924dff836f4 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -51,6 +51,16 @@ class TestServer(AioTestBase): self.assertEqual(response, _RESPONSE) self.loop.run_until_complete(test_unary_unary_body()) + + def test_shutdown(self): + + async def test_shutdown_body(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((GenericHandler(),)) + await server.start() + await server.stop(None) + asyncio.get_event_loop().run_until_complete(test_shutdown_body()) if __name__ == '__main__': From 3c43e6330f2044b09224c4c8756fc8c39b71e9aa Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 24 Oct 2019 19:03:42 -0700 Subject: [PATCH 02/14] Add shutdown process for completion queue --- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 15 +++++++++++ .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 27 ++++++++++++------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 99a1af09792..c1d5bd8c387 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -25,6 +25,18 @@ cdef class RPCState: cdef bytes method(self) +cdef class CallbackWrapper: + cdef CallbackContext context + cdef object _reference + + @staticmethod + cdef void functor_run( + grpc_experimental_completion_queue_functor* functor, + int succeed) + + cdef grpc_experimental_completion_queue_functor *c_functor(self) + + cdef enum AioServerStatus: AIO_SERVER_STATUS_UNKNOWN AIO_SERVER_STATUS_READY @@ -36,6 +48,9 @@ cdef enum AioServerStatus: cdef class _CallbackCompletionQueue: cdef grpc_completion_queue *_cq cdef grpc_completion_queue* c_ptr(self) + cdef object _shutdown_completed + cdef CallbackWrapper _wrapper + cdef object _loop cdef class AioServer: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 4a1c5aec48c..1ade4fadaf8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -26,8 +26,6 @@ class _ServicerContextPlaceHolder(object): pass # TODO(https://github.com/grpc/grpc/issues/20669) # Apply this to the client-side cdef class CallbackWrapper: - cdef CallbackContext context - cdef object _reference def __cinit__(self, object future): self.context.functor.functor_run = self.functor_run @@ -213,14 +211,22 @@ async def _server_start(object loop, cdef class _CallbackCompletionQueue: - def __cinit__(self): + def __cinit__(self, object loop): + self._loop = loop + self._shutdown_completed = loop.create_future() + self._wrapper = CallbackWrapper(self._shutdown_completed) self._cq = grpc_completion_queue_create_for_callback( - NULL, + self._wrapper.c_functor(), NULL ) cdef grpc_completion_queue* c_ptr(self): return self._cq + + async def shutdown(self): + grpc_completion_queue_shutdown(self._cq) + await self._shutdown_completed + grpc_completion_queue_destroy(self._cq) cdef class AioServer: @@ -228,13 +234,16 @@ cdef class AioServer: def __init__(self, loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): self._loop = loop + + # C-Core objects won't be deallocated automatically. self._server = Server(options) - self._cq = _CallbackCompletionQueue() + self._cq = _CallbackCompletionQueue(loop) grpc_server_register_completion_queue( self._server.c_server, self._cq.c_ptr(), NULL ) + self._status = AIO_SERVER_STATUS_READY self._generic_handlers = [] self.add_generic_rpc_handlers(generic_handlers) @@ -319,9 +328,9 @@ cdef class AioServer: self._server.is_shutdown = True self._status = AIO_SERVER_STATUS_STOPPED + # Shuts down the completion queue + await self._cq.shutdown() + def __dealloc__(self): - if self._status == AIO_SERVER_STATUS_STOPPED: - grpc_completion_queue_shutdown(self._cq._cq) - grpc_completion_queue_destroy(self._cq._cq) - else: + if self._status != AIO_SERVER_STATUS_STOPPED: _LOGGER.error('Server is not stopped while deallocation: %d', self._status) From 980bcaf07609a0798eecea6865725c9f66339b42 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 25 Oct 2019 14:43:55 -0700 Subject: [PATCH 03/14] Add failure handling mechanism to CallbackWrapper --- .../_cygrpc/aio/callbackcontext.pxd.pxi | 4 +- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 10 +- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 135 +++++++++++++----- 3 files changed, 105 insertions(+), 44 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi index 8e52c856dd2..38b7e9b3aed 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi @@ -16,5 +16,5 @@ cimport cpython cdef struct CallbackContext: grpc_experimental_completion_queue_functor functor - cpython.PyObject *waiter - + cpython.PyObject *waiter # asyncio.Future + cpython.PyObject *failure_handler # cygrpc.CallbackFailureHandler diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index c1d5bd8c387..0a3c2e4d9af 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -27,7 +27,8 @@ cdef class RPCState: cdef class CallbackWrapper: cdef CallbackContext context - cdef object _reference + cdef object _reference_of_future + cdef object _reference_of_failure_handler @staticmethod cdef void functor_run( @@ -48,9 +49,9 @@ cdef enum AioServerStatus: cdef class _CallbackCompletionQueue: cdef grpc_completion_queue *_cq cdef grpc_completion_queue* c_ptr(self) - cdef object _shutdown_completed + cdef object _shutdown_completed # asyncio.Future cdef CallbackWrapper _wrapper - cdef object _loop + cdef object _loop # asyncio.EventLoop cdef class AioServer: @@ -58,4 +59,5 @@ cdef class AioServer: cdef _CallbackCompletionQueue _cq cdef list _generic_handlers cdef AioServerStatus _status - cdef object _loop + cdef object _loop # asyncio.EventLoop + cdef object _serving_task # asyncio.Task diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 1ade4fadaf8..f198dbee9f3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -14,6 +14,7 @@ _LOGGER = logging.getLogger(__name__) + cdef class _HandlerCallDetails: def __cinit__(self, str method, tuple invocation_metadata): self.method = method @@ -23,22 +24,60 @@ cdef class _HandlerCallDetails: class _ServicerContextPlaceHolder(object): pass +cdef class CallbackFailureHandler: + cdef str _c_core_api + cdef object _error_details + cdef object _exception_type + cdef object _callback # Callable[[Future], None] + + def __cinit__(self, + str c_core_api="", + object error_details="UNKNOWN", + object exception_type=RuntimeError, + object callback=None): + """Handles failure by raising exception or execute a callbcak. + + The callback accepts a future, returns nothing. The callback is + expected to finish the future either "set_result" or "set_exception". + """ + if callback is None: + self._c_core_api = c_core_api + self._error_details = error_details + self._exception_type = exception_type + self._callback = self._raise_exception + else: + self._callback = callback + + def _raise_exception(self, object future): + future.set_exception(self._exception_type( + 'Failed "%s": %s' % (self._c_core_api, self._error_details) + )) + + cdef handle(self, object future): + self._callback(future) + + # TODO(https://github.com/grpc/grpc/issues/20669) # Apply this to the client-side cdef class CallbackWrapper: - def __cinit__(self, object future): + def __cinit__(self, object future, CallbackFailureHandler failure_handler): self.context.functor.functor_run = self.functor_run - self.context.waiter = (future) - self._reference = future + self.context.waiter = future + self.context.failure_handler = failure_handler + # NOTE(lidiz) Not using a list here, because this class is critical in + # data path. We should make it as efficient as possible. + self._reference_of_future = future + self._reference_of_failure_handler = failure_handler @staticmethod cdef void functor_run( grpc_experimental_completion_queue_functor* functor, int success): cdef CallbackContext *context = functor - if success == 0: - (context.waiter).set_exception(RuntimeError()) + if succeed == 0: + (context.failure_handler).handle( + context.waiter) else: (context.waiter).set_result(None) @@ -85,7 +124,9 @@ async def callback_start_batch(RPCState rpc_state, batch_operation_tag.prepare() cdef object future = loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper(future) + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + CallbackFailureHandler('callback_start_batch', operations)) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -162,13 +203,21 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): ) +def _FINISH_FUTURE(future): + future.set_result(None) + +cdef CallbackFailureHandler IGNORE_FAILURE = CallbackFailureHandler(callback=_FINISH_FUTURE) + + async def _server_call_request_call(Server server, _CallbackCompletionQueue cq, object loop): cdef grpc_call_error error cdef RPCState rpc_state = RPCState() cdef object future = loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper(future) + cdef CallbackWrapper wrapper = CallbackWrapper( + future, + IGNORE_FAILURE) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -186,27 +235,7 @@ async def _server_call_request_call(Server server, return rpc_state -async def _server_main_loop(object loop, - Server server, - _CallbackCompletionQueue cq, - list generic_handlers): - cdef RPCState rpc_state - - while True: - rpc_state = await _server_call_request_call( - server, - cq, - loop) - - loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop)) - - -async def _server_start(object loop, - Server server, - _CallbackCompletionQueue cq, - list generic_handlers): - server.start(backup_queue=False) - await _server_main_loop(loop, server, cq, generic_handlers) +cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_completion_queue_shutdown') cdef class _CallbackCompletionQueue: @@ -214,7 +243,9 @@ cdef class _CallbackCompletionQueue: def __cinit__(self, object loop): self._loop = loop self._shutdown_completed = loop.create_future() - self._wrapper = CallbackWrapper(self._shutdown_completed) + self._wrapper = CallbackWrapper( + self._shutdown_completed, + CQ_SHUTDOWN_FAILURE_HANDLER) self._cq = grpc_completion_queue_create_for_callback( self._wrapper.c_functor(), NULL @@ -229,12 +260,13 @@ cdef class _CallbackCompletionQueue: grpc_completion_queue_destroy(self._cq) +cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_server_shutdown_and_notify') + + cdef class AioServer: def __init__(self, loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): - self._loop = loop - # C-Core objects won't be deallocated automatically. self._server = Server(options) self._cq = _CallbackCompletionQueue(loop) @@ -244,9 +276,11 @@ cdef class AioServer: NULL ) + self._loop = loop self._status = AIO_SERVER_STATUS_READY self._generic_handlers = [] self.add_generic_rpc_handlers(generic_handlers) + self._serving_task = None if interceptors: raise NotImplementedError() @@ -268,6 +302,27 @@ cdef class AioServer: return self._server.add_http2_port(address, server_credentials._credentials) + async def _server_main_loop(self, + object server_started): + self._server.start(backup_queue=False) + server_started.set_result(True) + cdef RPCState rpc_state + + while True: + # When shutdown process starts, no more new connections. + if self._status != AIO_SERVER_STATUS_RUNNING: + break + + rpc_state = await _server_call_request_call( + self._server, + self._cq, + self._loop) + + self._loop.create_task(_handle_rpc( + self._generic_handlers, + rpc_state, + self._loop)) + async def start(self): if self._status == AIO_SERVER_STATUS_RUNNING: return @@ -275,12 +330,11 @@ cdef class AioServer: raise RuntimeError('Server not in ready state') self._status = AIO_SERVER_STATUS_RUNNING - self._loop.create_task(_server_start( - self._loop, - self._server, - self._cq, - self._generic_handlers, - )) + cdef object server_started = self._loop.create_future() + self._serving_task = self._loop.create_task(self._server_main_loop(server_started)) + # Needs to explicitly wait for the server to start up. + # Otherwise, the actual start time of the server is un-controllable. + await server_started async def shutdown(self, grace): """Gracefully shutdown the C-Core server. @@ -295,7 +349,9 @@ cdef class AioServer: # The server either is shutting down, or not started. return cdef object shutdown_completed = self._loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper(shutdown_completed) + cdef CallbackWrapper wrapper = CallbackWrapper( + shutdown_completed, + SERVER_SHUTDOWN_FAILURE_HANDLER) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -309,6 +365,9 @@ cdef class AioServer: self._server.is_shutting_down = True self._status = AIO_SERVER_STATUS_STOPPING + # Ensures the serving task (coroutine) exits normally + await self._serving_task + if grace is None: # Directly cancels all calls grpc_server_cancel_all_calls(self._server.c_server) From a978449c3ff43e1c2b5b97d558e1e783022ba01f Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 25 Oct 2019 15:23:01 -0700 Subject: [PATCH 04/14] Correctly propagate and catch request_call failure --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index f198dbee9f3..7f17a695017 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -203,10 +203,10 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): ) -def _FINISH_FUTURE(future): - future.set_result(None) +class _RequestCallError(Exception): pass -cdef CallbackFailureHandler IGNORE_FAILURE = CallbackFailureHandler(callback=_FINISH_FUTURE) +cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler( + 'grpc_server_request_call', 'server shutdown', _RequestCallError) async def _server_call_request_call(Server server, @@ -217,7 +217,7 @@ async def _server_call_request_call(Server server, cdef object future = loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, - IGNORE_FAILURE) + REQUEST_CALL_FAILURE_HANDLER) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -365,8 +365,11 @@ cdef class AioServer: self._server.is_shutting_down = True self._status = AIO_SERVER_STATUS_STOPPING - # Ensures the serving task (coroutine) exits normally - await self._serving_task + # Ensures the serving task (coroutine) exits. + try: + await self._serving_task + except _RequestCallError: + pass if grace is None: # Directly cancels all calls From cfea8c7d2563fdead7bf1403527b95e3a0281caf Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 25 Oct 2019 16:03:15 -0700 Subject: [PATCH 05/14] Add 3 more shutdown test cases --- .../tests_aio/unit/server_test.py | 68 ++++++++++++++++--- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 924dff836f4..c1e6eddc81e 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -19,20 +19,28 @@ import grpc from grpc.experimental import aio from tests_aio.unit._test_base import AioTestBase -_TEST_METHOD_PATH = '' +_SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' +_BLOCK_FOREVER = '/test/BlockForever' _REQUEST = b'\x00\x00\x00' _RESPONSE = b'\x01\x01\x01' -async def unary_unary(unused_request, unused_context): +async def _unary_unary(unused_request, unused_context): return _RESPONSE -class GenericHandler(grpc.GenericRpcHandler): +async def _block_forever(unused_request, unused_context): + await asyncio.get_event_loop().create_future() - def service(self, unused_handler_details): - return grpc.unary_unary_rpc_method_handler(unary_unary) + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_details): + if handler_details.method == _SIMPLE_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_unary_unary) + if handler_details.method == _BLOCK_FOREVER: + return grpc.unary_unary_rpc_method_handler(_block_forever) class TestServer(AioTestBase): @@ -42,11 +50,11 @@ class TestServer(AioTestBase): async def test_unary_unary_body(): server = aio.server() port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((GenericHandler(),)) + server.add_generic_rpc_handlers((_GenericHandler(),)) await server.start() async with aio.insecure_channel('localhost:%d' % port) as channel: - unary_call = channel.unary_unary(_TEST_METHOD_PATH) + unary_call = channel.unary_unary(_SIMPLE_UNARY_UNARY) response = await unary_call(_REQUEST) self.assertEqual(response, _RESPONSE) @@ -57,10 +65,52 @@ class TestServer(AioTestBase): async def test_shutdown_body(): server = aio.server() port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((GenericHandler(),)) await server.start() await server.stop(None) - asyncio.get_event_loop().run_until_complete(test_shutdown_body()) + self.loop.run_until_complete(test_shutdown_body()) + + def test_shutdown_after_call(self): + + async def test_shutdown_body(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((_GenericHandler(),)) + await server.start() + + async with aio.insecure_channel('localhost:%d' % port) as channel: + await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) + + await server.stop(None) + self.loop.run_until_complete(test_shutdown_body()) + + def test_shutdown_during_call(self): + + async def test_shutdown_body(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((_GenericHandler(),)) + await server.start() + + async with aio.insecure_channel('localhost:%d' % port) as channel: + self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + await asyncio.sleep(0) + + await server.stop(None) + self.loop.run_until_complete(test_shutdown_body()) + + def test_shutdown_before_call(self): + + async def test_shutdown_body(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((_GenericHandler(),)) + await server.start() + await server.stop(None) + + async with aio.insecure_channel('localhost:%d' % port) as channel: + await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) + + self.loop.run_until_complete(test_shutdown_body()) if __name__ == '__main__': From 0a423d05ca03fd8f787e2c441f80e1408ba6a9c6 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 25 Oct 2019 17:31:58 -0700 Subject: [PATCH 06/14] Add 4 server tests and 1 channel tests * Improve the shutdown process * Refactor the AioRpcError --- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 2 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 4 +- .../grpcio/grpc/experimental/aio/__init__.py | 5 + .../tests_aio/unit/channel_test.py | 27 ++++- .../grpcio_tests/tests_aio/unit/init_test.py | 39 +++++++ .../tests_aio/unit/server_test.py | 106 +++++++++++------- 6 files changed, 139 insertions(+), 44 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 2d56a568348..7c5c3fa2e9a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -152,6 +152,8 @@ cdef class _AsyncioSocket: cdef void close(self): if self.is_connected(): self._writer.close() + if self._server: + self._server.close() def _new_connection_callback(self, object reader, object writer): client_socket = _AsyncioSocket.create( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 7f17a695017..413abc24b56 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -305,8 +305,8 @@ cdef class AioServer: async def _server_main_loop(self, object server_started): self._server.start(backup_queue=False) - server_started.set_result(True) cdef RPCState rpc_state + server_started.set_result(True) while True: # When shutdown process starts, no more new connections. @@ -377,7 +377,7 @@ cdef class AioServer: await shutdown_completed else: try: - await asyncio.wait_for(shutdown_completed, grace) + await asyncio.wait_for(asyncio.shield(shutdown_completed), grace) except asyncio.TimeoutError: # Cancels all ongoing calls by the end of grace period. grpc_server_cancel_all_calls(self._server.c_server) diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 696db001133..3a736197d65 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -17,6 +17,11 @@ import abc import six import grpc +<<<<<<< HEAD +======= +from grpc import _common +from grpc._cython import cygrpc +>>>>>>> Add 4 server tests and 1 channel tests from grpc._cython.cygrpc import init_grpc_aio from ._call import AioRpcError diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 96817c61a6f..a9069d4a5f0 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -22,6 +22,9 @@ from tests.unit.framework.common import test_constants from tests_aio.unit._test_server import start_test_server from tests_aio.unit._test_base import AioTestBase +_UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall' +_EMPTY_CALL_METHOD = '/grpc.testing.TestService/EmptyCall' + class TestChannel(AioTestBase): @@ -32,7 +35,7 @@ class TestChannel(AioTestBase): async with aio.insecure_channel(server_target) as channel: hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', + _UNARY_CALL_METHOD, request_serializer=messages_pb2.SimpleRequest. SerializeToString, response_deserializer=messages_pb2.SimpleResponse.FromString @@ -48,7 +51,7 @@ class TestChannel(AioTestBase): channel = aio.insecure_channel(server_target) hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', + _UNARY_CALL_METHOD, request_serializer=messages_pb2.SimpleRequest.SerializeToString, response_deserializer=messages_pb2.SimpleResponse.FromString) response = await hi(messages_pb2.SimpleRequest()) @@ -66,7 +69,7 @@ class TestChannel(AioTestBase): async with aio.insecure_channel(server_target) as channel: empty_call_with_sleep = channel.unary_unary( - "/grpc.testing.TestService/EmptyCall", + _EMPTY_CALL_METHOD, request_serializer=messages_pb2.SimpleRequest. SerializeToString, response_deserializer=messages_pb2.SimpleResponse. @@ -95,6 +98,24 @@ class TestChannel(AioTestBase): self.loop.run_until_complete(coro()) + @unittest.skip('https://github.com/grpc/grpc/issues/20818') + def test_call_to_the_void(self): + + async def coro(): + channel = aio.insecure_channel('0.1.1.1:1111') + hi = channel.unary_unary( + _UNARY_CALL_METHOD, + request_serializer=messages_pb2.SimpleRequest.SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString) + response = await hi(messages_pb2.SimpleRequest()) + + self.assertEqual(type(response), messages_pb2.SimpleResponse) + + await channel.close() + + self.loop.run_until_complete(coro()) + + if __name__ == '__main__': logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index 9f5d8bb0d85..300e2d92766 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -19,6 +19,45 @@ from tests_aio.unit._test_server import start_test_server from tests_aio.unit._test_base import AioTestBase +class TestAioRpcError(unittest.TestCase): + _TEST_INITIAL_METADATA = ("initial metadata",) + _TEST_TRAILING_METADATA = ("trailing metadata",) + + def test_attributes(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.initial_metadata(), + self._TEST_INITIAL_METADATA) + self.assertEqual(aio_rpc_error.code(), grpc.StatusCode.OK) + self.assertEqual(aio_rpc_error.details(), "details") + self.assertEqual(aio_rpc_error.trailing_metadata(), + self._TEST_TRAILING_METADATA) + + def test_class_hierarchy(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + + self.assertIsInstance(aio_rpc_error, grpc.RpcError) + + def test_class_attributes(self): + aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", self._TEST_TRAILING_METADATA) + self.assertEqual(aio_rpc_error.__class__.__name__, "AioRpcError") + self.assertEqual(aio_rpc_error.__class__.__doc__, + aio.AioRpcError.__doc__) + + def test_class_singleton(self): + first_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", + self._TEST_TRAILING_METADATA) + second_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, + "details", + self._TEST_TRAILING_METADATA) + + self.assertIs(first_aio_rpc_error.__class__, + second_aio_rpc_error.__class__) + + class TestInsecureChannel(AioTestBase): def test_insecure_channel(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index c1e6eddc81e..f6eabc20529 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -18,42 +18,62 @@ import unittest import grpc from grpc.experimental import aio from tests_aio.unit._test_base import AioTestBase +from tests.unit.framework.common import test_constants + _SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' _BLOCK_FOREVER = '/test/BlockForever' +_BLOCK_SHORTLY = '/test/BlockShortly' _REQUEST = b'\x00\x00\x00' _RESPONSE = b'\x01\x01\x01' -async def _unary_unary(unused_request, unused_context): - return _RESPONSE +class _GenericHandler(grpc.GenericRpcHandler): + def __init__(self): + self._called = asyncio.get_event_loop().create_future() + @staticmethod + async def _unary_unary(unused_request, unused_context): + return _RESPONSE -async def _block_forever(unused_request, unused_context): - await asyncio.get_event_loop().create_future() + async def _block_forever(self, unused_request, unused_context): + await asyncio.get_event_loop().create_future() -class _GenericHandler(grpc.GenericRpcHandler): + async def _block_shortly(self, unused_request, unused_context): + await asyncio.sleep(test_constants.SHORT_TIMEOUT/2) + return _RESPONSE def service(self, handler_details): + self._called.set_result(None) if handler_details.method == _SIMPLE_UNARY_UNARY: - return grpc.unary_unary_rpc_method_handler(_unary_unary) + return grpc.unary_unary_rpc_method_handler(self._unary_unary) if handler_details.method == _BLOCK_FOREVER: - return grpc.unary_unary_rpc_method_handler(_block_forever) + return grpc.unary_unary_rpc_method_handler(self._block_forever) + if handler_details.method == _BLOCK_SHORTLY: + return grpc.unary_unary_rpc_method_handler(self._block_shortly) + + async def wait_for_call(self): + await self._called + + +async def _start_test_server(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + generic_handler = _GenericHandler() + server.add_generic_rpc_handlers((generic_handler,)) + await server.start() + return 'localhost:%d' % port, server, generic_handler class TestServer(AioTestBase): def test_unary_unary(self): - async def test_unary_unary_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + server_target, _, _ = await _start_test_server() - async with aio.insecure_channel('localhost:%d' % port) as channel: + async with aio.insecure_channel(server_target) as channel: unary_call = channel.unary_unary(_SIMPLE_UNARY_UNARY) response = await unary_call(_REQUEST) self.assertEqual(response, _RESPONSE) @@ -61,55 +81,63 @@ class TestServer(AioTestBase): self.loop.run_until_complete(test_unary_unary_body()) def test_shutdown(self): - async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - await server.start() + _, server, _ = await _start_test_server() await server.stop(None) self.loop.run_until_complete(test_shutdown_body()) def test_shutdown_after_call(self): - async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + server_target, server, _ = await _start_test_server() - async with aio.insecure_channel('localhost:%d' % port) as channel: + async with aio.insecure_channel(server_target) as channel: await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) await server.stop(None) self.loop.run_until_complete(test_shutdown_body()) - def test_shutdown_during_call(self): + def test_graceful_shutdown_success(self): + async def test_graceful_shutdown_success_body(): + server_target, server, generic_handler = await _start_test_server() - async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task(channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) + await generic_handler.wait_for_call() - async with aio.insecure_channel('localhost:%d' % port) as channel: - self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) - await asyncio.sleep(0) + await server.stop(test_constants.SHORT_TIMEOUT) + await channel.close() + self.assertEqual(await call_task, _RESPONSE) + self.assertTrue(call_task.done()) + self.loop.run_until_complete(test_graceful_shutdown_success_body()) - await server.stop(None) - self.loop.run_until_complete(test_shutdown_body()) + def test_graceful_shutdown_failed(self): + async def test_graceful_shutdown_failed_body(): + server_target, server, generic_handler = await _start_test_server() + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + await generic_handler.wait_for_call() + + await server.stop(test_constants.SHORT_TIMEOUT) + + with self.assertRaises(aio.AioRpcError) as exception_context: + await call_task + self.assertEqual(exception_context.exception.code(), grpc.StatusCode.UNAVAILABLE) + self.assertIn('GOAWAY', exception_context.exception.details()) + await channel.close() + self.loop.run_until_complete(test_graceful_shutdown_failed_body()) + + @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_shutdown_before_call(self): async def test_shutdown_body(): - server = aio.server() - port = server.add_insecure_port('[::]:0') - server.add_generic_rpc_handlers((_GenericHandler(),)) - await server.start() + server_target, server, _ =_start_test_server() await server.stop(None) + # Ensures the server is cleaned up at this point. + # Some proper exception should be raised. async with aio.insecure_channel('localhost:%d' % port) as channel: await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) - self.loop.run_until_complete(test_shutdown_body()) From 983e36d5a3299fe6e7b2dd5e4b714441d072f7b0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Sun, 27 Oct 2019 19:23:05 -0700 Subject: [PATCH 07/14] Make YAPF and PyLint happy --- .../grpcio/grpc/experimental/aio/_server.py | 15 ++++------ .../tests_aio/unit/channel_test.py | 1 - .../tests_aio/unit/server_test.py | 28 +++++++++++++------ 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index ca39cef905f..d49fe6644c3 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -26,14 +26,9 @@ class Server: def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): self._loop = asyncio.get_event_loop() - self._server = cygrpc.AioServer( - self._loop, - thread_pool, - generic_handlers, - interceptors, - options, - maximum_concurrent_rpcs, - compression) + self._server = cygrpc.AioServer(self._loop, thread_pool, + generic_handlers, interceptors, options, + maximum_concurrent_rpcs, compression) self._shutdown_started = False self._shutdown_future = self._loop.create_future() @@ -99,7 +94,7 @@ class Server: If a grace period is specified, all RPCs active at the end of the grace period are aborted. - + If a grace period is not specified (by passing None for `grace`), all existing RPCs are aborted immediately and this method blocks until the last RPC handler terminates. @@ -140,7 +135,7 @@ class Server: Returns: A bool indicates if the operation times out. """ - if timeout == None: + if timeout is None: await self._shutdown_future else: try: diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index a9069d4a5f0..2809f3b4b37 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -97,7 +97,6 @@ class TestChannel(AioTestBase): self.loop.run_until_complete(coro()) - @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_call_to_the_void(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index f6eabc20529..d8f1c384043 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -20,7 +20,6 @@ from grpc.experimental import aio from tests_aio.unit._test_base import AioTestBase from tests.unit.framework.common import test_constants - _SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' _BLOCK_FOREVER = '/test/BlockForever' _BLOCK_SHORTLY = '/test/BlockShortly' @@ -30,6 +29,7 @@ _RESPONSE = b'\x01\x01\x01' class _GenericHandler(grpc.GenericRpcHandler): + def __init__(self): self._called = asyncio.get_event_loop().create_future() @@ -40,9 +40,8 @@ class _GenericHandler(grpc.GenericRpcHandler): async def _block_forever(self, unused_request, unused_context): await asyncio.get_event_loop().create_future() - async def _block_shortly(self, unused_request, unused_context): - await asyncio.sleep(test_constants.SHORT_TIMEOUT/2) + await asyncio.sleep(test_constants.SHORT_TIMEOUT / 2) return _RESPONSE def service(self, handler_details): @@ -70,6 +69,7 @@ async def _start_test_server(): class TestServer(AioTestBase): def test_unary_unary(self): + async def test_unary_unary_body(): server_target, _, _ = await _start_test_server() @@ -79,14 +79,17 @@ class TestServer(AioTestBase): self.assertEqual(response, _RESPONSE) self.loop.run_until_complete(test_unary_unary_body()) - + def test_shutdown(self): + async def test_shutdown_body(): _, server, _ = await _start_test_server() await server.stop(None) + self.loop.run_until_complete(test_shutdown_body()) def test_shutdown_after_call(self): + async def test_shutdown_body(): server_target, server, _ = await _start_test_server() @@ -94,50 +97,59 @@ class TestServer(AioTestBase): await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) await server.stop(None) + self.loop.run_until_complete(test_shutdown_body()) def test_graceful_shutdown_success(self): + async def test_graceful_shutdown_success_body(): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task(channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) await generic_handler.wait_for_call() await server.stop(test_constants.SHORT_TIMEOUT) await channel.close() self.assertEqual(await call_task, _RESPONSE) self.assertTrue(call_task.done()) + self.loop.run_until_complete(test_graceful_shutdown_success_body()) def test_graceful_shutdown_failed(self): + async def test_graceful_shutdown_failed_body(): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task(channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) await generic_handler.wait_for_call() await server.stop(test_constants.SHORT_TIMEOUT) with self.assertRaises(aio.AioRpcError) as exception_context: await call_task - self.assertEqual(exception_context.exception.code(), grpc.StatusCode.UNAVAILABLE) + self.assertEqual(exception_context.exception.code(), + grpc.StatusCode.UNAVAILABLE) self.assertIn('GOAWAY', exception_context.exception.details()) await channel.close() + self.loop.run_until_complete(test_graceful_shutdown_failed_body()) @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_shutdown_before_call(self): async def test_shutdown_body(): - server_target, server, _ =_start_test_server() + server_target, server, _ = _start_test_server() await server.stop(None) # Ensures the server is cleaned up at this point. # Some proper exception should be raised. async with aio.insecure_channel('localhost:%d' % port) as channel: await channel.unary_unary(_SIMPLE_UNARY_UNARY)(_REQUEST) + self.loop.run_until_complete(test_shutdown_body()) From 8168b9e1c95c6c8260442a68561a5e45f645959b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 29 Oct 2019 15:11:03 -0700 Subject: [PATCH 08/14] Adopt reviewer's advice * Make graceful shutdown support calls from multi-coroutine * Added comments * Make graceful shutdown success test case more strict * Add 2 more concurrent graceful shutdown tests --- .../_cygrpc/aio/callbackcontext.pxd.pxi | 13 ++- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 3 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 89 +++++++++++-------- .../grpc/_cython/_cygrpc/server.pyx.pxi | 4 +- .../grpcio/grpc/experimental/aio/_server.py | 38 +++----- .../tests_aio/unit/channel_test.py | 2 +- .../tests_aio/unit/server_test.py | 79 ++++++++++++++-- 7 files changed, 153 insertions(+), 75 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi index 38b7e9b3aed..beada919f4d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi @@ -15,6 +15,15 @@ cimport cpython cdef struct CallbackContext: + # C struct to store callback context in the form of pointers. + # + # Attributes: + # functor: A grpc_experimental_completion_queue_functor represents the + # callback function in the only way C-Core understands. + # waiter: An asyncio.Future object that fulfills when the callback is + # invoked by C-Core. + # failure_handler: A CallbackFailureHandler object that called when C-Core + # returns 'success == 0' state. grpc_experimental_completion_queue_functor functor - cpython.PyObject *waiter # asyncio.Future - cpython.PyObject *failure_handler # cygrpc.CallbackFailureHandler + cpython.PyObject *waiter + cpython.PyObject *failure_handler diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 0a3c2e4d9af..fdd9541f0eb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -61,3 +61,6 @@ cdef class AioServer: cdef AioServerStatus _status cdef object _loop # asyncio.EventLoop cdef object _serving_task # asyncio.Task + cdef object _shutdown_lock # asyncio.Lock + cdef object _shutdown_completed # asyncio.Future + cdef CallbackWrapper _shutdown_callback_wrapper diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 413abc24b56..6f3098d619f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# TODO(https://github.com/grpc/grpc/issues/20850) refactor this. _LOGGER = logging.getLogger(__name__) @@ -282,6 +283,12 @@ cdef class AioServer: self.add_generic_rpc_handlers(generic_handlers) self._serving_task = None + self._shutdown_lock = asyncio.Lock() + self._shutdown_completed = self._loop.create_future() + self._shutdown_callback_wrapper = CallbackWrapper( + self._shutdown_completed, + SERVER_SHUTDOWN_FAILURE_HANDLER) + if interceptors: raise NotImplementedError() if maximum_concurrent_rpcs: @@ -309,7 +316,7 @@ cdef class AioServer: server_started.set_result(True) while True: - # When shutdown process starts, no more new connections. + # When shutdown begins, no more new connections. if self._status != AIO_SERVER_STATUS_RUNNING: break @@ -336,34 +343,14 @@ cdef class AioServer: # Otherwise, the actual start time of the server is un-controllable. await server_started - async def shutdown(self, grace): - """Gracefully shutdown the C-Core server. - - Application should only call shutdown once. - - Args: - grace: An optional float indicates the length of grace period in - seconds. - """ - if self._status != AIO_SERVER_STATUS_RUNNING: - # The server either is shutting down, or not started. - return - cdef object shutdown_completed = self._loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper( - shutdown_completed, - SERVER_SHUTDOWN_FAILURE_HANDLER) - # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed - # when calling "await". This is an over-optimization by Cython. - cpython.Py_INCREF(wrapper) - + async def _start_shutting_down(self): + """Prepares the server to shutting down (NOT coroutine-safe).""" # Starts the shutdown process. - # The shutdown callback won't be called unless there is no live RPC. + # The shutdown callback won't be called until there is no live RPC. grpc_server_shutdown_and_notify( self._server.c_server, self._cq._cq, - wrapper.c_functor()) - self._server.is_shutting_down = True - self._status = AIO_SERVER_STATUS_STOPPING + self._shutdown_callback_wrapper.c_functor()) # Ensures the serving task (coroutine) exits. try: @@ -371,28 +358,56 @@ cdef class AioServer: except _RequestCallError: pass + async def shutdown(self, grace): + """Gracefully shutdown the C-Core server. + + Application should only call shutdown once. + + Args: + grace: An optional float indicating the length of grace period in + seconds. + """ + if self._status == AIO_SERVER_STATUS_READY or self._status == AIO_SERVER_STATUS_STOPPED: + return + + async with self._shutdown_lock: + if self._status == AIO_SERVER_STATUS_RUNNING: + await self._start_shutting_down() + self._server.is_shutting_down = True + self._status = AIO_SERVER_STATUS_STOPPING + if grace is None: # Directly cancels all calls grpc_server_cancel_all_calls(self._server.c_server) - await shutdown_completed + await self._shutdown_completed else: try: - await asyncio.wait_for(asyncio.shield(shutdown_completed), grace) + await asyncio.wait_for(asyncio.shield(self._shutdown_completed), grace) except asyncio.TimeoutError: # Cancels all ongoing calls by the end of grace period. grpc_server_cancel_all_calls(self._server.c_server) - await shutdown_completed + await self._shutdown_completed - # Keeps wrapper object alive until now. - cpython.Py_DECREF(wrapper) - grpc_server_destroy(self._server.c_server) - self._server.c_server = NULL - self._server.is_shutdown = True - self._status = AIO_SERVER_STATUS_STOPPED + async with self._shutdown_lock: + if self._status == AIO_SERVER_STATUS_STOPPING: + grpc_server_destroy(self._server.c_server) + self._server.c_server = NULL + self._server.is_shutdown = True + self._status = AIO_SERVER_STATUS_STOPPED - # Shuts down the completion queue - await self._cq.shutdown() + # Shuts down the completion queue + await self._cq.shutdown() + + async def wait_for_termination(self, float timeout): + if timeout is None: + await self._shutdown_completed + else: + try: + await asyncio.wait_for(self._shutdown_completed, timeout) + except asyncio.TimeoutError: + return False + return True def __dealloc__(self): if self._status != AIO_SERVER_STATUS_STOPPED: - _LOGGER.error('Server is not stopped while deallocation: %d', self._status) + _LOGGER.error('__dealloc__ called on running server: %d', self._status) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 7ca332050b5..4ce554d078f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -66,8 +66,8 @@ cdef class Server: Args: backup_queue: a bool indicates whether to spawn a backup completion - queue. In case of no CQ is bound to the server, and the shutdown - process of server becomes un-observable. + queue. In the case that no CQ is bound to the server, and the shutdown + of server becomes un-observable. """ if self.is_started: raise ValueError("the server has already started") diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index d49fe6644c3..86d38e6525d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -29,8 +29,6 @@ class Server: self._server = cygrpc.AioServer(self._loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression) - self._shutdown_started = False - self._shutdown_future = self._loop.create_future() def add_generic_rpc_handlers( self, @@ -92,26 +90,23 @@ class Server: This method immediately stops the server from servicing new RPCs in all cases. - If a grace period is specified, all RPCs active at the end of the grace - period are aborted. + If a grace period is specified, this method returns immediately and all + RPCs active at the end of the grace period are aborted. If a grace + period is not specified (by passing None for grace), all existing RPCs + are aborted immediately and this method blocks until the last RPC + handler terminates. - If a grace period is not specified (by passing None for `grace`), all - existing RPCs are aborted immediately and this method blocks until the - last RPC handler terminates. - - Only the first call to "stop" sets the length of grace period. - Additional calls is allowed and will block until the termination of - the server. + This method is idempotent and may be called at any time. Passing a + smaller grace value in a subsequent call will have the effect of + stopping the Server sooner (passing None will have the effect of + stopping the server immediately). Passing a larger grace value in a + subsequent call will not have the effect of stopping the server later + (i.e. the most restrictive grace value is used). Args: grace: A duration of time in seconds or None. """ - if self._shutdown_started: - await self._shutdown_future - else: - self._shutdown_started = True - await self._server.shutdown(grace) - self._shutdown_future.set_result(None) + await self._server.shutdown(grace) async def wait_for_termination(self, timeout: Optional[float] = None) -> bool: @@ -135,14 +130,7 @@ class Server: Returns: A bool indicates if the operation times out. """ - if timeout is None: - await self._shutdown_future - else: - try: - await asyncio.wait_for(self._shutdown_future, timeout) - except asyncio.TimeoutError: - return False - return True + return await self._server.wait_for_termination(timeout) def server(migration_thread_pool=None, diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 2809f3b4b37..076300786fb 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -108,7 +108,7 @@ class TestChannel(AioTestBase): response_deserializer=messages_pb2.SimpleResponse.FromString) response = await hi(messages_pb2.SimpleRequest()) - self.assertEqual(type(response), messages_pb2.SimpleResponse) + self.assertIs(type(response), messages_pb2.SimpleResponse) await channel.close() diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index d8f1c384043..6c96d478c64 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -14,6 +14,7 @@ import logging import unittest +import time import grpc from grpc.experimental import aio @@ -22,7 +23,7 @@ from tests.unit.framework.common import test_constants _SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' _BLOCK_FOREVER = '/test/BlockForever' -_BLOCK_SHORTLY = '/test/BlockShortly' +_BLOCK_BRIEFLY = '/test/BlockBriefly' _REQUEST = b'\x00\x00\x00' _RESPONSE = b'\x01\x01\x01' @@ -40,7 +41,7 @@ class _GenericHandler(grpc.GenericRpcHandler): async def _block_forever(self, unused_request, unused_context): await asyncio.get_event_loop().create_future() - async def _block_shortly(self, unused_request, unused_context): + async def _BLOCK_BRIEFLY(self, unused_request, unused_context): await asyncio.sleep(test_constants.SHORT_TIMEOUT / 2) return _RESPONSE @@ -50,8 +51,8 @@ class _GenericHandler(grpc.GenericRpcHandler): return grpc.unary_unary_rpc_method_handler(self._unary_unary) if handler_details.method == _BLOCK_FOREVER: return grpc.unary_unary_rpc_method_handler(self._block_forever) - if handler_details.method == _BLOCK_SHORTLY: - return grpc.unary_unary_rpc_method_handler(self._block_shortly) + if handler_details.method == _BLOCK_BRIEFLY: + return grpc.unary_unary_rpc_method_handler(self._BLOCK_BRIEFLY) async def wait_for_call(self): await self._called @@ -87,6 +88,7 @@ class TestServer(AioTestBase): await server.stop(None) self.loop.run_until_complete(test_shutdown_body()) + # Ensures no SIGSEGV triggered, and ends within timeout. def test_shutdown_after_call(self): @@ -107,12 +109,18 @@ class TestServer(AioTestBase): channel = aio.insecure_channel(server_target) call_task = self.loop.create_task( - channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) + channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)) await generic_handler.wait_for_call() + shutdown_start_time = time.time() await server.stop(test_constants.SHORT_TIMEOUT) + grace_period_length = time.time() - shutdown_start_time + self.assertGreater(grace_period_length, + test_constants.SHORT_TIMEOUT / 3) + + # Validates the states. await channel.close() - self.assertEqual(await call_task, _RESPONSE) + self.assertEqual(_RESPONSE, await call_task) self.assertTrue(call_task.done()) self.loop.run_until_complete(test_graceful_shutdown_success_body()) @@ -131,13 +139,68 @@ class TestServer(AioTestBase): with self.assertRaises(aio.AioRpcError) as exception_context: await call_task - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.UNAVAILABLE) + self.assertEqual(grpc.StatusCode.UNAVAILABLE, + exception_context.exception.code()) self.assertIn('GOAWAY', exception_context.exception.details()) await channel.close() self.loop.run_until_complete(test_graceful_shutdown_failed_body()) + def test_concurrent_graceful_shutdown(self): + + async def test_concurrent_graceful_shutdown_body(): + server_target, server, generic_handler = await _start_test_server() + + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)) + await generic_handler.wait_for_call() + + # Expects the shortest grace period to be effective. + shutdown_start_time = time.time() + await asyncio.gather( + server.stop(test_constants.LONG_TIMEOUT), + server.stop(test_constants.SHORT_TIMEOUT), + server.stop(test_constants.LONG_TIMEOUT), + ) + grace_period_length = time.time() - shutdown_start_time + self.assertGreater(grace_period_length, + test_constants.SHORT_TIMEOUT / 3) + + await channel.close() + self.assertEqual(_RESPONSE, await call_task) + self.assertTrue(call_task.done()) + + self.loop.run_until_complete(test_concurrent_graceful_shutdown_body()) + + def test_concurrent_graceful_shutdown_immediate(self): + + async def test_concurrent_graceful_shutdown_immediate_body(): + server_target, server, generic_handler = await _start_test_server() + + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + await generic_handler.wait_for_call() + + # Expects no grace period, due to the "server.stop(None)". + await asyncio.gather( + server.stop(test_constants.LONG_TIMEOUT), + server.stop(None), + server.stop(test_constants.SHORT_TIMEOUT), + server.stop(test_constants.LONG_TIMEOUT), + ) + + with self.assertRaises(aio.AioRpcError) as exception_context: + await call_task + self.assertEqual(grpc.StatusCode.UNAVAILABLE, + exception_context.exception.code()) + self.assertIn('GOAWAY', exception_context.exception.details()) + await channel.close() + + self.loop.run_until_complete( + test_concurrent_graceful_shutdown_immediate_body()) + @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_shutdown_before_call(self): From 8af510e1dfc0b2c7ba06c535208d9e892936e143 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 29 Oct 2019 18:01:22 -0700 Subject: [PATCH 09/14] Adopt reviewer's advice: 1. Refactor (simplify) the failure handler; 2. Fix a memory leak for badly written application code. --- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 5 ++ .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 71 ++++++++++--------- .../grpcio/grpc/experimental/aio/_server.py | 8 +++ .../tests_aio/unit/server_test.py | 4 +- 4 files changed, 52 insertions(+), 36 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 7c5c3fa2e9a..eed496df7c6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -154,6 +154,11 @@ cdef class _AsyncioSocket: self._writer.close() if self._server: self._server.close() + # NOTE(lidiz) If the asyncio.Server is created from a Python socket, + # the server.close() won't release the fd until the close() is called + # for the Python socket. + if self._py_socket: + self._py_socket.close() def _new_connection_callback(self, object reader, object writer): client_socket = _AsyncioSocket.create( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 6f3098d619f..8b0a4e0a4b3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -25,44 +25,31 @@ cdef class _HandlerCallDetails: class _ServicerContextPlaceHolder(object): pass -cdef class CallbackFailureHandler: - cdef str _c_core_api +cdef class _CallbackFailureHandler: + cdef str _core_function_name cdef object _error_details cdef object _exception_type - cdef object _callback # Callable[[Future], None] def __cinit__(self, - str c_core_api="", - object error_details="UNKNOWN", - object exception_type=RuntimeError, - object callback=None): - """Handles failure by raising exception or execute a callbcak. - - The callback accepts a future, returns nothing. The callback is - expected to finish the future either "set_result" or "set_exception". - """ - if callback is None: - self._c_core_api = c_core_api - self._error_details = error_details - self._exception_type = exception_type - self._callback = self._raise_exception - else: - self._callback = callback + str core_function_name, + object error_details, + object exception_type): + """Handles failure by raising exception.""" + self._core_function_name = core_function_name + self._error_details = error_details + self._exception_type = exception_type - def _raise_exception(self, object future): + cdef handle(self, object future): future.set_exception(self._exception_type( - 'Failed "%s": %s' % (self._c_core_api, self._error_details) + 'Failed "%s": %s' % (self._core_function_name, self._error_details) )) - cdef handle(self, object future): - self._callback(future) - # TODO(https://github.com/grpc/grpc/issues/20669) # Apply this to the client-side cdef class CallbackWrapper: - def __cinit__(self, object future, CallbackFailureHandler failure_handler): + def __cinit__(self, object future, _CallbackFailureHandler failure_handler): self.context.functor.functor_run = self.functor_run self.context.waiter = future self.context.failure_handler = failure_handler @@ -77,7 +64,7 @@ cdef class CallbackWrapper: int success): cdef CallbackContext *context = functor if succeed == 0: - (context.failure_handler).handle( + (<_CallbackFailureHandler>context.failure_handler).handle( context.waiter) else: (context.waiter).set_result(None) @@ -127,7 +114,7 @@ async def callback_start_batch(RPCState rpc_state, cdef object future = loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, - CallbackFailureHandler('callback_start_batch', operations)) + _CallbackFailureHandler('callback_start_batch', operations, RuntimeError)) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -206,7 +193,7 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): class _RequestCallError(Exception): pass -cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler( +cdef _CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = _CallbackFailureHandler( 'grpc_server_request_call', 'server shutdown', _RequestCallError) @@ -236,7 +223,10 @@ async def _server_call_request_call(Server server, return rpc_state -cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_completion_queue_shutdown') +cdef _CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( + 'grpc_completion_queue_shutdown', + 'Unknown', + RuntimeError) cdef class _CallbackCompletionQueue: @@ -261,14 +251,18 @@ cdef class _CallbackCompletionQueue: grpc_completion_queue_destroy(self._cq) -cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_server_shutdown_and_notify') +cdef _CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( + 'grpc_server_shutdown_and_notify', + 'Unknown', + RuntimeError) cdef class AioServer: def __init__(self, loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): - # C-Core objects won't be deallocated automatically. + # NOTE(lidiz) Core objects won't be deallocated automatically. + # If AioServer.shutdown is not called, those objects will leak. self._server = Server(options) self._cq = _CallbackCompletionQueue(loop) grpc_server_register_completion_queue( @@ -311,7 +305,7 @@ cdef class AioServer: async def _server_main_loop(self, object server_started): - self._server.start(backup_queue=False) + self._server.start() cdef RPCState rpc_state server_started.set_result(True) @@ -344,8 +338,10 @@ cdef class AioServer: await server_started async def _start_shutting_down(self): - """Prepares the server to shutting down (NOT coroutine-safe).""" - # Starts the shutdown process. + """Prepares the server to shutting down. + + This coroutine function is NOT coroutine-safe. + """ # The shutdown callback won't be called until there is no live RPC. grpc_server_shutdown_and_notify( self._server.c_server, @@ -409,5 +405,10 @@ cdef class AioServer: return True def __dealloc__(self): + """Deallocation of Core objects are ensured by Python grpc.aio.Server. + + If the Cython representation is deallocated without underlying objects + freed, raise an RuntimeError. + """ if self._status != AIO_SERVER_STATUS_STOPPED: - _LOGGER.error('__dealloc__ called on running server: %d', self._status) + raise RuntimeError('__dealloc__ called on running server: %d', self._status) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 86d38e6525d..479951b1ceb 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -132,6 +132,14 @@ class Server: """ return await self._server.wait_for_termination(timeout) + def __del__(self): + """Schedules a graceful shutdown in current event loop. + + The Cython AioServer doesn't hold a ref-count to this class. It should + be safe to slightly extend the underlying Cython object's life span. + """ + self._loop.create_task(self._server.shutdown(None)) + def server(migration_thread_pool=None, handlers=None, diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 6c96d478c64..776d432bffc 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -15,6 +15,7 @@ import logging import unittest import time +import gc import grpc from grpc.experimental import aio @@ -72,7 +73,8 @@ class TestServer(AioTestBase): def test_unary_unary(self): async def test_unary_unary_body(): - server_target, _, _ = await _start_test_server() + result = await _start_test_server() + server_target = result[0] async with aio.insecure_channel(server_target) as channel: unary_call = channel.unary_unary(_SIMPLE_UNARY_UNARY) From f8dc1d9a8e7b93a05dbc812f3a3f2c3b21747d87 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 30 Oct 2019 10:58:25 -0700 Subject: [PATCH 10/14] Make PyLint happy --- src/python/grpcio/grpc/experimental/aio/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 479951b1ceb..8b53fdd0d03 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -134,7 +134,7 @@ class Server: def __del__(self): """Schedules a graceful shutdown in current event loop. - + The Cython AioServer doesn't hold a ref-count to this class. It should be safe to slightly extend the underlying Cython object's life span. """ From 9289d34df0fdcfed239899e0ebc840be18383015 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 31 Oct 2019 13:21:44 -0700 Subject: [PATCH 11/14] Propagate unexpected error to application --- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 1 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index fdd9541f0eb..ca4a6a837ea 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -64,3 +64,4 @@ cdef class AioServer: cdef object _shutdown_lock # asyncio.Lock cdef object _shutdown_completed # asyncio.Future cdef CallbackWrapper _shutdown_callback_wrapper + cdef object _crash_exception # Exception diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 8b0a4e0a4b3..0dca070e933 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -282,6 +282,7 @@ cdef class AioServer: self._shutdown_callback_wrapper = CallbackWrapper( self._shutdown_completed, SERVER_SHUTDOWN_FAILURE_HANDLER) + self._crash_exception = None if interceptors: raise NotImplementedError() @@ -324,6 +325,15 @@ cdef class AioServer: rpc_state, self._loop)) + def _serving_task_crash_handler(self, object task): + """Shutdown the server immediately if unexpectedly exited.""" + if task.exception() is None: + return + if self._status != AIO_SERVER_STATUS_STOPPING: + self._crash_exception = task.exception() + _LOGGER.exception(self._crash_exception) + self._loop.create_task(self.shutdown(None)) + async def start(self): if self._status == AIO_SERVER_STATUS_RUNNING: return @@ -333,6 +343,7 @@ cdef class AioServer: self._status = AIO_SERVER_STATUS_RUNNING cdef object server_started = self._loop.create_future() self._serving_task = self._loop.create_task(self._server_main_loop(server_started)) + self._serving_task.add_done_callback(self._serving_task_crash_handler) # Needs to explicitly wait for the server to start up. # Otherwise, the actual start time of the server is un-controllable. await server_started @@ -368,9 +379,9 @@ cdef class AioServer: async with self._shutdown_lock: if self._status == AIO_SERVER_STATUS_RUNNING: - await self._start_shutting_down() self._server.is_shutting_down = True self._status = AIO_SERVER_STATUS_STOPPING + await self._start_shutting_down() if grace is None: # Directly cancels all calls @@ -401,7 +412,11 @@ cdef class AioServer: try: await asyncio.wait_for(self._shutdown_completed, timeout) except asyncio.TimeoutError: + if self._crash_exception is not None: + raise self._crash_exception return False + if self._crash_exception is not None: + raise self._crash_exception return True def __dealloc__(self): From 9aeefc3691642d744123e579eb038338a3eea5f8 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 31 Oct 2019 13:36:38 -0700 Subject: [PATCH 12/14] Fix missing shielding --- src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 0dca070e933..ef7160a3059 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -410,7 +410,7 @@ cdef class AioServer: await self._shutdown_completed else: try: - await asyncio.wait_for(self._shutdown_completed, timeout) + await asyncio.wait_for(asyncio.shield(self._shutdown_completed), timeout) except asyncio.TimeoutError: if self._crash_exception is not None: raise self._crash_exception From ae59fc3b7ae76f53a94ffe8741dbf100403dfc36 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 4 Nov 2019 11:55:48 -0800 Subject: [PATCH 13/14] Adopt reviewers' advice * Add cancellation handler * Use loop= --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 53 ++++++++++++++++--- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index ef7160a3059..ef4b7f8c51d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -14,6 +14,7 @@ # TODO(https://github.com/grpc/grpc/issues/20850) refactor this. _LOGGER = logging.getLogger(__name__) +cdef int _EMPTY_FLAG = 0 cdef class _HandlerCallDetails: @@ -171,6 +172,9 @@ async def _handle_unary_unary_rpc(object method_handler, await callback_start_batch(rpc_state, send_ops, loop) + + + async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): # Finds the method handler (application logic) cdef object method_handler = _find_method_handler( @@ -180,6 +184,7 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): if method_handler is None: # TODO(lidiz) return unimplemented error to client side raise NotImplementedError() + # TODO(lidiz) extend to all 4 types of RPC if method_handler.request_streaming or method_handler.response_streaming: raise NotImplementedError() @@ -223,6 +228,16 @@ async def _server_call_request_call(Server server, return rpc_state +async def _handle_cancellation_from_core(object rpc_task, + RPCState rpc_state, + object loop): + cdef ReceiveCloseOnServerOperation op = ReceiveCloseOnServerOperation(_EMPTY_FLAG) + cdef tuple ops = (op,) + await callback_start_batch(rpc_state, ops, loop) + if op.cancelled() and not rpc_task.done(): + rpc_task.cancel() + + cdef _CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( 'grpc_completion_queue_shutdown', 'Unknown', @@ -277,7 +292,7 @@ cdef class AioServer: self.add_generic_rpc_handlers(generic_handlers) self._serving_task = None - self._shutdown_lock = asyncio.Lock() + self._shutdown_lock = asyncio.Lock(loop=self._loop) self._shutdown_completed = self._loop.create_future() self._shutdown_callback_wrapper = CallbackWrapper( self._shutdown_completed, @@ -320,10 +335,20 @@ cdef class AioServer: self._cq, self._loop) - self._loop.create_task(_handle_rpc( - self._generic_handlers, - rpc_state, - self._loop)) + rpc_task = self._loop.create_task( + _handle_rpc( + self._generic_handlers, + rpc_state, + self._loop + ) + ) + self._loop.create_task( + _handle_cancellation_from_core( + rpc_task, + rpc_state, + self._loop + ) + ) def _serving_task_crash_handler(self, object task): """Shutdown the server immediately if unexpectedly exited.""" @@ -389,7 +414,14 @@ cdef class AioServer: await self._shutdown_completed else: try: - await asyncio.wait_for(asyncio.shield(self._shutdown_completed), grace) + await asyncio.wait_for( + asyncio.shield( + self._shutdown_completed, + loop=self._loop + ), + grace, + loop=self._loop, + ) except asyncio.TimeoutError: # Cancels all ongoing calls by the end of grace period. grpc_server_cancel_all_calls(self._server.c_server) @@ -410,7 +442,14 @@ cdef class AioServer: await self._shutdown_completed else: try: - await asyncio.wait_for(asyncio.shield(self._shutdown_completed), timeout) + await asyncio.wait_for( + asyncio.shield( + self._shutdown_completed, + loop=self._loop, + ), + timeout, + loop=self._loop, + ) except asyncio.TimeoutError: if self._crash_exception is not None: raise self._crash_exception From 4373d88caf62587682b7fe09d1d2f3b64961b44e Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 5 Nov 2019 11:27:34 -0800 Subject: [PATCH 14/14] Fix the tests after merging with Unified Stub Call --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 2 +- .../grpcio/grpc/experimental/aio/__init__.py | 3 -- .../grpcio_tests/tests_aio/unit/init_test.py | 39 ------------------- .../tests_aio/unit/server_test.py | 25 ++++++------ 4 files changed, 12 insertions(+), 57 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index ef4b7f8c51d..61335ca9e60 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -64,7 +64,7 @@ cdef class CallbackWrapper: grpc_experimental_completion_queue_functor* functor, int success): cdef CallbackContext *context = functor - if succeed == 0: + if success == 0: (<_CallbackFailureHandler>context.failure_handler).handle( context.waiter) else: diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 3a736197d65..3f6b96eaa54 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -17,11 +17,8 @@ import abc import six import grpc -<<<<<<< HEAD -======= from grpc import _common from grpc._cython import cygrpc ->>>>>>> Add 4 server tests and 1 channel tests from grpc._cython.cygrpc import init_grpc_aio from ._call import AioRpcError diff --git a/src/python/grpcio_tests/tests_aio/unit/init_test.py b/src/python/grpcio_tests/tests_aio/unit/init_test.py index 300e2d92766..9f5d8bb0d85 100644 --- a/src/python/grpcio_tests/tests_aio/unit/init_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/init_test.py @@ -19,45 +19,6 @@ from tests_aio.unit._test_server import start_test_server from tests_aio.unit._test_base import AioTestBase -class TestAioRpcError(unittest.TestCase): - _TEST_INITIAL_METADATA = ("initial metadata",) - _TEST_TRAILING_METADATA = ("trailing metadata",) - - def test_attributes(self): - aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", self._TEST_TRAILING_METADATA) - self.assertEqual(aio_rpc_error.initial_metadata(), - self._TEST_INITIAL_METADATA) - self.assertEqual(aio_rpc_error.code(), grpc.StatusCode.OK) - self.assertEqual(aio_rpc_error.details(), "details") - self.assertEqual(aio_rpc_error.trailing_metadata(), - self._TEST_TRAILING_METADATA) - - def test_class_hierarchy(self): - aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", self._TEST_TRAILING_METADATA) - - self.assertIsInstance(aio_rpc_error, grpc.RpcError) - - def test_class_attributes(self): - aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", self._TEST_TRAILING_METADATA) - self.assertEqual(aio_rpc_error.__class__.__name__, "AioRpcError") - self.assertEqual(aio_rpc_error.__class__.__doc__, - aio.AioRpcError.__doc__) - - def test_class_singleton(self): - first_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", - self._TEST_TRAILING_METADATA) - second_aio_rpc_error = aio.AioRpcError(self._TEST_INITIAL_METADATA, 0, - "details", - self._TEST_TRAILING_METADATA) - - self.assertIs(first_aio_rpc_error.__class__, - second_aio_rpc_error.__class__) - - class TestInsecureChannel(AioTestBase): def test_insecure_channel(self): diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 776d432bffc..1e86de65404 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import unittest import time @@ -110,8 +111,7 @@ class TestServer(AioTestBase): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task( - channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)) + call = channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST) await generic_handler.wait_for_call() shutdown_start_time = time.time() @@ -122,8 +122,8 @@ class TestServer(AioTestBase): # Validates the states. await channel.close() - self.assertEqual(_RESPONSE, await call_task) - self.assertTrue(call_task.done()) + self.assertEqual(_RESPONSE, await call) + self.assertTrue(call.done()) self.loop.run_until_complete(test_graceful_shutdown_success_body()) @@ -133,14 +133,13 @@ class TestServer(AioTestBase): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task( - channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + call = channel.unary_unary(_BLOCK_FOREVER)(_REQUEST) await generic_handler.wait_for_call() await server.stop(test_constants.SHORT_TIMEOUT) with self.assertRaises(aio.AioRpcError) as exception_context: - await call_task + await call self.assertEqual(grpc.StatusCode.UNAVAILABLE, exception_context.exception.code()) self.assertIn('GOAWAY', exception_context.exception.details()) @@ -154,8 +153,7 @@ class TestServer(AioTestBase): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task( - channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)) + call = channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST) await generic_handler.wait_for_call() # Expects the shortest grace period to be effective. @@ -170,8 +168,8 @@ class TestServer(AioTestBase): test_constants.SHORT_TIMEOUT / 3) await channel.close() - self.assertEqual(_RESPONSE, await call_task) - self.assertTrue(call_task.done()) + self.assertEqual(_RESPONSE, await call) + self.assertTrue(call.done()) self.loop.run_until_complete(test_concurrent_graceful_shutdown_body()) @@ -181,8 +179,7 @@ class TestServer(AioTestBase): server_target, server, generic_handler = await _start_test_server() channel = aio.insecure_channel(server_target) - call_task = self.loop.create_task( - channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + call = channel.unary_unary(_BLOCK_FOREVER)(_REQUEST) await generic_handler.wait_for_call() # Expects no grace period, due to the "server.stop(None)". @@ -194,7 +191,7 @@ class TestServer(AioTestBase): ) with self.assertRaises(aio.AioRpcError) as exception_context: - await call_task + await call self.assertEqual(grpc.StatusCode.UNAVAILABLE, exception_context.exception.code()) self.assertIn('GOAWAY', exception_context.exception.details())