Add gRPC Python client-side interceptor machinery

pull/13722/head
Mehrdad Afshari 7 years ago
parent 68e1978e9e
commit 108500f194
  1. 200
      src/python/grpcio/grpc/__init__.py
  2. 280
      src/python/grpcio/grpc/_interceptor.py
  3. 9
      src/python/grpcio_tests/tests/unit/_api_test.py

@ -342,6 +342,170 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
raise NotImplementedError()
############## Invocation-Side Interceptor Interfaces & Classes ##############
class ClientCallDetails(six.with_metaclass(abc.ABCMeta)):
"""Describes an RPC to be invoked.
This is an EXPERIMENTAL API.
Attributes:
method: The method name of the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
metadata: Optional :term:`metadata` to be transmitted to
the service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
"""
class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting unary-unary invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_unary_unary(self, continuation, client_call_details, request):
"""Intercepts a unary-unary invocation asynchronously.
Args:
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_future = continuation(client_call_details, request)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and a Future. In the event of RPC
completion, the return Call-Future's result value will be
the response message of the RPC. Should the event terminate
with non-OK status, the returned Call-Future's exception value
will be an RpcError.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and a Future.
In the event of RPC completion, the return Call-Future's
result value will be the response message of the RPC.
Should the event terminate with non-OK status, the returned
Call-Future's exception value will be an RpcError.
"""
raise NotImplementedError()
class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting unary-stream invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_unary_stream(self, continuation, client_call_details,
request):
"""Intercepts a unary-stream invocation.
Args:
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_iterator = continuation(client_call_details, request)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and an iterator for response values.
Drawing response values from the returned Call-iterator may
raise RpcError indicating termination of the RPC with non-OK
status.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of
response values. Drawing response values from the returned
Call-iterator may raise RpcError indicating termination of
the RPC with non-OK status.
"""
raise NotImplementedError()
class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting stream-unary invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
"""Intercepts a stream-unary invocation asynchronously.
Args:
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_future = continuation(client_call_details,
request_iterator)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and a Future. In the event of RPC completion,
the return Call-Future's result value will be the response message
of the RPC. Should the event terminate with non-OK status, the
returned Call-Future's exception value will be an RpcError.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and a Future.
In the event of RPC completion, the return Call-Future's
result value will be the response message of the RPC.
Should the event terminate with non-OK status, the returned
Call-Future's exception value will be an RpcError.
"""
raise NotImplementedError()
class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting stream-stream invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
"""Intercepts a stream-stream invocation.
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_iterator = continuation(client_call_details,
request_iterator)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and an iterator for response values.
Drawing response values from the returned Call-iterator may
raise RpcError indicating termination of the RPC with non-OK
status.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of
response values. Drawing response values from the returned
Call-iterator may raise RpcError indicating termination of
the RPC with non-OK status.
"""
raise NotImplementedError()
############ Authentication & Authorization Interfaces & Classes #############
@ -1404,6 +1568,34 @@ def secure_channel(target, credentials, options=None):
credentials._credentials)
def intercept_channel(channel, *interceptors):
"""Intercepts a channel through a set of interceptors.
This is an EXPERIMENTAL API.
Args:
channel: A Channel.
interceptors: Zero or more objects of type
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor, or
StreamStreamClientInterceptor.
Interceptors are given control in the order they are listed.
Returns:
A Channel that intercepts each invocation via the provided interceptors.
Raises:
TypeError: If interceptor does not derive from any of
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor, or
StreamStreamClientInterceptor.
"""
from grpc import _interceptor # pylint: disable=cyclic-import
return _interceptor.intercept_channel(channel, *interceptors)
def server(thread_pool,
handlers=None,
interceptors=None,
@ -1442,10 +1634,12 @@ __all__ = (
'FutureTimeoutError', 'FutureCancelledError', 'Future',
'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', 'Call',
'ChannelCredentials', 'CallCredentials', 'AuthMetadataContext',
'AuthMetadataPluginCallback', 'AuthMetadataPlugin',
'AuthMetadataPluginCallback', 'AuthMetadataPlugin', 'ClientCallDetails',
'ServerCertificateConfiguration', 'ServerCredentials',
'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable',
'StreamUnaryMultiCallable', 'StreamStreamMultiCallable', 'Channel',
'StreamUnaryMultiCallable', 'StreamStreamMultiCallable',
'UnaryUnaryClientInterceptor', 'UnaryStreamClientInterceptor',
'StreamUnaryClientInterceptor', 'StreamStreamClientInterceptor', 'Channel',
'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails',
'GenericRpcHandler', 'ServiceRpcHandler', 'Server', 'ServerInterceptor',
'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler',
@ -1455,7 +1649,7 @@ __all__ = (
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'ssl_server_certificate_configuration',
'dynamic_ssl_server_credentials', 'channel_ready_future',
'insecure_channel', 'secure_channel', 'server',)
'insecure_channel', 'secure_channel', 'intercept_channel', 'server',)
############################### Extension Shims ################################

@ -13,6 +13,11 @@
# limitations under the License.
"""Implementation of gRPC Python interceptors."""
import collections
import sys
import grpc
class _ServicePipeline(object):
@ -36,3 +41,278 @@ class _ServicePipeline(object):
def service_pipeline(interceptors):
return _ServicePipeline(interceptors) if interceptors else None
class _ClientCallDetails(
collections.namedtuple('_ClientCallDetails',
('method', 'timeout', 'metadata',
'credentials')), grpc.ClientCallDetails):
pass
class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call):
def __init__(self, exception, traceback):
super(_LocalFailure, self).__init__()
self._exception = exception
self._traceback = traceback
def initial_metadata(self):
return None
def trailing_metadata(self):
return None
def code(self):
return grpc.StatusCode.INTERNAL
def details(self):
return 'Exception raised while intercepting the RPC'
def cancel(self):
return False
def cancelled(self):
return False
def running(self):
return False
def done(self):
return True
def result(self, ignored_timeout=None):
raise self._exception
def exception(self, ignored_timeout=None):
return self._exception
def traceback(self, ignored_timeout=None):
return self._traceback
def add_done_callback(self, fn):
fn(self)
def __iter__(self):
return self
def next(self):
raise self._exception
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
call_future = self.future(
request,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result()
def with_call(self, request, timeout=None, metadata=None, credentials=None):
call_future = self.future(
request,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result(), call_future
def future(self, request, timeout=None, metadata=None, credentials=None):
def continuation(client_call_details, request):
return self._thunk(client_call_details.method).future(
request,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_unary_unary(
continuation, client_call_details, request)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
def continuation(client_call_details, request):
return self._thunk(client_call_details.method)(
request,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_unary_stream(
continuation, client_call_details, request)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
call_future = self.future(
request_iterator,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result()
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
call_future = self.future(
request_iterator,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result(), call_future
def future(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
def continuation(client_call_details, request_iterator):
return self._thunk(client_call_details.method).future(
request_iterator,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_stream_unary(
continuation, client_call_details, request_iterator)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
def continuation(client_call_details, request_iterator):
return self._thunk(client_call_details.method)(
request_iterator,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_stream_stream(
continuation, client_call_details, request_iterator)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _Channel(grpc.Channel):
def __init__(self, channel, interceptor):
self._channel = channel
self._interceptor = interceptor
def subscribe(self, *args, **kwargs):
self._channel.subscribe(*args, **kwargs)
def unsubscribe(self, *args, **kwargs):
self._channel.unsubscribe(*args, **kwargs)
def unary_unary(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.unary_unary(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor):
return _UnaryUnaryMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def unary_stream(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.unary_stream(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor):
return _UnaryStreamMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def stream_unary(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.stream_unary(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor):
return _StreamUnaryMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def stream_stream(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.stream_stream(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor):
return _StreamStreamMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def intercept_channel(channel, *interceptors):
for interceptor in reversed(list(interceptors)):
if not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor) and \
not isinstance(interceptor, grpc.UnaryStreamClientInterceptor) and \
not isinstance(interceptor, grpc.StreamUnaryClientInterceptor) and \
not isinstance(interceptor, grpc.StreamStreamClientInterceptor):
raise TypeError('interceptor must be '
'grpc.UnaryUnaryClientInterceptor or '
'grpc.UnaryStreamClientInterceptor or '
'grpc.StreamUnaryClientInterceptor or '
'grpc.StreamStreamClientInterceptor or ')
channel = _Channel(channel, interceptor)
return channel

@ -33,18 +33,21 @@ class AllTest(unittest.TestCase):
'AuthMetadataPlugin', 'ServerCertificateConfiguration',
'ServerCredentials', 'UnaryUnaryMultiCallable',
'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable',
'StreamStreamMultiCallable', 'Channel', 'ServicerContext',
'StreamStreamMultiCallable', 'UnaryUnaryClientInterceptor',
'UnaryStreamClientInterceptor', 'StreamUnaryClientInterceptor',
'StreamStreamClientInterceptor', 'Channel', 'ServicerContext',
'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler',
'ServiceRpcHandler', 'Server', 'ServerInterceptor',
'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler',
'stream_unary_rpc_method_handler',
'stream_unary_rpc_method_handler', 'ClientCallDetails',
'stream_stream_rpc_method_handler',
'method_handlers_generic_handler', 'ssl_channel_credentials',
'metadata_call_credentials', 'access_token_call_credentials',
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'ssl_server_certificate_configuration',
'dynamic_ssl_server_credentials', 'channel_ready_future',
'insecure_channel', 'secure_channel', 'server',)
'insecure_channel', 'secure_channel', 'intercept_channel',
'server',)
six.assertCountEqual(self, expected_grpc_code_elements,
_from_grpc_import_star.GRPC_ELEMENTS)

Loading…
Cancel
Save