diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 7c5c3fa2e9a..eed496df7c6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -154,6 +154,11 @@ cdef class _AsyncioSocket: self._writer.close() if self._server: self._server.close() + # NOTE(lidiz) If the asyncio.Server is created from a Python socket, + # the server.close() won't release the fd until the close() is called + # for the Python socket. + if self._py_socket: + self._py_socket.close() def _new_connection_callback(self, object reader, object writer): client_socket = _AsyncioSocket.create( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 6f3098d619f..8b0a4e0a4b3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -25,44 +25,31 @@ cdef class _HandlerCallDetails: class _ServicerContextPlaceHolder(object): pass -cdef class CallbackFailureHandler: - cdef str _c_core_api +cdef class _CallbackFailureHandler: + cdef str _core_function_name cdef object _error_details cdef object _exception_type - cdef object _callback # Callable[[Future], None] def __cinit__(self, - str c_core_api="", - object error_details="UNKNOWN", - object exception_type=RuntimeError, - object callback=None): - """Handles failure by raising exception or execute a callbcak. - - The callback accepts a future, returns nothing. The callback is - expected to finish the future either "set_result" or "set_exception". - """ - if callback is None: - self._c_core_api = c_core_api - self._error_details = error_details - self._exception_type = exception_type - self._callback = self._raise_exception - else: - self._callback = callback + str core_function_name, + object error_details, + object exception_type): + """Handles failure by raising exception.""" + self._core_function_name = core_function_name + self._error_details = error_details + self._exception_type = exception_type - def _raise_exception(self, object future): + cdef handle(self, object future): future.set_exception(self._exception_type( - 'Failed "%s": %s' % (self._c_core_api, self._error_details) + 'Failed "%s": %s' % (self._core_function_name, self._error_details) )) - cdef handle(self, object future): - self._callback(future) - # TODO(https://github.com/grpc/grpc/issues/20669) # Apply this to the client-side cdef class CallbackWrapper: - def __cinit__(self, object future, CallbackFailureHandler failure_handler): + def __cinit__(self, object future, _CallbackFailureHandler failure_handler): self.context.functor.functor_run = self.functor_run self.context.waiter = future self.context.failure_handler = failure_handler @@ -77,7 +64,7 @@ cdef class CallbackWrapper: int success): cdef CallbackContext *context = functor if succeed == 0: - (context.failure_handler).handle( + (<_CallbackFailureHandler>context.failure_handler).handle( context.waiter) else: (context.waiter).set_result(None) @@ -127,7 +114,7 @@ async def callback_start_batch(RPCState rpc_state, cdef object future = loop.create_future() cdef CallbackWrapper wrapper = CallbackWrapper( future, - CallbackFailureHandler('callback_start_batch', operations)) + _CallbackFailureHandler('callback_start_batch', operations, RuntimeError)) # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) @@ -206,7 +193,7 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): class _RequestCallError(Exception): pass -cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler( +cdef _CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = _CallbackFailureHandler( 'grpc_server_request_call', 'server shutdown', _RequestCallError) @@ -236,7 +223,10 @@ async def _server_call_request_call(Server server, return rpc_state -cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_completion_queue_shutdown') +cdef _CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( + 'grpc_completion_queue_shutdown', + 'Unknown', + RuntimeError) cdef class _CallbackCompletionQueue: @@ -261,14 +251,18 @@ cdef class _CallbackCompletionQueue: grpc_completion_queue_destroy(self._cq) -cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_server_shutdown_and_notify') +cdef _CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler( + 'grpc_server_shutdown_and_notify', + 'Unknown', + RuntimeError) cdef class AioServer: def __init__(self, loop, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): - # C-Core objects won't be deallocated automatically. + # NOTE(lidiz) Core objects won't be deallocated automatically. + # If AioServer.shutdown is not called, those objects will leak. self._server = Server(options) self._cq = _CallbackCompletionQueue(loop) grpc_server_register_completion_queue( @@ -311,7 +305,7 @@ cdef class AioServer: async def _server_main_loop(self, object server_started): - self._server.start(backup_queue=False) + self._server.start() cdef RPCState rpc_state server_started.set_result(True) @@ -344,8 +338,10 @@ cdef class AioServer: await server_started async def _start_shutting_down(self): - """Prepares the server to shutting down (NOT coroutine-safe).""" - # Starts the shutdown process. + """Prepares the server to shutting down. + + This coroutine function is NOT coroutine-safe. + """ # The shutdown callback won't be called until there is no live RPC. grpc_server_shutdown_and_notify( self._server.c_server, @@ -409,5 +405,10 @@ cdef class AioServer: return True def __dealloc__(self): + """Deallocation of Core objects are ensured by Python grpc.aio.Server. + + If the Cython representation is deallocated without underlying objects + freed, raise an RuntimeError. + """ if self._status != AIO_SERVER_STATUS_STOPPED: - _LOGGER.error('__dealloc__ called on running server: %d', self._status) + raise RuntimeError('__dealloc__ called on running server: %d', self._status) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 86d38e6525d..479951b1ceb 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -132,6 +132,14 @@ class Server: """ return await self._server.wait_for_termination(timeout) + def __del__(self): + """Schedules a graceful shutdown in current event loop. + + The Cython AioServer doesn't hold a ref-count to this class. It should + be safe to slightly extend the underlying Cython object's life span. + """ + self._loop.create_task(self._server.shutdown(None)) + def server(migration_thread_pool=None, handlers=None, diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 6c96d478c64..776d432bffc 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -15,6 +15,7 @@ import logging import unittest import time +import gc import grpc from grpc.experimental import aio @@ -72,7 +73,8 @@ class TestServer(AioTestBase): def test_unary_unary(self): async def test_unary_unary_body(): - server_target, _, _ = await _start_test_server() + result = await _start_test_server() + server_target = result[0] async with aio.insecure_channel(server_target) as channel: unary_call = channel.unary_unary(_SIMPLE_UNARY_UNARY)