Add configurable exit grace periods and shutdown handlers

The server cleanup method is untested.

The join() function that exposes it is only called by the internals of threading.py, and we don't hold a reference to the server thread to explicitly join() it, and I'm not sure we should add a reference just for this purpose.

Moreover, the threading.py only calls join(None), the code path in question isn't even exercised by the internals of threading.py. Its just there to make sure we properly follow the join(timeout) semantics.
pull/8707/head
Ken Payson 8 years ago
parent d960c8b166
commit 3045a379aa
  1. 30
      src/python/grpcio/grpc/__init__.py
  2. 86
      src/python/grpcio/grpc/_server.py
  3. 1
      src/python/grpcio_tests/tests/tests.json
  4. 23
      src/python/grpcio_tests/tests/unit/_exit_test.py

@ -904,6 +904,21 @@ class Server(six.with_metaclass(abc.ABCMeta)):
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
def add_shutdown_handler(self, shutdown_handler):
"""Adds a handler to be called on server shutdown.
Shutdown handlers are run on server stop() or in the event that a running
server is destroyed unexpectedly. The handlers are run in series before
the stop grace period.
Args:
shutdown_handler: A function taking a single arg, a time in seconds
within which the handler should complete. None indicates the handler can
run for any duration.
"""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def start(self): def start(self):
"""Starts this Server's service of RPCs. """Starts this Server's service of RPCs.
@ -914,7 +929,7 @@ class Server(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def stop(self, grace): def stop(self, grace, shutdown_handler_grace=None):
"""Stops this Server's service of RPCs. """Stops this Server's service of RPCs.
All calls to this method immediately stop service of new RPCs. When existing All calls to this method immediately stop service of new RPCs. When existing
@ -937,6 +952,8 @@ class Server(six.with_metaclass(abc.ABCMeta)):
aborted by this Server's stopping. If None, all RPCs will be aborted aborted by this Server's stopping. If None, all RPCs will be aborted
immediately and this method will block until this Server is completely immediately and this method will block until this Server is completely
stopped. stopped.
shutdown_handler_grace: A duration of time in seconds or None. This
value is passed to all shutdown handlers.
Returns: Returns:
A threading.Event that will be set when this Server has completely A threading.Event that will be set when this Server has completely
@ -1231,7 +1248,8 @@ def secure_channel(target, credentials, options=None):
credentials._credentials) credentials._credentials)
def server(thread_pool, handlers=None, options=None): def server(thread_pool, handlers=None, options=None, exit_grace=None,
exit_shutdown_handler_grace=None):
"""Creates a Server with which RPCs can be serviced. """Creates a Server with which RPCs can be serviced.
Args: Args:
@ -1244,13 +1262,19 @@ def server(thread_pool, handlers=None, options=None):
returned Server is started. returned Server is started.
options: A sequence of string-value pairs according to which to configure options: A sequence of string-value pairs according to which to configure
the created server. the created server.
exit_grace: The grace period to use when terminating
running servers at interpreter exit. None indicates unspecified.
exit_shutdown_handler_grace: The shutdown handler grace to use when
terminating running servers at interpreter exit. None indicates
unspecified.
Returns: Returns:
A Server with which RPCs can be serviced. A Server with which RPCs can be serviced.
""" """
from grpc import _server from grpc import _server
return _server.Server(thread_pool, () if handlers is None else handlers, return _server.Server(thread_pool, () if handlers is None else handlers,
() if options is None else options) () if options is None else options, exit_grace,
exit_shutdown_handler_grace)
################################### __all__ ################################# ################################### __all__ #################################

