From 8168b9e1c95c6c8260442a68561a5e45f645959b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 29 Oct 2019 15:11:03 -0700 Subject: [PATCH] Adopt reviewer's advice * Make graceful shutdown support calls from multi-coroutine * Added comments * Make graceful shutdown success test case more strict * Add 2 more concurrent graceful shutdown tests --- .../_cygrpc/aio/callbackcontext.pxd.pxi | 13 ++- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 3 + .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 89 +++++++++++-------- .../grpc/_cython/_cygrpc/server.pyx.pxi | 4 +- .../grpcio/grpc/experimental/aio/_server.py | 38 +++----- .../tests_aio/unit/channel_test.py | 2 +- .../tests_aio/unit/server_test.py | 79 ++++++++++++++-- 7 files changed, 153 insertions(+), 75 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi index 38b7e9b3aed..beada919f4d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi @@ -15,6 +15,15 @@ cimport cpython cdef struct CallbackContext: + # C struct to store callback context in the form of pointers. + # + # Attributes: + # functor: A grpc_experimental_completion_queue_functor represents the + # callback function in the only way C-Core understands. + # waiter: An asyncio.Future object that fulfills when the callback is + # invoked by C-Core. + # failure_handler: A CallbackFailureHandler object that called when C-Core + # returns 'success == 0' state. grpc_experimental_completion_queue_functor functor - cpython.PyObject *waiter # asyncio.Future - cpython.PyObject *failure_handler # cygrpc.CallbackFailureHandler + cpython.PyObject *waiter + cpython.PyObject *failure_handler diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 0a3c2e4d9af..fdd9541f0eb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -61,3 +61,6 @@ cdef class AioServer: cdef AioServerStatus _status cdef object _loop # asyncio.EventLoop cdef object _serving_task # asyncio.Task + cdef object _shutdown_lock # asyncio.Lock + cdef object _shutdown_completed # asyncio.Future + cdef CallbackWrapper _shutdown_callback_wrapper diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 413abc24b56..6f3098d619f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +# TODO(https://github.com/grpc/grpc/issues/20850) refactor this. _LOGGER = logging.getLogger(__name__) @@ -282,6 +283,12 @@ cdef class AioServer: self.add_generic_rpc_handlers(generic_handlers) self._serving_task = None + self._shutdown_lock = asyncio.Lock() + self._shutdown_completed = self._loop.create_future() + self._shutdown_callback_wrapper = CallbackWrapper( + self._shutdown_completed, + SERVER_SHUTDOWN_FAILURE_HANDLER) + if interceptors: raise NotImplementedError() if maximum_concurrent_rpcs: @@ -309,7 +316,7 @@ cdef class AioServer: server_started.set_result(True) while True: - # When shutdown process starts, no more new connections. + # When shutdown begins, no more new connections. if self._status != AIO_SERVER_STATUS_RUNNING: break @@ -336,34 +343,14 @@ cdef class AioServer: # Otherwise, the actual start time of the server is un-controllable. await server_started - async def shutdown(self, grace): - """Gracefully shutdown the C-Core server. - - Application should only call shutdown once. - - Args: - grace: An optional float indicates the length of grace period in - seconds. - """ - if self._status != AIO_SERVER_STATUS_RUNNING: - # The server either is shutting down, or not started. - return - cdef object shutdown_completed = self._loop.create_future() - cdef CallbackWrapper wrapper = CallbackWrapper( - shutdown_completed, - SERVER_SHUTDOWN_FAILURE_HANDLER) - # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed - # when calling "await". This is an over-optimization by Cython. - cpython.Py_INCREF(wrapper) - + async def _start_shutting_down(self): + """Prepares the server to shutting down (NOT coroutine-safe).""" # Starts the shutdown process. - # The shutdown callback won't be called unless there is no live RPC. + # The shutdown callback won't be called until there is no live RPC. grpc_server_shutdown_and_notify( self._server.c_server, self._cq._cq, - wrapper.c_functor()) - self._server.is_shutting_down = True - self._status = AIO_SERVER_STATUS_STOPPING + self._shutdown_callback_wrapper.c_functor()) # Ensures the serving task (coroutine) exits. try: @@ -371,28 +358,56 @@ cdef class AioServer: except _RequestCallError: pass + async def shutdown(self, grace): + """Gracefully shutdown the C-Core server. + + Application should only call shutdown once. + + Args: + grace: An optional float indicating the length of grace period in + seconds. + """ + if self._status == AIO_SERVER_STATUS_READY or self._status == AIO_SERVER_STATUS_STOPPED: + return + + async with self._shutdown_lock: + if self._status == AIO_SERVER_STATUS_RUNNING: + await self._start_shutting_down() + self._server.is_shutting_down = True + self._status = AIO_SERVER_STATUS_STOPPING + if grace is None: # Directly cancels all calls grpc_server_cancel_all_calls(self._server.c_server) - await shutdown_completed + await self._shutdown_completed else: try: - await asyncio.wait_for(asyncio.shield(shutdown_completed), grace) + await asyncio.wait_for(asyncio.shield(self._shutdown_completed), grace) except asyncio.TimeoutError: # Cancels all ongoing calls by the end of grace period. grpc_server_cancel_all_calls(self._server.c_server) - await shutdown_completed + await self._shutdown_completed - # Keeps wrapper object alive until now. - cpython.Py_DECREF(wrapper) - grpc_server_destroy(self._server.c_server) - self._server.c_server = NULL - self._server.is_shutdown = True - self._status = AIO_SERVER_STATUS_STOPPED + async with self._shutdown_lock: + if self._status == AIO_SERVER_STATUS_STOPPING: + grpc_server_destroy(self._server.c_server) + self._server.c_server = NULL + self._server.is_shutdown = True + self._status = AIO_SERVER_STATUS_STOPPED - # Shuts down the completion queue - await self._cq.shutdown() + # Shuts down the completion queue + await self._cq.shutdown() + + async def wait_for_termination(self, float timeout): + if timeout is None: + await self._shutdown_completed + else: + try: + await asyncio.wait_for(self._shutdown_completed, timeout) + except asyncio.TimeoutError: + return False + return True def __dealloc__(self): if self._status != AIO_SERVER_STATUS_STOPPED: - _LOGGER.error('Server is not stopped while deallocation: %d', self._status) + _LOGGER.error('__dealloc__ called on running server: %d', self._status) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 7ca332050b5..4ce554d078f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -66,8 +66,8 @@ cdef class Server: Args: backup_queue: a bool indicates whether to spawn a backup completion - queue. In case of no CQ is bound to the server, and the shutdown - process of server becomes un-observable. + queue. In the case that no CQ is bound to the server, and the shutdown + of server becomes un-observable. """ if self.is_started: raise ValueError("the server has already started") diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index d49fe6644c3..86d38e6525d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -29,8 +29,6 @@ class Server: self._server = cygrpc.AioServer(self._loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression) - self._shutdown_started = False - self._shutdown_future = self._loop.create_future() def add_generic_rpc_handlers( self, @@ -92,26 +90,23 @@ class Server: This method immediately stops the server from servicing new RPCs in all cases. - If a grace period is specified, all RPCs active at the end of the grace - period are aborted. + If a grace period is specified, this method returns immediately and all + RPCs active at the end of the grace period are aborted. If a grace + period is not specified (by passing None for grace), all existing RPCs + are aborted immediately and this method blocks until the last RPC + handler terminates. - If a grace period is not specified (by passing None for `grace`), all - existing RPCs are aborted immediately and this method blocks until the - last RPC handler terminates. - - Only the first call to "stop" sets the length of grace period. - Additional calls is allowed and will block until the termination of - the server. + This method is idempotent and may be called at any time. Passing a + smaller grace value in a subsequent call will have the effect of + stopping the Server sooner (passing None will have the effect of + stopping the server immediately). Passing a larger grace value in a + subsequent call will not have the effect of stopping the server later + (i.e. the most restrictive grace value is used). Args: grace: A duration of time in seconds or None. """ - if self._shutdown_started: - await self._shutdown_future - else: - self._shutdown_started = True - await self._server.shutdown(grace) - self._shutdown_future.set_result(None) + await self._server.shutdown(grace) async def wait_for_termination(self, timeout: Optional[float] = None) -> bool: @@ -135,14 +130,7 @@ class Server: Returns: A bool indicates if the operation times out. """ - if timeout is None: - await self._shutdown_future - else: - try: - await asyncio.wait_for(self._shutdown_future, timeout) - except asyncio.TimeoutError: - return False - return True + return await self._server.wait_for_termination(timeout) def server(migration_thread_pool=None, diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_test.py index 2809f3b4b37..076300786fb 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_test.py @@ -108,7 +108,7 @@ class TestChannel(AioTestBase): response_deserializer=messages_pb2.SimpleResponse.FromString) response = await hi(messages_pb2.SimpleRequest()) - self.assertEqual(type(response), messages_pb2.SimpleResponse) + self.assertIs(type(response), messages_pb2.SimpleResponse) await channel.close() diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index d8f1c384043..6c96d478c64 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -14,6 +14,7 @@ import logging import unittest +import time import grpc from grpc.experimental import aio @@ -22,7 +23,7 @@ from tests.unit.framework.common import test_constants _SIMPLE_UNARY_UNARY = '/test/SimpleUnaryUnary' _BLOCK_FOREVER = '/test/BlockForever' -_BLOCK_SHORTLY = '/test/BlockShortly' +_BLOCK_BRIEFLY = '/test/BlockBriefly' _REQUEST = b'\x00\x00\x00' _RESPONSE = b'\x01\x01\x01' @@ -40,7 +41,7 @@ class _GenericHandler(grpc.GenericRpcHandler): async def _block_forever(self, unused_request, unused_context): await asyncio.get_event_loop().create_future() - async def _block_shortly(self, unused_request, unused_context): + async def _BLOCK_BRIEFLY(self, unused_request, unused_context): await asyncio.sleep(test_constants.SHORT_TIMEOUT / 2) return _RESPONSE @@ -50,8 +51,8 @@ class _GenericHandler(grpc.GenericRpcHandler): return grpc.unary_unary_rpc_method_handler(self._unary_unary) if handler_details.method == _BLOCK_FOREVER: return grpc.unary_unary_rpc_method_handler(self._block_forever) - if handler_details.method == _BLOCK_SHORTLY: - return grpc.unary_unary_rpc_method_handler(self._block_shortly) + if handler_details.method == _BLOCK_BRIEFLY: + return grpc.unary_unary_rpc_method_handler(self._BLOCK_BRIEFLY) async def wait_for_call(self): await self._called @@ -87,6 +88,7 @@ class TestServer(AioTestBase): await server.stop(None) self.loop.run_until_complete(test_shutdown_body()) + # Ensures no SIGSEGV triggered, and ends within timeout. def test_shutdown_after_call(self): @@ -107,12 +109,18 @@ class TestServer(AioTestBase): channel = aio.insecure_channel(server_target) call_task = self.loop.create_task( - channel.unary_unary(_BLOCK_SHORTLY)(_REQUEST)) + channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)) await generic_handler.wait_for_call() + shutdown_start_time = time.time() await server.stop(test_constants.SHORT_TIMEOUT) + grace_period_length = time.time() - shutdown_start_time + self.assertGreater(grace_period_length, + test_constants.SHORT_TIMEOUT / 3) + + # Validates the states. await channel.close() - self.assertEqual(await call_task, _RESPONSE) + self.assertEqual(_RESPONSE, await call_task) self.assertTrue(call_task.done()) self.loop.run_until_complete(test_graceful_shutdown_success_body()) @@ -131,13 +139,68 @@ class TestServer(AioTestBase): with self.assertRaises(aio.AioRpcError) as exception_context: await call_task - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.UNAVAILABLE) + self.assertEqual(grpc.StatusCode.UNAVAILABLE, + exception_context.exception.code()) self.assertIn('GOAWAY', exception_context.exception.details()) await channel.close() self.loop.run_until_complete(test_graceful_shutdown_failed_body()) + def test_concurrent_graceful_shutdown(self): + + async def test_concurrent_graceful_shutdown_body(): + server_target, server, generic_handler = await _start_test_server() + + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_BRIEFLY)(_REQUEST)) + await generic_handler.wait_for_call() + + # Expects the shortest grace period to be effective. + shutdown_start_time = time.time() + await asyncio.gather( + server.stop(test_constants.LONG_TIMEOUT), + server.stop(test_constants.SHORT_TIMEOUT), + server.stop(test_constants.LONG_TIMEOUT), + ) + grace_period_length = time.time() - shutdown_start_time + self.assertGreater(grace_period_length, + test_constants.SHORT_TIMEOUT / 3) + + await channel.close() + self.assertEqual(_RESPONSE, await call_task) + self.assertTrue(call_task.done()) + + self.loop.run_until_complete(test_concurrent_graceful_shutdown_body()) + + def test_concurrent_graceful_shutdown_immediate(self): + + async def test_concurrent_graceful_shutdown_immediate_body(): + server_target, server, generic_handler = await _start_test_server() + + channel = aio.insecure_channel(server_target) + call_task = self.loop.create_task( + channel.unary_unary(_BLOCK_FOREVER)(_REQUEST)) + await generic_handler.wait_for_call() + + # Expects no grace period, due to the "server.stop(None)". + await asyncio.gather( + server.stop(test_constants.LONG_TIMEOUT), + server.stop(None), + server.stop(test_constants.SHORT_TIMEOUT), + server.stop(test_constants.LONG_TIMEOUT), + ) + + with self.assertRaises(aio.AioRpcError) as exception_context: + await call_task + self.assertEqual(grpc.StatusCode.UNAVAILABLE, + exception_context.exception.code()) + self.assertIn('GOAWAY', exception_context.exception.details()) + await channel.close() + + self.loop.run_until_complete( + test_concurrent_graceful_shutdown_immediate_body()) + @unittest.skip('https://github.com/grpc/grpc/issues/20818') def test_shutdown_before_call(self):