Adopt reviewer's advice:

1. Refactor (simplify) the failure handler;
2. Fix a memory leak for badly written application code.
pull/20805/head
Lidi Zheng 5 years ago
parent 8168b9e1c9
commit 8af510e1df
  1. 5
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  2. 71
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  3. 8
      src/python/grpcio/grpc/experimental/aio/_server.py
  4. 4
      src/python/grpcio_tests/tests_aio/unit/server_test.py

@ -154,6 +154,11 @@ cdef class _AsyncioSocket:
self._writer.close()
if self._server:
self._server.close()
# NOTE(lidiz) If the asyncio.Server is created from a Python socket,
# the server.close() won't release the fd until the close() is called
# for the Python socket.
if self._py_socket:
self._py_socket.close()
def _new_connection_callback(self, object reader, object writer):
client_socket = _AsyncioSocket.create(

@ -25,44 +25,31 @@ cdef class _HandlerCallDetails:
class _ServicerContextPlaceHolder(object): pass
cdef class CallbackFailureHandler:
cdef str _c_core_api
cdef class _CallbackFailureHandler:
cdef str _core_function_name
cdef object _error_details
cdef object _exception_type
cdef object _callback # Callable[[Future], None]
def __cinit__(self,
str c_core_api="",
object error_details="UNKNOWN",
object exception_type=RuntimeError,
object callback=None):
"""Handles failure by raising exception or execute a callbcak.
The callback accepts a future, returns nothing. The callback is
expected to finish the future either "set_result" or "set_exception".
"""
if callback is None:
self._c_core_api = c_core_api
self._error_details = error_details
self._exception_type = exception_type
self._callback = self._raise_exception
else:
self._callback = callback
str core_function_name,
object error_details,
object exception_type):
"""Handles failure by raising exception."""
self._core_function_name = core_function_name
self._error_details = error_details
self._exception_type = exception_type
def _raise_exception(self, object future):
cdef handle(self, object future):
future.set_exception(self._exception_type(
'Failed "%s": %s' % (self._c_core_api, self._error_details)
'Failed "%s": %s' % (self._core_function_name, self._error_details)
))
cdef handle(self, object future):
self._callback(future)
# TODO(https://github.com/grpc/grpc/issues/20669)
# Apply this to the client-side
cdef class CallbackWrapper:
def __cinit__(self, object future, CallbackFailureHandler failure_handler):
def __cinit__(self, object future, _CallbackFailureHandler failure_handler):
self.context.functor.functor_run = self.functor_run
self.context.waiter = <cpython.PyObject*>future
self.context.failure_handler = <cpython.PyObject*>failure_handler
@ -77,7 +64,7 @@ cdef class CallbackWrapper:
int success):
cdef CallbackContext *context = <CallbackContext *>functor
if succeed == 0:
(<CallbackFailureHandler>context.failure_handler).handle(
(<_CallbackFailureHandler>context.failure_handler).handle(
<object>context.waiter)
else:
(<object>context.waiter).set_result(None)
@ -127,7 +114,7 @@ async def callback_start_batch(RPCState rpc_state,
cdef object future = loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
CallbackFailureHandler('callback_start_batch', operations))
_CallbackFailureHandler('callback_start_batch', operations, RuntimeError))
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
# when calling "await". This is an over-optimization by Cython.
cpython.Py_INCREF(wrapper)
@ -206,7 +193,7 @@ async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop):
class _RequestCallError(Exception): pass
cdef CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = CallbackFailureHandler(
cdef _CallbackFailureHandler REQUEST_CALL_FAILURE_HANDLER = _CallbackFailureHandler(
'grpc_server_request_call', 'server shutdown', _RequestCallError)
@ -236,7 +223,10 @@ async def _server_call_request_call(Server server,
return rpc_state
cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_completion_queue_shutdown')
cdef _CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler(
'grpc_completion_queue_shutdown',
'Unknown',
RuntimeError)
cdef class _CallbackCompletionQueue:
@ -261,14 +251,18 @@ cdef class _CallbackCompletionQueue:
grpc_completion_queue_destroy(self._cq)
cdef CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler('grpc_server_shutdown_and_notify')
cdef _CallbackFailureHandler SERVER_SHUTDOWN_FAILURE_HANDLER = _CallbackFailureHandler(
'grpc_server_shutdown_and_notify',
'Unknown',
RuntimeError)
cdef class AioServer:
def __init__(self, loop, thread_pool, generic_handlers, interceptors,
options, maximum_concurrent_rpcs, compression):
# C-Core objects won't be deallocated automatically.
# NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak.
self._server = Server(options)
self._cq = _CallbackCompletionQueue(loop)
grpc_server_register_completion_queue(
@ -311,7 +305,7 @@ cdef class AioServer:
async def _server_main_loop(self,
object server_started):
self._server.start(backup_queue=False)
self._server.start()
cdef RPCState rpc_state
server_started.set_result(True)
@ -344,8 +338,10 @@ cdef class AioServer:
await server_started
async def _start_shutting_down(self):
"""Prepares the server to shutting down (NOT coroutine-safe)."""
# Starts the shutdown process.
"""Prepares the server to shutting down.
This coroutine function is NOT coroutine-safe.
"""
# The shutdown callback won't be called until there is no live RPC.
grpc_server_shutdown_and_notify(
self._server.c_server,
@ -409,5 +405,10 @@ cdef class AioServer:
return True
def __dealloc__(self):
"""Deallocation of Core objects are ensured by Python grpc.aio.Server.
If the Cython representation is deallocated without underlying objects
freed, raise an RuntimeError.
"""
if self._status != AIO_SERVER_STATUS_STOPPED:
_LOGGER.error('__dealloc__ called on running server: %d', self._status)
raise RuntimeError('__dealloc__ called on running server: %d', self._status)

@ -132,6 +132,14 @@ class Server:
"""
return await self._server.wait_for_termination(timeout)
def __del__(self):
"""Schedules a graceful shutdown in current event loop.
The Cython AioServer doesn't hold a ref-count to this class. It should
be safe to slightly extend the underlying Cython object's life span.
"""
self._loop.create_task(self._server.shutdown(None))
def server(migration_thread_pool=None,
handlers=None,

@ -15,6 +15,7 @@
import logging
import unittest
import time
import gc
import grpc
from grpc.experimental import aio
@ -72,7 +73,8 @@ class TestServer(AioTestBase):
def test_unary_unary(self):
async def test_unary_unary_body():
server_target, _, _ = await _start_test_server()
result = await _start_test_server()
server_target = result[0]
async with aio.insecure_channel(server_target) as channel:
unary_call = channel.unary_unary(_SIMPLE_UNARY_UNARY)

Loading…
Cancel
Save