@ -60,7 +60,8 @@ _CANCELLED = 'cancelled'
_EMPTY_FLAGS = 0 _EMPTY_FLAGS = 0
_EMPTY_METADATA = cygrpc.Metadata(()) _EMPTY_METADATA = cygrpc.Metadata(())
_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 _DEFAULT_EXIT_GRACE = 1.0
_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE = 5.0
def _serialized_request(request_event): def _serialized_request(request_event):
@ -595,14 +596,18 @@ class _ServerStage(enum.Enum):
class _ServerState(object): class _ServerState(object):
def __init__(self, completion_queue, server, generic_handlers, thread_pool): def __init__(self, completion_queue, server, generic_handlers, thread_pool,
exit_grace, exit_shutdown_handler_grace):
self.lock = threading.Lock() self.lock = threading.Lock()
self.completion_queue = completion_queue self.completion_queue = completion_queue
self.server = server self.server = server
self.generic_handlers = list(generic_handlers) self.generic_handlers = list(generic_handlers)
self.thread_pool = thread_pool self.thread_pool = thread_pool
self.exit_grace = exit_grace
self.exit_shutdown_handler_grace = exit_shutdown_handler_grace
self.stage = _ServerStage.STOPPED self.stage = _ServerStage.STOPPED
self.shutdown_events = None self.shutdown_events = None
self.shutdown_handlers = []
# TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields. # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
self.rpc_states = set() self.rpc_states = set()
@ -672,41 +677,45 @@ def _serve(state):
return return
def _stop(state, grace): def _stop(state, grace, shutdown_handler_grace):
with state.lock: shutdown_event = threading.Event()
if state.stage is _ServerStage.STOPPED:
shutdown_event = threading.Event() def cancel_all_calls_after_grace():
shutdown_event.set() with state.lock:
return shutdown_event if state.stage is _ServerStage.STOPPED:
else: shutdown_event.set()
if state.stage is _ServerStage.STARTED: return
state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) elif state.stage is _ServerStage.STARTED:
do_shutdown = True
state.stage = _ServerStage.GRACE state.stage = _ServerStage.GRACE
state.shutdown_events = [] state.shutdown_events = []
state.due.add(_SHUTDOWN_TAG) else:
shutdown_event = threading.Event() do_shutdown = False
state.shutdown_events.append(shutdown_event) state.shutdown_events.append(shutdown_event)
if grace is None:
if do_shutdown:
# Run Shutdown Handlers without the lock
for handler in state.shutdown_handlers:
handler(shutdown_handler_grace)
with state.lock:
state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
state.stage = _ServerStage.GRACE
state.due.add(_SHUTDOWN_TAG)
if not shutdown_event.wait(timeout=grace):
with state.lock:
state.server.cancel_all_calls() state.server.cancel_all_calls()
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
for rpc_state in state.rpc_states: for rpc_state in state.rpc_states:
with rpc_state.condition: with rpc_state.condition:
rpc_state.client = _CANCELLED rpc_state.client = _CANCELLED
rpc_state.condition.notify_all() rpc_state.condition.notify_all()
else:
def cancel_all_calls_after_grace(): if grace is None:
shutdown_event.wait(timeout=grace) cancel_all_calls_after_grace()
with state.lock: else:
state.server.cancel_all_calls() threading.Thread(target=cancel_all_calls_after_grace).start()
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
for rpc_state in state.rpc_states:
with rpc_state.condition:
rpc_state.client = _CANCELLED
rpc_state.condition.notify_all()
thread = threading.Thread(target=cancel_all_calls_after_grace)
thread.start()
return shutdown_event
shutdown_event.wait()
return shutdown_event return shutdown_event
@ -719,9 +728,9 @@ def _start(state):
_request_call(state) _request_call(state)
def cleanup_server(timeout): def cleanup_server(timeout):
if timeout is None: if timeout is None:
_stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() _stop(state, state.exit_grace, state.exit_shutdown_handler_grace).wait()
else: else:
_stop(state, timeout).wait() _stop(state, timeout, 0).wait()
thread = _common.CleanupThread( thread = _common.CleanupThread(
cleanup_server, target=_serve, args=(state,)) cleanup_server, target=_serve, args=(state,))
@ -729,12 +738,16 @@ def _start(state):
class Server(grpc.Server): class Server(grpc.Server):
def __init__(self, thread_pool, generic_handlers, options): def __init__(self, thread_pool, generic_handlers, options, exit_grace,
exit_shutdown_handler_grace):
completion_queue = cygrpc.CompletionQueue() completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_common.channel_args(options)) server = cygrpc.Server(_common.channel_args(options))
server.register_completion_queue(completion_queue) server.register_completion_queue(completion_queue)
self._state = _ServerState( self._state = _ServerState(
completion_queue, server, generic_handlers, thread_pool) completion_queue, server, generic_handlers, thread_pool,
_DEFAULT_EXIT_GRACE if exit_grace is None else exit_grace,
_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE if exit_shutdown_handler_grace
is None else exit_shutdown_handler_grace)
def add_generic_rpc_handlers(self, generic_rpc_handlers): def add_generic_rpc_handlers(self, generic_rpc_handlers):
_add_generic_handlers(self._state, generic_rpc_handlers) _add_generic_handlers(self._state, generic_rpc_handlers)
@ -745,11 +758,14 @@ class Server(grpc.Server):
def add_secure_port(self, address, server_credentials): def add_secure_port(self, address, server_credentials):
return _add_secure_port(self._state, _common.encode(address), server_credentials) return _add_secure_port(self._state, _common.encode(address), server_credentials)
def add_shutdown_handler(self, handler):
self._state.shutdown_handlers.append(handler)
def start(self): def start(self):
_start(self._state) _start(self._state)
def stop(self, grace): def stop(self, grace, shutdown_handler_grace=None):
return _stop(self._state, grace) return _stop(self._state, grace, shutdown_handler_grace)
def __del__(self): def __del__(self):
_stop(self._state, None) _stop(self._state, None, None)

@ -27,6 +27,7 @@
"unit._cython.cygrpc_test.TypeSmokeTest", "unit._cython.cygrpc_test.TypeSmokeTest",
"unit._empty_message_test.EmptyMessageTest", "unit._empty_message_test.EmptyMessageTest",
"unit._exit_test.ExitTest", "unit._exit_test.ExitTest",
"unit._exit_test.ShutdownHandlerTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest",
"unit._metadata_test.MetadataTest", "unit._metadata_test.MetadataTest",
"unit._rpc_test.RPCTest", "unit._rpc_test.RPCTest",

@ -43,6 +43,8 @@ import threading
import time import time
import unittest import unittest
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit import _exit_scenarios from tests.unit import _exit_scenarios
SCENARIO_FILE = os.path.abspath(os.path.join( SCENARIO_FILE = os.path.abspath(os.path.join(
@ -52,7 +54,7 @@ BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]
BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt'] BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt']
INIT_TIME = 1.0 INIT_TIME = 1.0
SHUTDOWN_GRACE = 5.0
processes = [] processes = []
process_lock = threading.Lock() process_lock = threading.Lock()
@ -182,5 +184,24 @@ class ExitTest(unittest.TestCase):
interrupt_and_wait(process) interrupt_and_wait(process)
class _ShutDownHandler(object):
def __init__(self):
self.seen_handler_grace = None
def shutdown_handler(self, handler_grace):
self.seen_handler_grace = handler_grace
class ShutdownHandlerTest(unittest.TestCase):
def test_shutdown_handler(self):
server = grpc.server(logging_pool.pool(1))
handler = _ShutDownHandler()
server.add_shutdown_handler(handler.shutdown_handler)
server.start()
server.stop(0, shutdown_handler_grace=SHUTDOWN_GRACE).wait()
self.assertEqual(SHUTDOWN_GRACE, handler.seen_handler_grace)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(verbosity=2) unittest.main(verbosity=2)

Loading…
Cancel
Save