diff --git a/examples/python/interceptors/default_value/default_value_client_interceptor.py b/examples/python/interceptors/default_value/default_value_client_interceptor.py new file mode 100644 index 00000000000..c549f2b8613 --- /dev/null +++ b/examples/python/interceptors/default_value/default_value_client_interceptor.py @@ -0,0 +1,68 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Interceptor that adds headers to outgoing requests.""" + +import collections + +import grpc + + +class _ConcreteValue(grpc.Future): + + def __init__(self, result): + self._result = result + + def cancel(self): + return False + + def cancelled(self): + return False + + def running(self): + return False + + def done(self): + return True + + def result(self, timeout=None): + return self._result + + def exception(self, timeout=None): + return None + + def traceback(self, timeout=None): + return None + + def add_done_callback(self, fn): + fn(self._result) + + +class DefaultValueClientInterceptor(grpc.UnaryUnaryClientInterceptor, + grpc.StreamUnaryClientInterceptor): + + def __init__(self, value): + self._default = _ConcreteValue(value) + + def _intercept_call(self, continuation, client_call_details, + request_or_iterator): + response = continuation(client_call_details, request_or_iterator) + return self._default if response.exception() else response + + def intercept_unary_unary(self, continuation, client_call_details, request): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_stream_unary(self, continuation, client_call_details, + request_iterator): + return self._intercept_call(continuation, client_call_details, + request_iterator) diff --git a/examples/python/interceptors/default_value/greeter_client.py b/examples/python/interceptors/default_value/greeter_client.py new file mode 100644 index 00000000000..aba7571d831 --- /dev/null +++ b/examples/python/interceptors/default_value/greeter_client.py @@ -0,0 +1,38 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python implementation of the gRPC helloworld.Greeter client.""" + +from __future__ import print_function + +import grpc + +import helloworld_pb2 +import helloworld_pb2_grpc +import default_value_client_interceptor + + +def run(): + default_value = helloworld_pb2.HelloReply( + message='Hello from your local interceptor!') + default_value_interceptor = default_value_client_interceptor.DefaultValueClientInterceptor( + default_value) + channel = grpc.insecure_channel('localhost:50051') + channel = grpc.intercept_channel(channel, default_value_interceptor) + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = stub.SayHello(helloworld_pb2.HelloRequest(name='you')) + print("Greeter client received: " + response.message) + + +if __name__ == '__main__': + run() diff --git a/examples/python/interceptors/default_value/helloworld_pb2.py b/examples/python/interceptors/default_value/helloworld_pb2.py new file mode 100644 index 00000000000..e18ab9acc7a --- /dev/null +++ b/examples/python/interceptors/default_value/helloworld_pb2.py @@ -0,0 +1,134 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: helloworld.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='helloworld.proto', + package='helloworld', + syntax='proto3', + serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3') +) + + + + +_HELLOREQUEST = _descriptor.Descriptor( + name='HelloRequest', + full_name='helloworld.HelloRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='helloworld.HelloRequest.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=32, + serialized_end=60, +) + + +_HELLOREPLY = _descriptor.Descriptor( + name='HelloReply', + full_name='helloworld.HelloReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='helloworld.HelloReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=62, + serialized_end=91, +) + +DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST +DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict( + DESCRIPTOR = _HELLOREQUEST, + __module__ = 'helloworld_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloRequest) + )) +_sym_db.RegisterMessage(HelloRequest) + +HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict( + DESCRIPTOR = _HELLOREPLY, + __module__ = 'helloworld_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloReply) + )) +_sym_db.RegisterMessage(HelloReply) + + +DESCRIPTOR.has_options = True +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW')) + +_GREETER = _descriptor.ServiceDescriptor( + name='Greeter', + full_name='helloworld.Greeter', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=93, + serialized_end=166, + methods=[ + _descriptor.MethodDescriptor( + name='SayHello', + full_name='helloworld.Greeter.SayHello', + index=0, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_GREETER) + +DESCRIPTOR.services_by_name['Greeter'] = _GREETER + +# @@protoc_insertion_point(module_scope) diff --git a/examples/python/interceptors/default_value/helloworld_pb2_grpc.py b/examples/python/interceptors/default_value/helloworld_pb2_grpc.py new file mode 100644 index 00000000000..18e07d16797 --- /dev/null +++ b/examples/python/interceptors/default_value/helloworld_pb2_grpc.py @@ -0,0 +1,46 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import helloworld_pb2 as helloworld__pb2 + + +class GreeterStub(object): + """The greeting service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SayHello = channel.unary_unary( + '/helloworld.Greeter/SayHello', + request_serializer=helloworld__pb2.HelloRequest.SerializeToString, + response_deserializer=helloworld__pb2.HelloReply.FromString, + ) + + +class GreeterServicer(object): + """The greeting service definition. + """ + + def SayHello(self, request, context): + """Sends a greeting + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SayHello': grpc.unary_unary_rpc_method_handler( + servicer.SayHello, + request_deserializer=helloworld__pb2.HelloRequest.FromString, + response_serializer=helloworld__pb2.HelloReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'helloworld.Greeter', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/python/interceptors/headers/generic_client_interceptor.py b/examples/python/interceptors/headers/generic_client_interceptor.py new file mode 100644 index 00000000000..30b0755aaf9 --- /dev/null +++ b/examples/python/interceptors/headers/generic_client_interceptor.py @@ -0,0 +1,55 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Base class for interceptors that operate on all RPC types.""" + +import grpc + + +class _GenericClientInterceptor( + grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): + + def __init__(self, interceptor_function): + self._fn = interceptor_function + + def intercept_unary_unary(self, continuation, client_call_details, request): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, iter((request,)), False, False) + response = continuation(new_details, next(new_request_iterator)) + return postprocess(response) if postprocess else response + + def intercept_unary_stream(self, continuation, client_call_details, + request): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, iter((request,)), False, True) + response_it = continuation(new_details, new_request_iterator) + return postprocess(response_it) if postprocess else response_it + + def intercept_stream_unary(self, continuation, client_call_details, + request_iterator): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, request_iterator, True, False) + response = continuation(new_details, next(new_request_iterator)) + return postprocess(response) if postprocess else response + + def intercept_stream_stream(self, continuation, client_call_details, + request_iterator): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, request_iterator, True, True) + response_it = continuation(new_details, new_request_iterator) + return postprocess(response_it) if postprocess else response_it + + +def create(intercept_call): + return _GenericClientInterceptor(intercept_call) diff --git a/examples/python/interceptors/headers/greeter_client.py b/examples/python/interceptors/headers/greeter_client.py new file mode 100644 index 00000000000..2b0dd3e1778 --- /dev/null +++ b/examples/python/interceptors/headers/greeter_client.py @@ -0,0 +1,36 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python implementation of the GRPC helloworld.Greeter client.""" + +from __future__ import print_function + +import grpc + +import helloworld_pb2 +import helloworld_pb2_grpc +import header_manipulator_client_interceptor + + +def run(): + header_adder_interceptor = header_manipulator_client_interceptor.header_adder_interceptor( + 'one-time-password', '42') + channel = grpc.insecure_channel('localhost:50051') + channel = grpc.intercept_channel(channel, header_adder_interceptor) + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = stub.SayHello(helloworld_pb2.HelloRequest(name='you')) + print("Greeter client received: " + response.message) + + +if __name__ == '__main__': + run() diff --git a/examples/python/interceptors/headers/greeter_server.py b/examples/python/interceptors/headers/greeter_server.py new file mode 100644 index 00000000000..01968609b53 --- /dev/null +++ b/examples/python/interceptors/headers/greeter_server.py @@ -0,0 +1,52 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python implementation of the GRPC helloworld.Greeter server.""" + +from concurrent import futures +import time + +import grpc + +import helloworld_pb2 +import helloworld_pb2_grpc +from request_header_validator_interceptor import RequestHeaderValidatorInterceptor + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +class Greeter(helloworld_pb2_grpc.GreeterServicer): + + def SayHello(self, request, context): + return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) + + +def serve(): + header_validator = RequestHeaderValidatorInterceptor( + 'one-time-password', '42', grpc.StatusCode.UNAUTHENTICATED, + 'Access denied!') + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=(header_validator,)) + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port('[::]:50051') + server.start() + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + + +if __name__ == '__main__': + serve() diff --git a/examples/python/interceptors/headers/header_manipulator_client_interceptor.py b/examples/python/interceptors/headers/header_manipulator_client_interceptor.py new file mode 100644 index 00000000000..ac7c605144f --- /dev/null +++ b/examples/python/interceptors/headers/header_manipulator_client_interceptor.py @@ -0,0 +1,42 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Interceptor that adds headers to outgoing requests.""" + +import collections + +import grpc +import generic_client_interceptor + + +class _ClientCallDetails( + collections.namedtuple('_ClientCallDetails', + ('method', 'timeout', 'metadata', + 'credentials')), grpc.ClientCallDetails): + pass + + +def header_adder_interceptor(header, value): + + def intercept_call(client_call_details, request_iterator, request_streaming, + response_streaming): + metadata = [] + if client_call_details.metadata is not None: + metadata = list(client_call_details.metadata) + metadata.append((header, value,)) + client_call_details = _ClientCallDetails( + client_call_details.method, client_call_details.timeout, metadata, + client_call_details.credentials) + return client_call_details, request_iterator, None + + return generic_client_interceptor.create(intercept_call) diff --git a/examples/python/interceptors/headers/helloworld_pb2.py b/examples/python/interceptors/headers/helloworld_pb2.py new file mode 100644 index 00000000000..e18ab9acc7a --- /dev/null +++ b/examples/python/interceptors/headers/helloworld_pb2.py @@ -0,0 +1,134 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: helloworld.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='helloworld.proto', + package='helloworld', + syntax='proto3', + serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3') +) + + + + +_HELLOREQUEST = _descriptor.Descriptor( + name='HelloRequest', + full_name='helloworld.HelloRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='helloworld.HelloRequest.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=32, + serialized_end=60, +) + + +_HELLOREPLY = _descriptor.Descriptor( + name='HelloReply', + full_name='helloworld.HelloReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='helloworld.HelloReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=62, + serialized_end=91, +) + +DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST +DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict( + DESCRIPTOR = _HELLOREQUEST, + __module__ = 'helloworld_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloRequest) + )) +_sym_db.RegisterMessage(HelloRequest) + +HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict( + DESCRIPTOR = _HELLOREPLY, + __module__ = 'helloworld_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloReply) + )) +_sym_db.RegisterMessage(HelloReply) + + +DESCRIPTOR.has_options = True +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW')) + +_GREETER = _descriptor.ServiceDescriptor( + name='Greeter', + full_name='helloworld.Greeter', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=93, + serialized_end=166, + methods=[ + _descriptor.MethodDescriptor( + name='SayHello', + full_name='helloworld.Greeter.SayHello', + index=0, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_GREETER) + +DESCRIPTOR.services_by_name['Greeter'] = _GREETER + +# @@protoc_insertion_point(module_scope) diff --git a/examples/python/interceptors/headers/helloworld_pb2_grpc.py b/examples/python/interceptors/headers/helloworld_pb2_grpc.py new file mode 100644 index 00000000000..18e07d16797 --- /dev/null +++ b/examples/python/interceptors/headers/helloworld_pb2_grpc.py @@ -0,0 +1,46 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import helloworld_pb2 as helloworld__pb2 + + +class GreeterStub(object): + """The greeting service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SayHello = channel.unary_unary( + '/helloworld.Greeter/SayHello', + request_serializer=helloworld__pb2.HelloRequest.SerializeToString, + response_deserializer=helloworld__pb2.HelloReply.FromString, + ) + + +class GreeterServicer(object): + """The greeting service definition. + """ + + def SayHello(self, request, context): + """Sends a greeting + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SayHello': grpc.unary_unary_rpc_method_handler( + servicer.SayHello, + request_deserializer=helloworld__pb2.HelloRequest.FromString, + response_serializer=helloworld__pb2.HelloReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'helloworld.Greeter', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/python/interceptors/headers/request_header_validator_interceptor.py b/examples/python/interceptors/headers/request_header_validator_interceptor.py new file mode 100644 index 00000000000..95af4177baf --- /dev/null +++ b/examples/python/interceptors/headers/request_header_validator_interceptor.py @@ -0,0 +1,39 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Interceptor that ensures a specific header is present.""" + +import grpc + + +def _unary_unary_rpc_terminator(code, details): + + def terminate(ignored_request, context): + context.abort(code, details) + + return grpc.unary_unary_rpc_method_handler(terminate) + + +class RequestHeaderValidatorInterceptor(grpc.ServerInterceptor): + + def __init__(self, header, value, code, details): + self._header = header + self._value = value + self._terminator = _unary_unary_rpc_terminator(code, details) + + def intercept_service(self, continuation, handler_call_details): + if (self._header, + self._value) in handler_call_details.invocation_metadata: + return continuation(handler_call_details) + else: + return self._terminator diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 0f6bdfc1c2d..8b913ac9495 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -342,6 +342,170 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): raise NotImplementedError() +############## Invocation-Side Interceptor Interfaces & Classes ############## + + +class ClientCallDetails(six.with_metaclass(abc.ABCMeta)): + """Describes an RPC to be invoked. + + This is an EXPERIMENTAL API. + + Attributes: + method: The method name of the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: Optional :term:`metadata` to be transmitted to + the service-side of the RPC. + credentials: An optional CallCredentials for the RPC. + """ + + +class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting unary-unary invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_unary_unary(self, continuation, client_call_details, request): + """Intercepts a unary-unary invocation asynchronously. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_future = continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and a Future. In the event of RPC + completion, the return Call-Future's result value will be + the response message of the RPC. Should the event terminate + with non-OK status, the returned Call-Future's exception value + will be an RpcError. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request: The request value for the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's + result value will be the response message of the RPC. + Should the event terminate with non-OK status, the returned + Call-Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting unary-stream invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_unary_stream(self, continuation, client_call_details, + request): + """Intercepts a unary-stream invocation. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_iterator = continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and an iterator for response values. + Drawing response values from the returned Call-iterator may + raise RpcError indicating termination of the RPC with non-OK + status. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request: The request value for the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of + the RPC with non-OK status. + """ + raise NotImplementedError() + + +class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting stream-unary invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_stream_unary(self, continuation, client_call_details, + request_iterator): + """Intercepts a stream-unary invocation asynchronously. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_future = continuation(client_call_details, + request_iterator)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and a Future. In the event of RPC completion, + the return Call-Future's result value will be the response message + of the RPC. Should the event terminate with non-OK status, the + returned Call-Future's exception value will be an RpcError. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request_iterator: An iterator that yields request values for the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's + result value will be the response message of the RPC. + Should the event terminate with non-OK status, the returned + Call-Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting stream-stream invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_stream_stream(self, continuation, client_call_details, + request_iterator): + """Intercepts a stream-stream invocation. + + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_iterator = continuation(client_call_details, + request_iterator)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and an iterator for response values. + Drawing response values from the returned Call-iterator may + raise RpcError indicating termination of the RPC with non-OK + status. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request_iterator: An iterator that yields request values for the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of + the RPC with non-OK status. + """ + raise NotImplementedError() + + ############ Authentication & Authorization Interfaces & Classes ############# @@ -962,6 +1126,34 @@ class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)): raise NotImplementedError() +#################### Service-Side Interceptor Interfaces ##################### + + +class ServerInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting incoming RPCs on the service-side. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_service(self, continuation, handler_call_details): + """Intercepts incoming RPCs before handing them over to a handler. + + Args: + continuation: A function that takes a HandlerCallDetails and + proceeds to invoke the next interceptor in the chain, if any, + or the RPC handler lookup logic, with the call details passed + as an argument, and returns an RpcMethodHandler instance if + the RPC is considered serviced, or None otherwise. + handler_call_details: A HandlerCallDetails describing the RPC. + + Returns: + An RpcMethodHandler with which the RPC may be serviced if the + interceptor chooses to service this RPC, or None otherwise. + """ + raise NotImplementedError() + + ############################# Server Interface ############################### @@ -1376,53 +1568,88 @@ def secure_channel(target, credentials, options=None): credentials._credentials) +def intercept_channel(channel, *interceptors): + """Intercepts a channel through a set of interceptors. + + This is an EXPERIMENTAL API. + + Args: + channel: A Channel. + interceptors: Zero or more objects of type + UnaryUnaryClientInterceptor, + UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor, or + StreamStreamClientInterceptor. + Interceptors are given control in the order they are listed. + + Returns: + A Channel that intercepts each invocation via the provided interceptors. + + Raises: + TypeError: If interceptor does not derive from any of + UnaryUnaryClientInterceptor, + UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor, or + StreamStreamClientInterceptor. + """ + from grpc import _interceptor # pylint: disable=cyclic-import + return _interceptor.intercept_channel(channel, *interceptors) + + def server(thread_pool, handlers=None, + interceptors=None, options=None, maximum_concurrent_rpcs=None): """Creates a Server with which RPCs can be serviced. - Args: - thread_pool: A futures.ThreadPoolExecutor to be used by the Server - to execute RPC handlers. - handlers: An optional list of GenericRpcHandlers used for executing RPCs. - More handlers may be added by calling add_generic_rpc_handlers any time - before the server is started. - options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. - maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server - will service before returning RESOURCE_EXHAUSTED status, or None to - indicate no limit. + Args: + thread_pool: A futures.ThreadPoolExecutor to be used by the Server + to execute RPC handlers. + handlers: An optional list of GenericRpcHandlers used for executing RPCs. + More handlers may be added by calling add_generic_rpc_handlers any time + before the server is started. + interceptors: An optional list of ServerInterceptor objects that observe + and optionally manipulate the incoming RPCs before handing them over to + handlers. The interceptors are given control in the order they are + specified. This is an EXPERIMENTAL API. + options: An optional list of key-value pairs (channel args in gRPC runtime) + to configure the channel. + maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server + will service before returning RESOURCE_EXHAUSTED status, or None to + indicate no limit. - Returns: - A Server object. - """ + Returns: + A Server object. + """ from grpc import _server # pylint: disable=cyclic-import return _server.Server(thread_pool, () if handlers is None else handlers, () - if options is None else options, - maximum_concurrent_rpcs) + if interceptors is None else interceptors, () if + options is None else options, maximum_concurrent_rpcs) ################################### __all__ ################################# -__all__ = ('FutureTimeoutError', 'FutureCancelledError', 'Future', - 'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', - 'Call', 'ChannelCredentials', 'CallCredentials', - 'AuthMetadataContext', 'AuthMetadataPluginCallback', - 'AuthMetadataPlugin', 'ServerCertificateConfiguration', - 'ServerCredentials', 'UnaryUnaryMultiCallable', - 'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable', - 'StreamStreamMultiCallable', 'Channel', 'ServicerContext', - 'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler', - 'ServiceRpcHandler', 'Server', 'unary_unary_rpc_method_handler', - 'unary_stream_rpc_method_handler', 'stream_unary_rpc_method_handler', - 'stream_stream_rpc_method_handler', - 'method_handlers_generic_handler', 'ssl_channel_credentials', - 'metadata_call_credentials', 'access_token_call_credentials', - 'composite_call_credentials', 'composite_channel_credentials', - 'ssl_server_credentials', 'ssl_server_certificate_configuration', - 'dynamic_ssl_server_credentials', 'channel_ready_future', - 'insecure_channel', 'secure_channel', 'server',) +__all__ = ( + 'FutureTimeoutError', 'FutureCancelledError', 'Future', + 'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', 'Call', + 'ChannelCredentials', 'CallCredentials', 'AuthMetadataContext', + 'AuthMetadataPluginCallback', 'AuthMetadataPlugin', 'ClientCallDetails', + 'ServerCertificateConfiguration', 'ServerCredentials', + 'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', 'StreamStreamMultiCallable', + 'UnaryUnaryClientInterceptor', 'UnaryStreamClientInterceptor', + 'StreamUnaryClientInterceptor', 'StreamStreamClientInterceptor', 'Channel', + 'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails', + 'GenericRpcHandler', 'ServiceRpcHandler', 'Server', 'ServerInterceptor', + 'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', 'stream_stream_rpc_method_handler', + 'method_handlers_generic_handler', 'ssl_channel_credentials', + 'metadata_call_credentials', 'access_token_call_credentials', + 'composite_call_credentials', 'composite_channel_credentials', + 'ssl_server_credentials', 'ssl_server_certificate_configuration', + 'dynamic_ssl_server_credentials', 'channel_ready_future', + 'insecure_channel', 'secure_channel', 'intercept_channel', 'server',) ############################### Extension Shims ################################ diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py new file mode 100644 index 00000000000..fffb2698455 --- /dev/null +++ b/src/python/grpcio/grpc/_interceptor.py @@ -0,0 +1,318 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of gRPC Python interceptors.""" + +import collections +import sys + +import grpc + + +class _ServicePipeline(object): + + def __init__(self, interceptors): + self.interceptors = tuple(interceptors) + + def _continuation(self, thunk, index): + return lambda context: self._intercept_at(thunk, index, context) + + def _intercept_at(self, thunk, index, context): + if index < len(self.interceptors): + interceptor = self.interceptors[index] + thunk = self._continuation(thunk, index + 1) + return interceptor.intercept_service(thunk, context) + else: + return thunk(context) + + def execute(self, thunk, context): + return self._intercept_at(thunk, 0, context) + + +def service_pipeline(interceptors): + return _ServicePipeline(interceptors) if interceptors else None + + +class _ClientCallDetails( + collections.namedtuple('_ClientCallDetails', + ('method', 'timeout', 'metadata', + 'credentials')), grpc.ClientCallDetails): + pass + + +class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): + + def __init__(self, exception, traceback): + super(_LocalFailure, self).__init__() + self._exception = exception + self._traceback = traceback + + def initial_metadata(self): + return None + + def trailing_metadata(self): + return None + + def code(self): + return grpc.StatusCode.INTERNAL + + def details(self): + return 'Exception raised while intercepting the RPC' + + def cancel(self): + return False + + def cancelled(self): + return False + + def running(self): + return False + + def done(self): + return True + + def result(self, ignored_timeout=None): + raise self._exception + + def exception(self, ignored_timeout=None): + return self._exception + + def traceback(self, ignored_timeout=None): + return self._traceback + + def add_done_callback(self, fn): + fn(self) + + def __iter__(self): + return self + + def next(self): + raise self._exception + + +class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + call_future = self.future( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result() + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + call_future = self.future( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result(), call_future + + def future(self, request, timeout=None, metadata=None, credentials=None): + + def continuation(client_call_details, request): + return self._thunk(client_call_details.method).future( + request, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + try: + return self._interceptor.intercept_unary_unary( + continuation, client_call_details, request) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + + def continuation(client_call_details, request): + return self._thunk(client_call_details.method)( + request, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + try: + return self._interceptor.intercept_unary_stream( + continuation, client_call_details, request) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + call_future = self.future( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result() + + def with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + call_future = self.future( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result(), call_future + + def future(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + + def continuation(client_call_details, request_iterator): + return self._thunk(client_call_details.method).future( + request_iterator, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + + try: + return self._interceptor.intercept_stream_unary( + continuation, client_call_details, request_iterator) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + + def continuation(client_call_details, request_iterator): + return self._thunk(client_call_details.method)( + request_iterator, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + + try: + return self._interceptor.intercept_stream_stream( + continuation, client_call_details, request_iterator) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _Channel(grpc.Channel): + + def __init__(self, channel, interceptor): + self._channel = channel + self._interceptor = interceptor + + def subscribe(self, *args, **kwargs): + self._channel.subscribe(*args, **kwargs) + + def unsubscribe(self, *args, **kwargs): + self._channel.unsubscribe(*args, **kwargs) + + def unary_unary(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.unary_unary(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor): + return _UnaryUnaryMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def unary_stream(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.unary_stream(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor): + return _UnaryStreamMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def stream_unary(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.stream_unary(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor): + return _StreamUnaryMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def stream_stream(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.stream_stream(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor): + return _StreamStreamMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + +def intercept_channel(channel, *interceptors): + for interceptor in reversed(list(interceptors)): + if not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor) and \ + not isinstance(interceptor, grpc.UnaryStreamClientInterceptor) and \ + not isinstance(interceptor, grpc.StreamUnaryClientInterceptor) and \ + not isinstance(interceptor, grpc.StreamStreamClientInterceptor): + raise TypeError('interceptor must be ' + 'grpc.UnaryUnaryClientInterceptor or ' + 'grpc.UnaryStreamClientInterceptor or ' + 'grpc.StreamUnaryClientInterceptor or ' + 'grpc.StreamStreamClientInterceptor or ') + channel = _Channel(channel, interceptor) + return channel diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 7b78a8be736..8857bd3eb54 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -23,6 +23,7 @@ import six import grpc from grpc import _common +from grpc import _interceptor from grpc._cython import cygrpc from grpc.framework.foundation import callable_util @@ -541,17 +542,25 @@ def _handle_stream_stream(rpc_event, state, method_handler, thread_pool): method_handler.request_deserializer, method_handler.response_serializer) -def _find_method_handler(rpc_event, generic_handlers): - for generic_handler in generic_handlers: - method_handler = generic_handler.service( - _HandlerCallDetails( - _common.decode(rpc_event.request_call_details.method), - rpc_event.request_metadata)) - if method_handler is not None: - return method_handler - else: +def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): + + def query_handlers(handler_call_details): + for generic_handler in generic_handlers: + method_handler = generic_handler.service(handler_call_details) + if method_handler is not None: + return method_handler return None + handler_call_details = _HandlerCallDetails( + _common.decode(rpc_event.request_call_details.method), + rpc_event.request_metadata) + + if interceptor_pipeline is not None: + return interceptor_pipeline.execute(query_handlers, + handler_call_details) + else: + return query_handlers(handler_call_details) + def _reject_rpc(rpc_event, status, details): operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), @@ -587,13 +596,14 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool): method_handler, thread_pool) -def _handle_call(rpc_event, generic_handlers, thread_pool, +def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, concurrency_exceeded): if not rpc_event.success: return None, None if rpc_event.request_call_details.method is not None: try: - method_handler = _find_method_handler(rpc_event, generic_handlers) + method_handler = _find_method_handler(rpc_event, generic_handlers, + interceptor_pipeline) except Exception as exception: # pylint: disable=broad-except details = 'Exception servicing handler: {}'.format(exception) logging.exception(details) @@ -621,12 +631,14 @@ class _ServerStage(enum.Enum): class _ServerState(object): - def __init__(self, completion_queue, server, generic_handlers, thread_pool, - maximum_concurrent_rpcs): + # pylint: disable=too-many-arguments + def __init__(self, completion_queue, server, generic_handlers, + interceptor_pipeline, thread_pool, maximum_concurrent_rpcs): self.lock = threading.Lock() self.completion_queue = completion_queue self.server = server self.generic_handlers = list(generic_handlers) + self.interceptor_pipeline = interceptor_pipeline self.thread_pool = thread_pool self.stage = _ServerStage.STOPPED self.shutdown_events = None @@ -691,8 +703,8 @@ def _serve(state): state.maximum_concurrent_rpcs is not None and state.active_rpc_count >= state.maximum_concurrent_rpcs) rpc_state, rpc_future = _handle_call( - event, state.generic_handlers, state.thread_pool, - concurrency_exceeded) + event, state.generic_handlers, state.interceptor_pipeline, + state.thread_pool, concurrency_exceeded) if rpc_state is not None: state.rpc_states.add(rpc_state) if rpc_future is not None: @@ -780,12 +792,14 @@ def _start(state): class Server(grpc.Server): - def __init__(self, thread_pool, generic_handlers, options, + # pylint: disable=too-many-arguments + def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs): completion_queue = cygrpc.CompletionQueue() server = cygrpc.Server(_common.channel_args(options)) server.register_completion_queue(completion_queue) self._state = _ServerState(completion_queue, server, generic_handlers, + _interceptor.service_pipeline(interceptors), thread_pool, maximum_concurrent_rpcs) def add_generic_rpc_handlers(self, generic_rpc_handlers): diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 34cbade92c3..3bf53087495 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -39,6 +39,7 @@ "unit._cython.cygrpc_test.TypeSmokeTest", "unit._empty_message_test.EmptyMessageTest", "unit._exit_test.ExitTest", + "unit._interceptor_test.InterceptorTest", "unit._invalid_metadata_test.InvalidMetadataTest", "unit._invocation_defects_test.InvocationDefectsTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest", diff --git a/src/python/grpcio_tests/tests/unit/_api_test.py b/src/python/grpcio_tests/tests/unit/_api_test.py index b14e8d5c753..d6f44475324 100644 --- a/src/python/grpcio_tests/tests/unit/_api_test.py +++ b/src/python/grpcio_tests/tests/unit/_api_test.py @@ -33,18 +33,21 @@ class AllTest(unittest.TestCase): 'AuthMetadataPlugin', 'ServerCertificateConfiguration', 'ServerCredentials', 'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable', - 'StreamStreamMultiCallable', 'Channel', 'ServicerContext', + 'StreamStreamMultiCallable', 'UnaryUnaryClientInterceptor', + 'UnaryStreamClientInterceptor', 'StreamUnaryClientInterceptor', + 'StreamStreamClientInterceptor', 'Channel', 'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler', - 'ServiceRpcHandler', 'Server', 'unary_unary_rpc_method_handler', - 'unary_stream_rpc_method_handler', - 'stream_unary_rpc_method_handler', + 'ServiceRpcHandler', 'Server', 'ServerInterceptor', + 'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', 'ClientCallDetails', 'stream_stream_rpc_method_handler', 'method_handlers_generic_handler', 'ssl_channel_credentials', 'metadata_call_credentials', 'access_token_call_credentials', 'composite_call_credentials', 'composite_channel_credentials', 'ssl_server_credentials', 'ssl_server_certificate_configuration', 'dynamic_ssl_server_credentials', 'channel_ready_future', - 'insecure_channel', 'secure_channel', 'server',) + 'insecure_channel', 'secure_channel', 'intercept_channel', + 'server',) six.assertCountEqual(self, expected_grpc_code_elements, _from_grpc_import_star.GRPC_ELEMENTS) diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py new file mode 100644 index 00000000000..cf875ed7da4 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -0,0 +1,571 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test of gRPC Python interceptors.""" + +import collections +import itertools +import threading +import unittest +from concurrent import futures + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control + +_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 +_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] +_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 +_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] + +_UNARY_UNARY = '/test/UnaryUnary' +_UNARY_STREAM = '/test/UnaryStream' +_STREAM_UNARY = '/test/StreamUnary' +_STREAM_STREAM = '/test/StreamStream' + + +class _Callback(object): + + def __init__(self): + self._condition = threading.Condition() + self._value = None + self._called = False + + def __call__(self, value): + with self._condition: + self._value = value + self._called = True + self._condition.notify_all() + + def value(self): + with self._condition: + while not self._called: + self._condition.wait() + return self._value + + +class _Handler(object): + + def __init__(self, control): + self._control = control + + def handle_unary_unary(self, request, servicer_context): + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + return request + + def handle_unary_stream(self, request, servicer_context): + for _ in range(test_constants.STREAM_LENGTH): + self._control.control() + yield request + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + + def handle_stream_unary(self, request_iterator, servicer_context): + if servicer_context is not None: + servicer_context.invocation_metadata() + self._control.control() + response_elements = [] + for request in request_iterator: + self._control.control() + response_elements.append(request) + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + return b''.join(response_elements) + + def handle_stream_stream(self, request_iterator, servicer_context): + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + for request in request_iterator: + self._control.control() + yield request + self._control.control() + + +class _MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, request_streaming, response_streaming, + request_deserializer, response_serializer, unary_unary, + unary_stream, stream_unary, stream_stream): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = request_deserializer + self.response_serializer = response_serializer + self.unary_unary = unary_unary + self.unary_stream = unary_stream + self.stream_unary = stream_unary + self.stream_stream = stream_stream + + +class _GenericHandler(grpc.GenericRpcHandler): + + def __init__(self, handler): + self._handler = handler + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler(False, False, None, None, + self._handler.handle_unary_unary, None, None, + None) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler(False, True, _DESERIALIZE_REQUEST, + _SERIALIZE_RESPONSE, None, + self._handler.handle_unary_stream, None, None) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler(True, False, _DESERIALIZE_REQUEST, + _SERIALIZE_RESPONSE, None, None, + self._handler.handle_stream_unary, None) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler(True, True, None, None, None, None, None, + self._handler.handle_stream_stream) + else: + return None + + +def _unary_unary_multi_callable(channel): + return channel.unary_unary(_UNARY_UNARY) + + +def _unary_stream_multi_callable(channel): + return channel.unary_stream( + _UNARY_STREAM, + request_serializer=_SERIALIZE_REQUEST, + response_deserializer=_DESERIALIZE_RESPONSE) + + +def _stream_unary_multi_callable(channel): + return channel.stream_unary( + _STREAM_UNARY, + request_serializer=_SERIALIZE_REQUEST, + response_deserializer=_DESERIALIZE_RESPONSE) + + +def _stream_stream_multi_callable(channel): + return channel.stream_stream(_STREAM_STREAM) + + +class _ClientCallDetails( + collections.namedtuple('_ClientCallDetails', + ('method', 'timeout', 'metadata', + 'credentials')), grpc.ClientCallDetails): + pass + + +class _GenericClientInterceptor( + grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): + + def __init__(self, interceptor_function): + self._fn = interceptor_function + + def intercept_unary_unary(self, continuation, client_call_details, request): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, iter((request,)), False, False) + response = continuation(new_details, next(new_request_iterator)) + return postprocess(response) if postprocess else response + + def intercept_unary_stream(self, continuation, client_call_details, + request): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, iter((request,)), False, True) + response_it = continuation(new_details, new_request_iterator) + return postprocess(response_it) if postprocess else response_it + + def intercept_stream_unary(self, continuation, client_call_details, + request_iterator): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, request_iterator, True, False) + response = continuation(new_details, next(new_request_iterator)) + return postprocess(response) if postprocess else response + + def intercept_stream_stream(self, continuation, client_call_details, + request_iterator): + new_details, new_request_iterator, postprocess = self._fn( + client_call_details, request_iterator, True, True) + response_it = continuation(new_details, new_request_iterator) + return postprocess(response_it) if postprocess else response_it + + +class _LoggingInterceptor( + grpc.ServerInterceptor, grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor): + + def __init__(self, tag, record): + self.tag = tag + self.record = record + + def intercept_service(self, continuation, handler_call_details): + self.record.append(self.tag + ':intercept_service') + return continuation(handler_call_details) + + def intercept_unary_unary(self, continuation, client_call_details, request): + self.record.append(self.tag + ':intercept_unary_unary') + return continuation(client_call_details, request) + + def intercept_unary_stream(self, continuation, client_call_details, + request): + self.record.append(self.tag + ':intercept_unary_stream') + return continuation(client_call_details, request) + + def intercept_stream_unary(self, continuation, client_call_details, + request_iterator): + self.record.append(self.tag + ':intercept_stream_unary') + return continuation(client_call_details, request_iterator) + + def intercept_stream_stream(self, continuation, client_call_details, + request_iterator): + self.record.append(self.tag + ':intercept_stream_stream') + return continuation(client_call_details, request_iterator) + + +class _DefectiveClientInterceptor(grpc.UnaryUnaryClientInterceptor): + + def intercept_unary_unary(self, ignored_continuation, + ignored_client_call_details, ignored_request): + raise test_control.Defect() + + +def _wrap_request_iterator_stream_interceptor(wrapper): + + def intercept_call(client_call_details, request_iterator, request_streaming, + ignored_response_streaming): + if request_streaming: + return client_call_details, wrapper(request_iterator), None + else: + return client_call_details, request_iterator, None + + return _GenericClientInterceptor(intercept_call) + + +def _append_request_header_interceptor(header, value): + + def intercept_call(client_call_details, request_iterator, + ignored_request_streaming, ignored_response_streaming): + metadata = [] + if client_call_details.metadata: + metadata = list(client_call_details.metadata) + metadata.append((header, value,)) + client_call_details = _ClientCallDetails( + client_call_details.method, client_call_details.timeout, metadata, + client_call_details.credentials) + return client_call_details, request_iterator, None + + return _GenericClientInterceptor(intercept_call) + + +class _GenericServerInterceptor(grpc.ServerInterceptor): + + def __init__(self, fn): + self._fn = fn + + def intercept_service(self, continuation, handler_call_details): + return self._fn(continuation, handler_call_details) + + +def _filter_server_interceptor(condition, interceptor): + + def intercept_service(continuation, handler_call_details): + if condition(handler_call_details): + return interceptor.intercept_service(continuation, + handler_call_details) + return continuation(handler_call_details) + + return _GenericServerInterceptor(intercept_service) + + +class InterceptorTest(unittest.TestCase): + + def setUp(self): + self._control = test_control.PauseFailControl() + self._handler = _Handler(self._control) + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + + self._record = [] + conditional_interceptor = _filter_server_interceptor( + lambda x: ('secret', '42') in x.invocation_metadata, + _LoggingInterceptor('s3', self._record)) + + self._server = grpc.server( + self._server_pool, + interceptors=(_LoggingInterceptor('s1', self._record), + conditional_interceptor, + _LoggingInterceptor('s2', self._record),)) + port = self._server.add_insecure_port('[::]:0') + self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) + self._server.start() + + self._channel = grpc.insecure_channel('localhost:%d' % port) + + def tearDown(self): + self._server.stop(None) + self._server_pool.shutdown(wait=True) + + def testTripleRequestMessagesClientInterceptor(self): + + def triple(request_iterator): + while True: + try: + item = next(request_iterator) + yield item + yield item + yield item + except StopIteration: + break + + interceptor = _wrap_request_iterator_stream_interceptor(triple) + channel = grpc.intercept_channel(self._channel, interceptor) + requests = tuple(b'\x07\x08' + for _ in range(test_constants.STREAM_LENGTH)) + + multi_callable = _stream_stream_multi_callable(channel) + response_iterator = multi_callable( + iter(requests), + metadata=( + ('test', + 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) + + responses = tuple(response_iterator) + self.assertEqual(len(responses), 3 * test_constants.STREAM_LENGTH) + + multi_callable = _stream_stream_multi_callable(self._channel) + response_iterator = multi_callable( + iter(requests), + metadata=( + ('test', + 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) + + responses = tuple(response_iterator) + self.assertEqual(len(responses), test_constants.STREAM_LENGTH) + + def testDefectiveClientInterceptor(self): + interceptor = _DefectiveClientInterceptor() + defective_channel = grpc.intercept_channel(self._channel, interceptor) + + request = b'\x07\x08' + + multi_callable = _unary_unary_multi_callable(defective_channel) + call_future = multi_callable.future( + request, + metadata=( + ('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),)) + + self.assertIsNotNone(call_future.exception()) + self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL) + + def testInterceptedHeaderManipulationWithServerSideVerification(self): + request = b'\x07\x08' + + channel = grpc.intercept_channel( + self._channel, _append_request_header_interceptor('secret', '42')) + channel = grpc.intercept_channel( + channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + self._record[:] = [] + + multi_callable = _unary_unary_multi_callable(channel) + multi_callable.with_call( + request, + metadata=( + ('test', + 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),)) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', + 's1:intercept_service', 's3:intercept_service', + 's2:intercept_service' + ]) + + def testInterceptedUnaryRequestBlockingUnaryResponse(self): + request = b'\x07\x08' + + self._record[:] = [] + + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _unary_unary_multi_callable(channel) + multi_callable( + request, + metadata=( + ('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),)) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self): + request = b'\x07\x08' + + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + self._record[:] = [] + + multi_callable = _unary_unary_multi_callable(channel) + multi_callable.with_call( + request, + metadata=( + ('test', + 'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),)) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedUnaryRequestFutureUnaryResponse(self): + request = b'\x07\x08' + + self._record[:] = [] + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _unary_unary_multi_callable(channel) + response_future = multi_callable.future( + request, + metadata=(('test', 'InterceptedUnaryRequestFutureUnaryResponse'),)) + response_future.result() + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedUnaryRequestStreamResponse(self): + request = b'\x37\x58' + + self._record[:] = [] + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _unary_stream_multi_callable(channel) + response_iterator = multi_callable( + request, + metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),)) + tuple(response_iterator) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_unary_stream', 'c2:intercept_unary_stream', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedStreamRequestBlockingUnaryResponse(self): + requests = tuple(b'\x07\x08' + for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = iter(requests) + + self._record[:] = [] + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _stream_unary_multi_callable(channel) + multi_callable( + request_iterator, + metadata=( + ('test', 'InterceptedStreamRequestBlockingUnaryResponse'),)) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self): + requests = tuple(b'\x07\x08' + for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = iter(requests) + + self._record[:] = [] + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _stream_unary_multi_callable(channel) + multi_callable.with_call( + request_iterator, + metadata=( + ('test', + 'InterceptedStreamRequestBlockingUnaryResponseWithCall'),)) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedStreamRequestFutureUnaryResponse(self): + requests = tuple(b'\x07\x08' + for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = iter(requests) + + self._record[:] = [] + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _stream_unary_multi_callable(channel) + response_future = multi_callable.future( + request_iterator, + metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),)) + response_future.result() + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', + 's1:intercept_service', 's2:intercept_service' + ]) + + def testInterceptedStreamRequestStreamResponse(self): + requests = tuple(b'\x77\x58' + for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = iter(requests) + + self._record[:] = [] + channel = grpc.intercept_channel( + self._channel, + _LoggingInterceptor('c1', self._record), + _LoggingInterceptor('c2', self._record)) + + multi_callable = _stream_stream_multi_callable(channel) + response_iterator = multi_callable( + request_iterator, + metadata=(('test', 'InterceptedStreamRequestStreamResponse'),)) + tuple(response_iterator) + + self.assertSequenceEqual(self._record, [ + 'c1:intercept_stream_stream', 'c2:intercept_stream_stream', + 's1:intercept_service', 's2:intercept_service' + ]) + + +if __name__ == '__main__': + unittest.main(verbosity=2)