diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 37071909c8c..11921d78838 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -442,15 +442,112 @@ class _Rendezvous(grpc.RpcError, grpc.RpcContext): self._state.condition.notify_all() -class _SingleThreadedRendezvous(_Rendezvous, grpc.Call): # pylint: disable=too-many-ancestors +class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors """An RPC iterator operating entirely on a single thread. The __next__ method of _SingleThreadedRendezvous does not depend on the existence of any other thread, including the "channel spin thread". However, this means that its interface is entirely synchronous. So this - class cannot fulfill the grpc.Future interface. + class cannot completely fulfill the grpc.Future interface. The result, + exception, and traceback methods will never block and will instead raise + an exception if calling the method would result in blocking. + + This means that these methods are safe to call from add_done_callback + handlers. """ + def _is_complete(self): + return self._state.code is not None + + def cancelled(self): + with self._state.condition: + return self._state.cancelled + + def running(self): + with self._state.condition: + return self._state.code is None + + def done(self): + with self._state.condition: + return self._state.code is not None + + def result(self, timeout=None): + """Returns the result of the computation or raises its exception. + + This method will never block. Instead, it will raise an exception + if calling this method would otherwise result in blocking. + + Since this method will never block, any `timeout` argument passed will + be ignored. + """ + del timeout + with self._state.condition: + if not self._is_complete(): + raise grpc.experimental.UsageError( + "_SingleThreadedRendezvous only supports result() when the RPC is complete." + ) + if self._state.code is grpc.StatusCode.OK: + return self._state.response + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + raise self + + def exception(self, timeout=None): + """Return the exception raised by the computation. + + This method will never block. Instead, it will raise an exception + if calling this method would otherwise result in blocking. + + Since this method will never block, any `timeout` argument passed will + be ignored. + """ + del timeout + with self._state.condition: + if not self._is_complete(): + raise grpc.experimental.UsageError( + "_SingleThreadedRendezvous only supports exception() when the RPC is complete." + ) + if self._state.code is grpc.StatusCode.OK: + return None + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + return self + + def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. + + This method will never block. Instead, it will raise an exception + if calling this method would otherwise result in blocking. + + Since this method will never block, any `timeout` argument passed will + be ignored. + """ + del timeout + with self._state.condition: + if not self._is_complete(): + raise grpc.experimental.UsageError( + "_SingleThreadedRendezvous only supports traceback() when the RPC is complete." + ) + if self._state.code is grpc.StatusCode.OK: + return None + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + try: + raise self + except grpc.RpcError: + return sys.exc_info()[2] + + def add_done_callback(self, fn): + with self._state.condition: + if self._state.code is None: + self._state.callbacks.append(functools.partial(fn, self)) + return + + fn(self) + def initial_metadata(self): """See grpc.Call.initial_metadata""" with self._state.condition: diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index d5a217b6fcd..619db7b3ffd 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -547,10 +547,6 @@ class InterceptorTest(unittest.TestCase): 's1:intercept_service', 's2:intercept_service' ]) - # NOTE: The single-threaded unary-stream path does not support the - # grpc.Future interface, so this test does not apply. - @unittest.skipIf(os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM"), - "Not supported.") def testInterceptedUnaryRequestStreamResponseWithError(self): request = _EXCEPTION_REQUEST