Dedupe between the two rendezvous

pull/20753/head
Richard Belleville 5 years ago
parent d51fa803b9
commit 5782733da0
  1. 180
      src/python/grpcio/grpc/_channel.py

@ -248,18 +248,15 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer,
consumption_thread.start()
# TODO: Docstrings.
class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=too-many-ancestors
# TODO: Docstring.
def __init__(self, state, call, response_deserializer, deadline):
super(_SingleThreadedRendezvous, self).__init__()
# TODO: Is this still needed? Or is it just for inter-thread
# synchronization?
self._state = state
self._call = call
self._response_deserializer = response_deserializer
self._deadline = deadline
# TODO: Dedupe between here and the default Rendezvous.
def is_active(self):
"""See grpc.RpcContext.is_active"""
with self._state.condition:
@ -282,6 +279,7 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
self._call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
self._state.cancelled = True
_abort(self._state, code, details)
self._state.condition.notify_all()
return True
else:
return False
@ -336,8 +334,6 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
return _common.decode(self._state.details)
def _next(self):
# TODO(rbellevi): This conditional block is very similar to the one
# below. Dedupe.
with self._state.condition:
if self._state.code is None:
operating = self._call.operate((cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
@ -378,66 +374,49 @@ class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=to
def __iter__(self):
return self
def exception(self, timeout=None):
"""Return the exception raised by the computation.
See grpc.Future.exception for the full API contract.
"""
def debug_error_string(self):
with self._state.condition:
timed_out = _common.wait(
self._state.condition.wait, self._is_complete, timeout=timeout)
if timed_out:
raise grpc.FutureTimeoutError()
else:
if self._state.code is grpc.StatusCode.OK:
return None
elif self._state.cancelled:
raise grpc.FutureCancelledError()
else:
return self
def traceback(self, timeout=None):
"""Access the traceback of the exception raised by the computation.
def _done():
return self._state.debug_error_string is not None
See grpc.future.traceback for the full API contract.
"""
_common.wait(self._state.condition.wait, _done)
return _common.decode(self._state.debug_error_string)
def _repr(self):
with self._state.condition:
timed_out = _common.wait(
self._state.condition.wait, self._is_complete, timeout=timeout)
if timed_out:
raise grpc.FutureTimeoutError()
if self._state.code is None:
return '<{} object of in-flight RPC>'.format(self.__class__.__name__)
elif self._state.code is grpc.StatusCode.OK:
return _OK_RENDEZVOUS_REPR_FORMAT.format(
self._state.code, self._state.details)
else:
if self._state.code is grpc.StatusCode.OK:
return None
elif self._state.cancelled:
raise grpc.FutureCancelledError()
else:
try:
raise self
except grpc.RpcError:
return sys.exc_info()[2]
return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
self._state.code, self._state.details,
self._state.debug_error_string)
class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): # pylint: disable=too-many-ancestors
def __repr__(self):
return self._repr()
def __init__(self, state, call, response_deserializer, deadline):
super(_Rendezvous, self).__init__()
self._state = state
self._call = call
self._response_deserializer = response_deserializer
self._deadline = deadline
def __str__(self):
return self._repr()
def cancel(self):
def __del__(self):
with self._state.condition:
if self._state.code is None:
code = grpc.StatusCode.CANCELLED
details = 'Locally cancelled by application!'
self._call.cancel(
_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)
self._state.code = grpc.StatusCode.CANCELLED
self._state.details = 'Cancelled upon garbage collection!'
self._state.cancelled = True
_abort(self._state, code, details)
self._call.cancel(
_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
self._state.details)
self._state.condition.notify_all()
return False
class _Rendezvous(_SingleThreadedRendezvous, grpc.Future): # pylint: disable=too-many-ancestors
def __init__(self, state, call, response_deserializer, deadline):
super(_Rendezvous, self).__init__(state, call, response_deserializer, deadline)
def cancelled(self):
with self._state.condition:
@ -551,25 +530,6 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): # pylint: disable=too
elif self._state.code is not None:
raise self
def __iter__(self):
return self
def __next__(self):
return self._next()
def next(self):
return self._next()
def is_active(self):
with self._state.condition:
return self._state.code is None
def time_remaining(self):
if self._deadline is None:
return None
else:
return max(self._deadline - time.time(), 0)
def add_callback(self, callback):
with self._state.condition:
if self._state.callbacks is None:
@ -578,80 +538,6 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): # pylint: disable=too
self._state.callbacks.append(callback)
return True
def initial_metadata(self):
with self._state.condition:
def _done():
return self._state.initial_metadata is not None
_common.wait(self._state.condition.wait, _done)
return self._state.initial_metadata
def trailing_metadata(self):
with self._state.condition:
def _done():
return self._state.trailing_metadata is not None
_common.wait(self._state.condition.wait, _done)
return self._state.trailing_metadata
def code(self):
with self._state.condition:
def _done():
return self._state.code is not None
_common.wait(self._state.condition.wait, _done)
return self._state.code
def details(self):
with self._state.condition:
def _done():
return self._state.details is not None
_common.wait(self._state.condition.wait, _done)
return _common.decode(self._state.details)
def debug_error_string(self):
with self._state.condition:
def _done():
return self._state.debug_error_string is not None
_common.wait(self._state.condition.wait, _done)
return _common.decode(self._state.debug_error_string)
def _repr(self):
with self._state.condition:
if self._state.code is None:
return '<_Rendezvous object of in-flight RPC>'
elif self._state.code is grpc.StatusCode.OK:
return _OK_RENDEZVOUS_REPR_FORMAT.format(
self._state.code, self._state.details)
else:
return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
self._state.code, self._state.details,
self._state.debug_error_string)
def __repr__(self):
return self._repr()
def __str__(self):
return self._repr()
def __del__(self):
with self._state.condition:
if self._state.code is None:
self._state.code = grpc.StatusCode.CANCELLED
self._state.details = 'Cancelled upon garbage collection!'
self._state.cancelled = True
self._call.cancel(
_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
self._state.details)
self._state.condition.notify_all()
# TODO: Audit usages of this. The logic is weird. Why not just raise the
# exception right here?

Loading…
Cancel
Save