From ac34f559bbaebf178b0c47f1514564bbb3574bf7 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Sat, 20 Feb 2021 16:12:35 -0800 Subject: [PATCH] Fix Signal Safety Issue (#25394) * Decrease flake rate * Spruce up comments --- src/python/grpcio/grpc/_channel.py | 43 +++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 34c475067c7..b98f3002f4a 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -93,9 +93,16 @@ def _unknown_code_details(unknown_cygrpc_code, details): class _RPCState(object): def __init__(self, due, initial_metadata, trailing_metadata, code, details): + # `condition` guards all members of _RPCState. `notify_all` is called on + # `condition` when the state of the RPC has changed. self.condition = threading.Condition() + # The cygrpc.OperationType objects representing events due from the RPC's - # completion queue. + # completion queue. If an operation has been submitted to Core via + # `operate()`, it is guaranteed to be in `due`. But the converse is + # not true. That is, in the case of failed `operate()` calls, there + # may briefly be events in `due` that do not correspond to + # operations submitted to Core. self.due = set(due) self.initial_metadata = initial_metadata self.response = None @@ -103,6 +110,7 @@ class _RPCState(object): self.code = code self.details = details self.debug_error_string = None + # The semantics of grpc.Future.cancel and grpc.Future.cancelled are # slightly wonky, so they have to be tracked separately from the rest of the # result of the RPC. 
This field tracks whether cancellation was requested @@ -220,12 +228,12 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer, _abort(state, code, details) return else: + state.due.add(cygrpc.OperationType.send_message) operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) operating = call.operate(operations, event_handler) - if operating: - state.due.add(cygrpc.OperationType.send_message) - else: + if not operating: + state.due.remove(cygrpc.OperationType.send_message) return def _done(): @@ -244,11 +252,13 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer, return with state.condition: if state.code is None: + state.due.add(cygrpc.OperationType.send_close_from_client) operations = ( cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) operating = call.operate(operations, event_handler) - if operating: - state.due.add(cygrpc.OperationType.send_close_from_client) + if not operating: + state.due.remove( + cygrpc.OperationType.send_close_from_client) consumption_thread = cygrpc.ForkManagedThread( target=consume_request_iterator) @@ -609,10 +619,22 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: def _next(self): with self._state.condition: if self._state.code is None: + # We tentatively add the operation as expected and remove + # it if the enqueue operation fails. This allows us to guarantee that + # if an event has been submitted to the core completion queue, + # it is in `due`. If we waited until after a successful + # enqueue operation then a signal could interrupt this + # thread between the enqueue operation and the addition of the + # operation to `due`. This would cause an exception on the + # channel spin thread when the operation completes and no + # corresponding operation would be present in state.due. + # Note that, since `condition` is held through this block, there is + # no data race on `due`. 
+ self._state.due.add(cygrpc.OperationType.receive_message) operating = self._call.operate( (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) - if operating: - self._state.due.add(cygrpc.OperationType.receive_message) + if not operating: + self._state.due.remove(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: raise StopIteration() else: @@ -775,11 +797,12 @@ class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: if self._state.code is None: event_handler = _event_handler(self._state, self._response_deserializer) + self._state.due.add(cygrpc.OperationType.receive_message) operating = self._call.operate( (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) - if operating: - self._state.due.add(cygrpc.OperationType.receive_message) + if not operating: + self._state.due.remove(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: raise StopIteration() else: