|
|
|
@ -60,7 +60,8 @@ _CANCELLED = 'cancelled' |
|
|
|
|
_EMPTY_FLAGS = 0 |
|
|
|
|
_EMPTY_METADATA = cygrpc.Metadata(()) |
|
|
|
|
|
|
|
|
|
_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 |
|
|
|
|
_DEFAULT_EXIT_GRACE = 1.0 |
|
|
|
|
_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE = 5.0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _serialized_request(request_event): |
|
|
|
@ -595,14 +596,18 @@ class _ServerStage(enum.Enum): |
|
|
|
|
|
|
|
|
|
class _ServerState(object): |
|
|
|
|
|
|
|
|
|
def __init__(self, completion_queue, server, generic_handlers, thread_pool): |
|
|
|
|
def __init__(self, completion_queue, server, generic_handlers, thread_pool, |
|
|
|
|
exit_grace, exit_shutdown_handler_grace): |
|
|
|
|
self.lock = threading.Lock() |
|
|
|
|
self.completion_queue = completion_queue |
|
|
|
|
self.server = server |
|
|
|
|
self.generic_handlers = list(generic_handlers) |
|
|
|
|
self.thread_pool = thread_pool |
|
|
|
|
self.exit_grace = exit_grace |
|
|
|
|
self.exit_shutdown_handler_grace = exit_shutdown_handler_grace |
|
|
|
|
self.stage = _ServerStage.STOPPED |
|
|
|
|
self.shutdown_events = None |
|
|
|
|
self.shutdown_handlers = [] |
|
|
|
|
|
|
|
|
|
# TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields. |
|
|
|
|
self.rpc_states = set() |
|
|
|
@ -672,41 +677,45 @@ def _serve(state): |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _stop(state, grace): |
|
|
|
|
with state.lock: |
|
|
|
|
if state.stage is _ServerStage.STOPPED: |
|
|
|
|
shutdown_event = threading.Event() |
|
|
|
|
shutdown_event.set() |
|
|
|
|
return shutdown_event |
|
|
|
|
else: |
|
|
|
|
if state.stage is _ServerStage.STARTED: |
|
|
|
|
state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) |
|
|
|
|
def _stop(state, grace, shutdown_handler_grace): |
|
|
|
|
shutdown_event = threading.Event() |
|
|
|
|
|
|
|
|
|
def cancel_all_calls_after_grace(): |
|
|
|
|
with state.lock: |
|
|
|
|
if state.stage is _ServerStage.STOPPED: |
|
|
|
|
shutdown_event.set() |
|
|
|
|
return |
|
|
|
|
elif state.stage is _ServerStage.STARTED: |
|
|
|
|
do_shutdown = True |
|
|
|
|
state.stage = _ServerStage.GRACE |
|
|
|
|
state.shutdown_events = [] |
|
|
|
|
state.due.add(_SHUTDOWN_TAG) |
|
|
|
|
shutdown_event = threading.Event() |
|
|
|
|
else: |
|
|
|
|
do_shutdown = False |
|
|
|
|
state.shutdown_events.append(shutdown_event) |
|
|
|
|
if grace is None: |
|
|
|
|
|
|
|
|
|
if do_shutdown: |
|
|
|
|
# Run Shutdown Handlers without the lock |
|
|
|
|
for handler in state.shutdown_handlers: |
|
|
|
|
handler(shutdown_handler_grace) |
|
|
|
|
with state.lock: |
|
|
|
|
state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) |
|
|
|
|
state.stage = _ServerStage.GRACE |
|
|
|
|
state.due.add(_SHUTDOWN_TAG) |
|
|
|
|
|
|
|
|
|
if not shutdown_event.wait(timeout=grace): |
|
|
|
|
with state.lock: |
|
|
|
|
state.server.cancel_all_calls() |
|
|
|
|
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. |
|
|
|
|
for rpc_state in state.rpc_states: |
|
|
|
|
with rpc_state.condition: |
|
|
|
|
rpc_state.client = _CANCELLED |
|
|
|
|
rpc_state.condition.notify_all() |
|
|
|
|
else: |
|
|
|
|
def cancel_all_calls_after_grace(): |
|
|
|
|
shutdown_event.wait(timeout=grace) |
|
|
|
|
with state.lock: |
|
|
|
|
state.server.cancel_all_calls() |
|
|
|
|
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. |
|
|
|
|
for rpc_state in state.rpc_states: |
|
|
|
|
with rpc_state.condition: |
|
|
|
|
rpc_state.client = _CANCELLED |
|
|
|
|
rpc_state.condition.notify_all() |
|
|
|
|
thread = threading.Thread(target=cancel_all_calls_after_grace) |
|
|
|
|
thread.start() |
|
|
|
|
return shutdown_event |
|
|
|
|
shutdown_event.wait() |
|
|
|
|
|
|
|
|
|
if grace is None: |
|
|
|
|
cancel_all_calls_after_grace() |
|
|
|
|
else: |
|
|
|
|
threading.Thread(target=cancel_all_calls_after_grace).start() |
|
|
|
|
|
|
|
|
|
return shutdown_event |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -719,9 +728,9 @@ def _start(state): |
|
|
|
|
_request_call(state) |
|
|
|
|
def cleanup_server(timeout): |
|
|
|
|
if timeout is None: |
|
|
|
|
_stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() |
|
|
|
|
_stop(state, state.exit_grace, state.exit_shutdown_handler_grace).wait() |
|
|
|
|
else: |
|
|
|
|
_stop(state, timeout).wait() |
|
|
|
|
_stop(state, timeout, 0).wait() |
|
|
|
|
|
|
|
|
|
thread = _common.CleanupThread( |
|
|
|
|
cleanup_server, target=_serve, args=(state,)) |
|
|
|
@ -729,12 +738,16 @@ def _start(state): |
|
|
|
|
|
|
|
|
|
class Server(grpc.Server): |
|
|
|
|
|
|
|
|
|
def __init__(self, thread_pool, generic_handlers, options): |
|
|
|
|
def __init__(self, thread_pool, generic_handlers, options, exit_grace, |
|
|
|
|
exit_shutdown_handler_grace): |
|
|
|
|
completion_queue = cygrpc.CompletionQueue() |
|
|
|
|
server = cygrpc.Server(_common.channel_args(options)) |
|
|
|
|
server.register_completion_queue(completion_queue) |
|
|
|
|
self._state = _ServerState( |
|
|
|
|
completion_queue, server, generic_handlers, thread_pool) |
|
|
|
|
completion_queue, server, generic_handlers, thread_pool, |
|
|
|
|
_DEFAULT_EXIT_GRACE if exit_grace is None else exit_grace, |
|
|
|
|
_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE if exit_shutdown_handler_grace |
|
|
|
|
is None else exit_shutdown_handler_grace) |
|
|
|
|
|
|
|
|
|
def add_generic_rpc_handlers(self, generic_rpc_handlers): |
|
|
|
|
_add_generic_handlers(self._state, generic_rpc_handlers) |
|
|
|
@ -745,11 +758,14 @@ class Server(grpc.Server): |
|
|
|
|
def add_secure_port(self, address, server_credentials): |
|
|
|
|
return _add_secure_port(self._state, _common.encode(address), server_credentials) |
|
|
|
|
|
|
|
|
|
def add_shutdown_handler(self, handler): |
|
|
|
|
self._state.shutdown_handlers.append(handler) |
|
|
|
|
|
|
|
|
|
def start(self): |
|
|
|
|
_start(self._state) |
|
|
|
|
|
|
|
|
|
def stop(self, grace): |
|
|
|
|
return _stop(self._state, grace) |
|
|
|
|
def stop(self, grace, shutdown_handler_grace=None): |
|
|
|
|
return _stop(self._state, grace, shutdown_handler_grace) |
|
|
|
|
|
|
|
|
|
def __del__(self): |
|
|
|
|
_stop(self._state, None) |
|
|
|
|
_stop(self._state, None, None) |
|
|
|
|