Fixes race condition in Python server shutdown

When we set the call state to "CANCELLED" after
grpc_cancel_all_calls, we would block other start batch
operations from happening. The rpc_state for the cancelled
call would still be in the server's rpc_states set, but it
would never get removed because there were no active batches
for the call, and the only place we remove from rpc_states is
when a batch completes.

It is better to rely on c-core's cancellation. Once a call
is cancelled, all subsequent ops on that call will return
immediately with a cancellation error.

The RLock() change is due to the possibility that
_on_call_completed
gets invoked immediately when the call has already completed when the
rpc_future callback is created.
pull/13786/head
Ken Payson 7 years ago
parent 04fb1c0b31
commit 8179b9523a
  1. 12
      src/python/grpcio/grpc/_server.py

@ -634,7 +634,7 @@ class _ServerState(object):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def __init__(self, completion_queue, server, generic_handlers, def __init__(self, completion_queue, server, generic_handlers,
interceptor_pipeline, thread_pool, maximum_concurrent_rpcs): interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
self.lock = threading.Lock() self.lock = threading.RLock()
self.completion_queue = completion_queue self.completion_queue = completion_queue
self.server = server self.server = server
self.generic_handlers = list(generic_handlers) self.generic_handlers = list(generic_handlers)
@ -747,22 +747,12 @@ def _stop(state, grace):
state.shutdown_events.append(shutdown_event) state.shutdown_events.append(shutdown_event)
if grace is None: if grace is None:
state.server.cancel_all_calls() state.server.cancel_all_calls()
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
for rpc_state in state.rpc_states:
with rpc_state.condition:
rpc_state.client = _CANCELLED
rpc_state.condition.notify_all()
else: else:
def cancel_all_calls_after_grace(): def cancel_all_calls_after_grace():
shutdown_event.wait(timeout=grace) shutdown_event.wait(timeout=grace)
with state.lock: with state.lock:
state.server.cancel_all_calls() state.server.cancel_all_calls()
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
for rpc_state in state.rpc_states:
with rpc_state.condition:
rpc_state.client = _CANCELLED
rpc_state.condition.notify_all()
thread = threading.Thread(target=cancel_all_calls_after_grace) thread = threading.Thread(target=cancel_all_calls_after_grace)
thread.start() thread.start()

Loading…
Cancel
Save