From 9f5dbf70dcf089fe1bdd7a246e64e217f8f29ec0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 24 Oct 2019 18:38:20 -0700 Subject: [PATCH] Add shutdown test --- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 2 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 82 +++++++++++++++---- .../grpc/_cython/_cygrpc/server.pyx.pxi | 32 +++++--- .../grpcio/grpc/experimental/aio/_server.py | 64 ++++++++------- .../tests_aio/unit/server_test.py | 10 +++ 5 files changed, 137 insertions(+), 53 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 1906463d088..99a1af09792 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -30,6 +30,7 @@ cdef enum AioServerStatus: AIO_SERVER_STATUS_READY AIO_SERVER_STATUS_RUNNING AIO_SERVER_STATUS_STOPPED + AIO_SERVER_STATUS_STOPPING cdef class _CallbackCompletionQueue: @@ -42,3 +43,4 @@ cdef class AioServer: cdef _CallbackCompletionQueue _cq cdef list _generic_handlers cdef AioServerStatus _status + cdef object _loop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index dd6ff8b29d8..4a1c5aec48c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +_LOGGER = logging.getLogger(__name__) + cdef class _HandlerCallDetails: def __cinit__(self, str method, tuple invocation_metadata): self.method = method @@ -186,10 +188,10 @@ async def _server_call_request_call(Server server, return rpc_state -async def _server_main_loop(Server server, +async def _server_main_loop(object loop, + Server server, _CallbackCompletionQueue cq, list generic_handlers): - cdef object loop = asyncio.get_event_loop() cdef RPCState rpc_state while True: @@ -201,11 +203,12 @@ async def _server_main_loop(Server server, loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop)) -async def _server_start(Server server, +async def _server_start(object loop, + Server server, _CallbackCompletionQueue cq, list generic_handlers): - server.start() - await _server_main_loop(server, cq, generic_handlers) + server.start(backup_queue=False) + await _server_main_loop(loop, server, cq, generic_handlers) cdef class _CallbackCompletionQueue: @@ -222,17 +225,18 @@ cdef class _CallbackCompletionQueue: cdef class AioServer: - def __init__(self, thread_pool, generic_handlers, interceptors, options, - maximum_concurrent_rpcs, compression): + def __init__(self, loop, thread_pool, generic_handlers, interceptors, + options, maximum_concurrent_rpcs, compression): + self._loop = loop self._server = Server(options) self._cq = _CallbackCompletionQueue() - self._status = AIO_SERVER_STATUS_READY - self._generic_handlers = [] grpc_server_register_completion_queue( self._server.c_server, self._cq.c_ptr(), NULL ) + self._status = AIO_SERVER_STATUS_READY + self._generic_handlers = [] self.add_generic_rpc_handlers(generic_handlers) if interceptors: @@ -262,14 +266,62 @@ cdef class AioServer: raise RuntimeError('Server not in ready state') self._status = AIO_SERVER_STATUS_RUNNING - loop = asyncio.get_event_loop() - loop.create_task(_server_start( + self._loop.create_task(_server_start( + self._loop, self._server, self._cq, self._generic_handlers, )) - # TODO(https://github.com/grpc/grpc/issues/20668) - # Implement Destruction Methods for AsyncIO Server - def stop(self, unused_grace): - pass + async def shutdown(self, grace): + """Gracefully shutdown the C-Core server. + + Application should only call shutdown once. + + Args: + grace: An optional float indicates the length of grace period in + seconds. + """ + if self._status != AIO_SERVER_STATUS_RUNNING: + # The server either is shutting down, or not started. + return + cdef object shutdown_completed = self._loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper(shutdown_completed) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + + # Starts the shutdown process. + # The shutdown callback won't be called unless there is no live RPC. + grpc_server_shutdown_and_notify( + self._server.c_server, + self._cq._cq, + wrapper.c_functor()) + self._server.is_shutting_down = True + self._status = AIO_SERVER_STATUS_STOPPING + + if grace is None: + # Directly cancels all calls + grpc_server_cancel_all_calls(self._server.c_server) + await shutdown_completed + else: + try: + await asyncio.wait_for(shutdown_completed, grace) + except asyncio.TimeoutError: + # Cancels all ongoing calls by the end of grace period. + grpc_server_cancel_all_calls(self._server.c_server) + await shutdown_completed + + # Keeps wrapper object alive until now. + cpython.Py_DECREF(wrapper) + grpc_server_destroy(self._server.c_server) + self._server.c_server = NULL + self._server.is_shutdown = True + self._status = AIO_SERVER_STATUS_STOPPED + + def __dealloc__(self): + if self._status == AIO_SERVER_STATUS_STOPPED: + grpc_completion_queue_shutdown(self._cq._cq) + grpc_completion_queue_destroy(self._cq._cq) + else: + _LOGGER.error('Server is not stopped while deallocation: %d', self._status) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 67b2e9d4e88..7ca332050b5 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -61,16 +61,25 @@ cdef class Server: self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) - def start(self): + def start(self, backup_queue=True): + """Start the Cython gRPC Server. + + Args: + backup_queue: a bool indicates whether to spawn a backup completion + queue. In case of no CQ is bound to the server, and the shutdown + process of server becomes un-observable. + """ if self.is_started: raise ValueError("the server has already started") - self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) - self.register_completion_queue(self.backup_shutdown_queue) + if backup_queue: + self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) + self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True with nogil: grpc_server_start(self.c_server) - # Ensure the core has gotten a chance to do the start-up work - self.backup_shutdown_queue.poll(deadline=time.time()) + if backup_queue: + # Ensure the core has gotten a chance to do the start-up work + self.backup_shutdown_queue.poll(deadline=time.time()) def add_http2_port(self, bytes address, ServerCredentials server_credentials=None): @@ -134,11 +143,14 @@ cdef class Server: elif self.is_shutdown: pass elif not self.is_shutting_down: - # the user didn't call shutdown - use our backup queue - self._c_shutdown(self.backup_shutdown_queue, None) - # and now we wait - while not self.is_shutdown: - self.backup_shutdown_queue.poll() + if self.backup_shutdown_queue is None: + raise RuntimeError('Server shutdown failed: no completion queue.') + else: + # the user didn't call shutdown - use our backup queue + self._c_shutdown(self.backup_shutdown_queue, None) + # and now we wait + while not self.is_shutdown: + self.backup_shutdown_queue.poll() else: # We're in the process of shutting down, but have not shutdown; can't do # much but repeatedly release the GIL and wait diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 6bc3d210aed..ca39cef905f 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -25,9 +25,17 @@ class Server: def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): - self._server = cygrpc.AioServer(thread_pool, generic_handlers, - interceptors, options, - maximum_concurrent_rpcs, compression) + self._loop = asyncio.get_event_loop() + self._server = cygrpc.AioServer( + self._loop, + thread_pool, + generic_handlers, + interceptors, + options, + maximum_concurrent_rpcs, + compression) + self._shutdown_started = False + self._shutdown_future = self._loop.create_future() def add_generic_rpc_handlers( self, @@ -83,35 +91,32 @@ class Server: """ await self._server.start() - def stop(self, grace: Optional[float]) -> asyncio.Event: + async def stop(self, grace: Optional[float]) -> None: """Stops this Server. - "This method immediately stops the server from servicing new RPCs in + This method immediately stops the server from servicing new RPCs in all cases. - If a grace period is specified, this method returns immediately - and all RPCs active at the end of the grace period are aborted. - If a grace period is not specified (by passing None for `grace`), - all existing RPCs are aborted immediately and this method - blocks until the last RPC handler terminates. + If a grace period is specified, all RPCs active at the end of the grace + period are aborted. + + If a grace period is not specified (by passing None for `grace`), all + existing RPCs are aborted immediately and this method blocks until the + last RPC handler terminates. - This method is idempotent and may be called at any time. - Passing a smaller grace value in a subsequent call will have - the effect of stopping the Server sooner (passing None will - have the effect of stopping the server immediately). Passing - a larger grace value in a subsequent call *will not* have the - effect of stopping the server later (i.e. the most restrictive - grace value is used). + Only the first call to "stop" sets the length of grace period. + Additional calls is allowed and will block until the termination of + the server. Args: grace: A duration of time in seconds or None. - - Returns: - A threading.Event that will be set when this Server has completely - stopped, i.e. when running RPCs either complete or are aborted and - all handlers have terminated. """ - raise NotImplementedError() + if self._shutdown_started: + await self._shutdown_future + else: + self._shutdown_started = True + await self._server.shutdown(grace) + self._shutdown_future.set_result(None) async def wait_for_termination(self, timeout: Optional[float] = None) -> bool: @@ -135,11 +140,14 @@ class Server: Returns: A bool indicates if the operation times out. """ - if timeout: - raise NotImplementedError() - # TODO(lidiz) replace this wait forever logic - future = asyncio.get_event_loop().create_future() - await future + if timeout == None: + await self._shutdown_future + else: + try: + await asyncio.wait_for(self._shutdown_future, timeout) + except asyncio.TimeoutError: + return False + return True def server(migration_thread_pool=None, diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 937cce9eebb..924dff836f4 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -51,6 +51,16 @@ class TestServer(AioTestBase): self.assertEqual(response, _RESPONSE) self.loop.run_until_complete(test_unary_unary_body()) + + def test_shutdown(self): + + async def test_shutdown_body(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((GenericHandler(),)) + await server.start() + await server.stop(None) + asyncio.get_event_loop().run_until_complete(test_shutdown_body()) if __name__ == '__main__':