From 32919791c74e5592a7c2f3f4e9c528992b8aa32a Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 8 Mar 2018 07:50:26 +0100 Subject: [PATCH 1/6] Optimize blocking intercepted unary-unary calls Change the blocking unary-unary call code path to rely on the underlying synchronous API, as opposed to calling the Future-based underlying async API and invoking `.result()` on the returned Future object immediately, which can be resource-intensive. --- src/python/grpcio/grpc/_interceptor.py | 80 +++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 6b7a912a941..f11f55cd4cf 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -134,6 +134,58 @@ class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): raise self._exception +class _UnaryOutcome(grpc.Call, grpc.Future): + + def __init__(self, response, call): + self._response = response + self._call = call + + def initial_metadata(self): + return self._call.initial_metadata() + + def trailing_metadata(self): + return self._call.trailing_metadata() + + def code(self): + return self._call.code() + + def details(self): + return self._call.details() + + def is_active(self): + return self._call.is_active() + + def time_remaining(self): + return self._call.time_remaining() + + def cancel(self): + return self._call.cancel() + + def add_callback(self, callback): + return self._call.add_callback(callback) + + def cancelled(self): + return False + + def running(self): + return False + + def done(self): + return True + + def result(self, ignored_timeout=None): + return self._response + + def exception(self, ignored_timeout=None): + return None + + def traceback(self, ignored_timeout=None): + return None + + def add_done_callback(self, fn): + fn(self) + + class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def __init__(self, thunk, method, interceptor): @@ -142,23 +194,35 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._interceptor = interceptor def __call__(self, request, timeout=None, metadata=None, credentials=None): - call_future = self.future( + response, ignored_call = self.with_call( request, timeout=timeout, metadata=metadata, credentials=credentials) - return call_future.result() + return response def with_call(self, request, timeout=None, metadata=None, credentials=None): - call_future = self.future( - request, - timeout=timeout, - metadata=metadata, - credentials=credentials) + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + + def continuation(new_details, request): + new_method, new_timeout, new_metadata, new_credentials = ( + _unwrap_client_call_details(new_details, client_call_details)) + try: + response, call = self._thunk(new_method).with_call( + request, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials) + return _UnaryOutcome(response, call) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + call_future = self._interceptor.intercept_unary_unary( + continuation, client_call_details, request) return call_future.result(), call_future def future(self, request, timeout=None, metadata=None, credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) From e2ebd89a5f8784505a69943570757b21e0642875 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 8 Mar 2018 12:31:14 +0100 Subject: [PATCH 2/6] Optimize blocking intercepted stream-unary calls Change the blocking stream-unary call code path to rely on the underlying synchronous API, as opposed to calling the Future-based underlying async API and invoking `.result()` on the returned Future object immediately, which can be resource-intensive. --- src/python/grpcio/grpc/_interceptor.py | 33 ++++++++++++++++++-------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index f11f55cd4cf..36611fe5012 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -218,9 +218,9 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): except Exception as exception: # pylint:disable=broad-except return _LocalFailure(exception, sys.exc_info()[2]) - call_future = self._interceptor.intercept_unary_unary( + call = self._interceptor.intercept_unary_unary( continuation, client_call_details, request) - return call_future.result(), call_future + return call.result(), call def future(self, request, timeout=None, metadata=None, credentials=None): client_call_details = _ClientCallDetails(self._method, timeout, @@ -281,24 +281,37 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): timeout=None, metadata=None, credentials=None): - call_future = self.future( + response, ignored_call = self.with_call( request_iterator, timeout=timeout, metadata=metadata, credentials=credentials) - return call_future.result() + return response def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None): - call_future = self.future( - request_iterator, - timeout=timeout, - metadata=metadata, - credentials=credentials) - return call_future.result(), call_future + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + + def continuation(new_details, request_iterator): + new_method, new_timeout, new_metadata, new_credentials = ( + _unwrap_client_call_details(new_details, client_call_details)) + try: + response, call = self._thunk(new_method).with_call( + request_iterator, + timeout=new_timeout, + metadata=new_metadata, + credentials=new_credentials) + return _UnaryOutcome(response, call) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + call = self._interceptor.intercept_stream_unary( + continuation, client_call_details, request_iterator) + return call.result(), call def future(self, request_iterator, From e9cbad592ed50511569fbc1c52b232341bbe1bd9 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 8 Mar 2018 12:32:07 +0100 Subject: [PATCH 3/6] Refactor: rename _LocalFailure to _FailureOutcome --- src/python/grpcio/grpc/_interceptor.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 36611fe5012..0980a4c6529 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -75,10 +75,10 @@ def _unwrap_client_call_details(call_details, default_details): return method, timeout, metadata, credentials -class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): +class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call): def __init__(self, exception, traceback): - super(_LocalFailure, self).__init__() + super(_FailureOutcome, self).__init__() self._exception = exception self._traceback = traceback @@ -216,7 +216,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): credentials=new_credentials) return _UnaryOutcome(response, call) except Exception as exception: # pylint:disable=broad-except - return _LocalFailure(exception, sys.exc_info()[2]) + return _FailureOutcome(exception, sys.exc_info()[2]) call = self._interceptor.intercept_unary_unary( continuation, client_call_details, request) @@ -239,7 +239,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): return self._interceptor.intercept_unary_unary( continuation, client_call_details, request) except Exception as exception: # pylint:disable=broad-except - return _LocalFailure(exception, sys.exc_info()[2]) + return _FailureOutcome(exception, sys.exc_info()[2]) class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): @@ -266,7 +266,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): return self._interceptor.intercept_unary_stream( continuation, client_call_details, request) except Exception as exception: # pylint:disable=broad-except - return _LocalFailure(exception, sys.exc_info()[2]) + return _FailureOutcome(exception, sys.exc_info()[2]) class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): @@ -307,7 +307,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): credentials=new_credentials) return _UnaryOutcome(response, call) except Exception as exception: # pylint:disable=broad-except - return _LocalFailure(exception, sys.exc_info()[2]) + return _FailureOutcome(exception, sys.exc_info()[2]) call = self._interceptor.intercept_stream_unary( continuation, client_call_details, request_iterator) @@ -334,7 +334,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): return self._interceptor.intercept_stream_unary( continuation, client_call_details, request_iterator) except Exception as exception: # pylint:disable=broad-except - return _LocalFailure(exception, sys.exc_info()[2]) + return _FailureOutcome(exception, sys.exc_info()[2]) class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): @@ -365,7 +365,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): return self._interceptor.intercept_stream_stream( continuation, client_call_details, request_iterator) except Exception as exception: # pylint:disable=broad-except - return _LocalFailure(exception, sys.exc_info()[2]) + return _FailureOutcome(exception, sys.exc_info()[2]) class _Channel(grpc.Channel): From a33b8075076eb2a542212effa54d45c1d9c3b9ba Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 13 Jun 2018 16:32:00 -0700 Subject: [PATCH 4/6] Re-raise grpc.RpcError instead of eating it --- src/python/grpcio/grpc/_interceptor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 0980a4c6529..64203c9d27c 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -215,6 +215,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): metadata=new_metadata, credentials=new_credentials) return _UnaryOutcome(response, call) + except grpc.RpcError: + raise except Exception as exception: # pylint:disable=broad-except return _FailureOutcome(exception, sys.exc_info()[2]) @@ -306,6 +308,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): metadata=new_metadata, credentials=new_credentials) return _UnaryOutcome(response, call) + except grpc.RpcError: + raise except Exception as exception: # pylint:disable=broad-except return _FailureOutcome(exception, sys.exc_info()[2]) From 56142a5dbf5fea060781c7ad36b0410cc0d6ffa7 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 14 Jun 2018 13:17:00 -0700 Subject: [PATCH 5/6] Refactor: avoid calling with_call API Avoid calling the public self.with_call API internally and opt for sharing the method body in a separate private method and calling that instead. --- src/python/grpcio/grpc/_interceptor.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 64203c9d27c..30c98add108 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -194,7 +194,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._interceptor = interceptor def __call__(self, request, timeout=None, metadata=None, credentials=None): - response, ignored_call = self.with_call( + response, ignored_call = self._with_call( request, timeout=timeout, metadata=metadata, @@ -202,6 +202,14 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): return response def with_call(self, request, timeout=None, metadata=None, credentials=None): + return self._with_call( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials) + + def _with_call(self, request, timeout=None, metadata=None, + credentials=None): client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) @@ -283,7 +291,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): timeout=None, metadata=None, credentials=None): - response, ignored_call = self.with_call( + response, ignored_call = self._with_call( request_iterator, timeout=timeout, metadata=metadata, @@ -295,6 +303,17 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): timeout=None, metadata=None, credentials=None): + return self._with_call( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials) + + def _with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) From b291f186be78dc8bd8c31b301fb8795a106b3ff4 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 14 Jun 2018 13:57:50 -0700 Subject: [PATCH 6/6] Refactor: reorder --- src/python/grpcio/grpc/_interceptor.py | 36 +++++++++++++------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 30c98add108..1d2d374ad19 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -201,13 +201,6 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): credentials=credentials) return response - def with_call(self, request, timeout=None, metadata=None, credentials=None): - return self._with_call( - request, - timeout=timeout, - metadata=metadata, - credentials=credentials) - def _with_call(self, request, timeout=None, metadata=None, credentials=None): client_call_details = _ClientCallDetails(self._method, timeout, @@ -232,6 +225,13 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): continuation, client_call_details, request) return call.result(), call + def with_call(self, request, timeout=None, metadata=None, credentials=None): + return self._with_call( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials) + def future(self, request, timeout=None, metadata=None, credentials=None): client_call_details = _ClientCallDetails(self._method, timeout, metadata, credentials) @@ -298,17 +298,6 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): credentials=credentials) return response - def with_call(self, - request_iterator, - timeout=None, - metadata=None, - credentials=None): - return self._with_call( - request_iterator, - timeout=timeout, - metadata=metadata, - credentials=credentials) - def _with_call(self, request_iterator, timeout=None, @@ -336,6 +325,17 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): continuation, client_call_details, request_iterator) return call.result(), call + def with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + return self._with_call( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials) + def future(self, request_iterator, timeout=None,