From f2059ee6bebcd65b6a927f517efef359c14f0067 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 21 Oct 2020 20:35:18 -0700 Subject: [PATCH 1/3] Implement grpc.Future interface in SingleThreadedRendezvous --- src/python/grpcio/grpc/_channel.py | 62 ++++++++++++++++++- .../tests/unit/_interceptor_test.py | 4 +- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 37071909c8c..f33fc8570c4 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -442,7 +442,7 @@ class _Rendezvous(grpc.RpcError, grpc.RpcContext): self._state.condition.notify_all() -class _SingleThreadedRendezvous(_Rendezvous, grpc.Call): # pylint: disable=too-many-ancestors +class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors """An RPC iterator operating entirely on a single thread. The __next__ method of _SingleThreadedRendezvous does not depend on the @@ -451,6 +451,66 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call): # pylint: disable=too- class cannot fulfill the grpc.Future interface. """ + def _is_complete(self): + return self._state.code is not None + + + def cancelled(self): + with self._state.condition: + return self._state.cancelled + + def running(self): + with self._state.condition: + return self._state.code is None + + def done(self): + with self._state.condition: + return self._state.code is not None + + def result(self, timeout=None): + with self._state.condition: + if not self._is_complete(): + raise Exception("_SingleThreadedRendezvous only supports result() when the RPC is complete.") + if self._state.code is grpc.StatusCode.OK: + return self._state.response + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + raise self + + def exception(self, timeout=None): + with self._state.condition: + if not self._is_complete(): + raise Exception("_SingleThreadedRendezvous only supports exception() when the RPC is complete.") + if self._state.code is grpc.StatusCode.OK: + return None + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + return self + + def traceback(self, timeout=None): + with self._state.condition: + if not self._is_complete(): + raise Exception("_SingleThreadedRendezvous only supports traceback() when the RPC is complete.") + if self._state.code is grpc.StatusCode.OK: + return None + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + try: + raise self + except grpc.RpcError: + return sys.exc_info()[2] + + def add_done_callback(self, fn): + with self._state.condition: + if self._state.code is None: + self._state.callbacks.append(functools.partial(fn, self)) + return + + fn(self) + def initial_metadata(self): """See grpc.Call.initial_metadata""" with self._state.condition: diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index d5a217b6fcd..54823d5b463 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -549,8 +549,8 @@ class InterceptorTest(unittest.TestCase): # NOTE: The single-threaded unary-stream path does not support the # grpc.Future interface, so this test does not apply. - @unittest.skipIf(os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM"), - "Not supported.") + # @unittest.skipIf(os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM"), + # "Not supported.") def testInterceptedUnaryRequestStreamResponseWithError(self): request = _EXCEPTION_REQUEST From 928e3b2fa57a30131c7a1de894162c8a52596dff Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 21 Oct 2020 21:55:02 -0700 Subject: [PATCH 2/3] Spruce up docstrings --- src/python/grpcio/grpc/_channel.py | 38 ++++++++++++++++--- .../tests/unit/_interceptor_test.py | 4 -- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index f33fc8570c4..9f434e8e724 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -448,13 +448,17 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: The __next__ method of _SingleThreadedRendezvous does not depend on the existence of any other thread, including the "channel spin thread". However, this means that its interface is entirely synchronous. So this - class cannot fulfill the grpc.Future interface. + class cannot completely fulfill the grpc.Future interface. The result, + exception, and traceback methods will never block and will instead raise + an exception if calling the method would result in blocking. + + This means that these methods are safe to call from add_done_callback + handlers. """ def _is_complete(self): return self._state.code is not None - def cancelled(self): with self._state.condition: return self._state.cancelled @@ -468,9 +472,17 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: return self._state.code is not None def result(self, timeout=None): + """Returns the result of the computation or raises its exception. + + This method will never block. Instead, it will raise an exception + if calling this method would otherwise result in blocking. + """ + del timeout with self._state.condition: if not self._is_complete(): - raise Exception("_SingleThreadedRendezvous only supports result() when the RPC is complete.") + raise grpc.experimental.UsageError( + "_SingleThreadedRendezvous only supports result() when the RPC is complete." + ) if self._state.code is grpc.StatusCode.OK: return self._state.response elif self._state.cancelled: @@ -479,9 +491,17 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: raise self def exception(self, timeout=None): + """Return the exception raised by the computation. + + This method will never block. Instead, it will raise an exception + if calling this method would otherwise result in blocking. + """ + del timeout with self._state.condition: if not self._is_complete(): - raise Exception("_SingleThreadedRendezvous only supports exception() when the RPC is complete.") + raise grpc.experimental.UsageError( + "_SingleThreadedRendezvous only supports exception() when the RPC is complete." + ) if self._state.code is grpc.StatusCode.OK: return None elif self._state.cancelled: @@ -490,9 +510,17 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: return self def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. + + This method will never block. Instead, it will raise an exception + if calling this method would otherwise result in blocking. + """ + del timeout with self._state.condition: if not self._is_complete(): - raise Exception("_SingleThreadedRendezvous only supports traceback() when the RPC is complete.") + raise grpc.experimental.UsageError( + "_SingleThreadedRendezvous only supports traceback() when the RPC is complete." + ) if self._state.code is grpc.StatusCode.OK: return None elif self._state.cancelled: diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index 54823d5b463..619db7b3ffd 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -547,10 +547,6 @@ class InterceptorTest(unittest.TestCase): 's1:intercept_service', 's2:intercept_service' ]) - # NOTE: The single-threaded unary-stream path does not support the - # grpc.Future interface, so this test does not apply. - # @unittest.skipIf(os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM"), - # "Not supported.") def testInterceptedUnaryRequestStreamResponseWithError(self): request = _EXCEPTION_REQUEST From 9b24faf188cc6d450cf4629f35ac0b38ee74fc90 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Fri, 23 Oct 2020 16:01:19 -0700 Subject: [PATCH 3/3] Add comment about timeout argument --- src/python/grpcio/grpc/_channel.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 9f434e8e724..11921d78838 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -476,6 +476,9 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: This method will never block. Instead, it will raise an exception if calling this method would otherwise result in blocking. + + Since this method will never block, any `timeout` argument passed will + be ignored. """ del timeout with self._state.condition: @@ -495,6 +498,9 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: This method will never block. Instead, it will raise an exception if calling this method would otherwise result in blocking. + + Since this method will never block, any `timeout` argument passed will + be ignored. """ del timeout with self._state.condition: @@ -514,6 +520,9 @@ class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: This method will never block. Instead, it will raise an exception if calling this method would otherwise result in blocking. + + Since this method will never block, any `timeout` argument passed will + be ignored. """ del timeout with self._state.condition: