Merge pull request #13722 from mehrdada/ship-py-interceptors

gRPC Python Client and Server Interceptors
pull/13731/head
Mehrdad Afshari 7 years ago committed by GitHub
commit a258a49ca1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      examples/python/interceptors/default_value/default_value_client_interceptor.py
  2. 38
      examples/python/interceptors/default_value/greeter_client.py
  3. 134
      examples/python/interceptors/default_value/helloworld_pb2.py
  4. 46
      examples/python/interceptors/default_value/helloworld_pb2_grpc.py
  5. 55
      examples/python/interceptors/headers/generic_client_interceptor.py
  6. 36
      examples/python/interceptors/headers/greeter_client.py
  7. 52
      examples/python/interceptors/headers/greeter_server.py
  8. 42
      examples/python/interceptors/headers/header_manipulator_client_interceptor.py
  9. 134
      examples/python/interceptors/headers/helloworld_pb2.py
  10. 46
      examples/python/interceptors/headers/helloworld_pb2_grpc.py
  11. 39
      examples/python/interceptors/headers/request_header_validator_interceptor.py
  12. 295
      src/python/grpcio/grpc/__init__.py
  13. 318
      src/python/grpcio/grpc/_interceptor.py
  14. 46
      src/python/grpcio/grpc/_server.py
  15. 1
      src/python/grpcio_tests/tests/tests.json
  16. 13
      src/python/grpcio_tests/tests/unit/_api_test.py
  17. 571
      src/python/grpcio_tests/tests/unit/_interceptor_test.py

@ -0,0 +1,68 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interceptor that adds headers to outgoing requests."""
import collections
import grpc
class _ConcreteValue(grpc.Future):
def __init__(self, result):
self._result = result
def cancel(self):
return False
def cancelled(self):
return False
def running(self):
return False
def done(self):
return True
def result(self, timeout=None):
return self._result
def exception(self, timeout=None):
return None
def traceback(self, timeout=None):
return None
def add_done_callback(self, fn):
fn(self._result)
class DefaultValueClientInterceptor(grpc.UnaryUnaryClientInterceptor,
grpc.StreamUnaryClientInterceptor):
def __init__(self, value):
self._default = _ConcreteValue(value)
def _intercept_call(self, continuation, client_call_details,
request_or_iterator):
response = continuation(client_call_details, request_or_iterator)
return self._default if response.exception() else response
def intercept_unary_unary(self, continuation, client_call_details, request):
return self._intercept_call(continuation, client_call_details, request)
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
return self._intercept_call(continuation, client_call_details,
request_iterator)

@ -0,0 +1,38 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the gRPC helloworld.Greeter client."""
from __future__ import print_function
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
import default_value_client_interceptor
def run():
default_value = helloworld_pb2.HelloReply(
message='Hello from your local interceptor!')
default_value_interceptor = default_value_client_interceptor.DefaultValueClientInterceptor(
default_value)
channel = grpc.insecure_channel('localhost:50051')
channel = grpc.intercept_channel(channel, default_value_interceptor)
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
run()

@ -0,0 +1,134 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: helloworld.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='helloworld.proto',
package='helloworld',
syntax='proto3',
serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')
)
_HELLOREQUEST = _descriptor.Descriptor(
name='HelloRequest',
full_name='helloworld.HelloRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='helloworld.HelloRequest.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=32,
serialized_end=60,
)
_HELLOREPLY = _descriptor.Descriptor(
name='HelloReply',
full_name='helloworld.HelloReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='message', full_name='helloworld.HelloReply.message', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=62,
serialized_end=91,
)
DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST
DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict(
DESCRIPTOR = _HELLOREQUEST,
__module__ = 'helloworld_pb2'
# @@protoc_insertion_point(class_scope:helloworld.HelloRequest)
))
_sym_db.RegisterMessage(HelloRequest)
HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict(
DESCRIPTOR = _HELLOREPLY,
__module__ = 'helloworld_pb2'
# @@protoc_insertion_point(class_scope:helloworld.HelloReply)
))
_sym_db.RegisterMessage(HelloReply)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'))
_GREETER = _descriptor.ServiceDescriptor(
name='Greeter',
full_name='helloworld.Greeter',
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=93,
serialized_end=166,
methods=[
_descriptor.MethodDescriptor(
name='SayHello',
full_name='helloworld.Greeter.SayHello',
index=0,
containing_service=None,
input_type=_HELLOREQUEST,
output_type=_HELLOREPLY,
options=None,
),
])
_sym_db.RegisterServiceDescriptor(_GREETER)
DESCRIPTOR.services_by_name['Greeter'] = _GREETER
# @@protoc_insertion_point(module_scope)

@ -0,0 +1,46 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import helloworld_pb2 as helloworld__pb2
class GreeterStub(object):
"""The greeting service definition.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.SayHello = channel.unary_unary(
'/helloworld.Greeter/SayHello',
request_serializer=helloworld__pb2.HelloRequest.SerializeToString,
response_deserializer=helloworld__pb2.HelloReply.FromString,
)
class GreeterServicer(object):
"""The greeting service definition.
"""
def SayHello(self, request, context):
"""Sends a greeting
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_GreeterServicer_to_server(servicer, server):
rpc_method_handlers = {
'SayHello': grpc.unary_unary_rpc_method_handler(
servicer.SayHello,
request_deserializer=helloworld__pb2.HelloRequest.FromString,
response_serializer=helloworld__pb2.HelloReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'helloworld.Greeter', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))

@ -0,0 +1,55 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for interceptors that operate on all RPC types."""
import grpc
class _GenericClientInterceptor(
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
def __init__(self, interceptor_function):
self._fn = interceptor_function
def intercept_unary_unary(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_unary_stream(self, continuation, client_call_details,
request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it
def create(intercept_call):
return _GenericClientInterceptor(intercept_call)

@ -0,0 +1,36 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC helloworld.Greeter client."""
from __future__ import print_function
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
import header_manipulator_client_interceptor
def run():
header_adder_interceptor = header_manipulator_client_interceptor.header_adder_interceptor(
'one-time-password', '42')
channel = grpc.insecure_channel('localhost:50051')
channel = grpc.intercept_channel(channel, header_adder_interceptor)
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
run()

@ -0,0 +1,52 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the GRPC helloworld.Greeter server."""
from concurrent import futures
import time
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
from request_header_validator_interceptor import RequestHeaderValidatorInterceptor
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def serve():
header_validator = RequestHeaderValidatorInterceptor(
'one-time-password', '42', grpc.StatusCode.UNAUTHENTICATED,
'Access denied!')
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=(header_validator,))
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()

@ -0,0 +1,42 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interceptor that adds headers to outgoing requests."""
import collections
import grpc
import generic_client_interceptor
class _ClientCallDetails(
collections.namedtuple('_ClientCallDetails',
('method', 'timeout', 'metadata',
'credentials')), grpc.ClientCallDetails):
pass
def header_adder_interceptor(header, value):
def intercept_call(client_call_details, request_iterator, request_streaming,
response_streaming):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append((header, value,))
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
return client_call_details, request_iterator, None
return generic_client_interceptor.create(intercept_call)

@ -0,0 +1,134 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: helloworld.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='helloworld.proto',
package='helloworld',
syntax='proto3',
serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')
)
_HELLOREQUEST = _descriptor.Descriptor(
name='HelloRequest',
full_name='helloworld.HelloRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='helloworld.HelloRequest.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=32,
serialized_end=60,
)
_HELLOREPLY = _descriptor.Descriptor(
name='HelloReply',
full_name='helloworld.HelloReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='message', full_name='helloworld.HelloReply.message', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=62,
serialized_end=91,
)
DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST
DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), dict(
DESCRIPTOR = _HELLOREQUEST,
__module__ = 'helloworld_pb2'
# @@protoc_insertion_point(class_scope:helloworld.HelloRequest)
))
_sym_db.RegisterMessage(HelloRequest)
HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), dict(
DESCRIPTOR = _HELLOREPLY,
__module__ = 'helloworld_pb2'
# @@protoc_insertion_point(class_scope:helloworld.HelloReply)
))
_sym_db.RegisterMessage(HelloReply)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'))
_GREETER = _descriptor.ServiceDescriptor(
name='Greeter',
full_name='helloworld.Greeter',
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=93,
serialized_end=166,
methods=[
_descriptor.MethodDescriptor(
name='SayHello',
full_name='helloworld.Greeter.SayHello',
index=0,
containing_service=None,
input_type=_HELLOREQUEST,
output_type=_HELLOREPLY,
options=None,
),
])
_sym_db.RegisterServiceDescriptor(_GREETER)
DESCRIPTOR.services_by_name['Greeter'] = _GREETER
# @@protoc_insertion_point(module_scope)

@ -0,0 +1,46 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import helloworld_pb2 as helloworld__pb2
class GreeterStub(object):
"""The greeting service definition.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.SayHello = channel.unary_unary(
'/helloworld.Greeter/SayHello',
request_serializer=helloworld__pb2.HelloRequest.SerializeToString,
response_deserializer=helloworld__pb2.HelloReply.FromString,
)
class GreeterServicer(object):
"""The greeting service definition.
"""
def SayHello(self, request, context):
"""Sends a greeting
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_GreeterServicer_to_server(servicer, server):
rpc_method_handlers = {
'SayHello': grpc.unary_unary_rpc_method_handler(
servicer.SayHello,
request_deserializer=helloworld__pb2.HelloRequest.FromString,
response_serializer=helloworld__pb2.HelloReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'helloworld.Greeter', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))

@ -0,0 +1,39 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interceptor that ensures a specific header is present."""
import grpc
def _unary_unary_rpc_terminator(code, details):
def terminate(ignored_request, context):
context.abort(code, details)
return grpc.unary_unary_rpc_method_handler(terminate)
class RequestHeaderValidatorInterceptor(grpc.ServerInterceptor):
def __init__(self, header, value, code, details):
self._header = header
self._value = value
self._terminator = _unary_unary_rpc_terminator(code, details)
def intercept_service(self, continuation, handler_call_details):
if (self._header,
self._value) in handler_call_details.invocation_metadata:
return continuation(handler_call_details)
else:
return self._terminator

@ -342,6 +342,170 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
raise NotImplementedError()
############## Invocation-Side Interceptor Interfaces & Classes ##############
class ClientCallDetails(six.with_metaclass(abc.ABCMeta)):
"""Describes an RPC to be invoked.
This is an EXPERIMENTAL API.
Attributes:
method: The method name of the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
metadata: Optional :term:`metadata` to be transmitted to
the service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
"""
class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting unary-unary invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_unary_unary(self, continuation, client_call_details, request):
"""Intercepts a unary-unary invocation asynchronously.
Args:
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_future = continuation(client_call_details, request)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and a Future. In the event of RPC
completion, the return Call-Future's result value will be
the response message of the RPC. Should the event terminate
with non-OK status, the returned Call-Future's exception value
will be an RpcError.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and a Future.
In the event of RPC completion, the return Call-Future's
result value will be the response message of the RPC.
Should the event terminate with non-OK status, the returned
Call-Future's exception value will be an RpcError.
"""
raise NotImplementedError()
class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting unary-stream invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_unary_stream(self, continuation, client_call_details,
request):
"""Intercepts a unary-stream invocation.
Args:
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_iterator = continuation(client_call_details, request)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and an iterator for response values.
Drawing response values from the returned Call-iterator may
raise RpcError indicating termination of the RPC with non-OK
status.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of
response values. Drawing response values from the returned
Call-iterator may raise RpcError indicating termination of
the RPC with non-OK status.
"""
raise NotImplementedError()
class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting stream-unary invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
"""Intercepts a stream-unary invocation asynchronously.
Args:
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_future = continuation(client_call_details,
request_iterator)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and a Future. In the event of RPC completion,
the return Call-Future's result value will be the response message
of the RPC. Should the event terminate with non-OK status, the
returned Call-Future's exception value will be an RpcError.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and a Future.
In the event of RPC completion, the return Call-Future's
result value will be the response message of the RPC.
Should the event terminate with non-OK status, the returned
Call-Future's exception value will be an RpcError.
"""
raise NotImplementedError()
class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting stream-stream invocations.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
"""Intercepts a stream-stream invocation.
continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
`response_iterator = continuation(client_call_details,
request_iterator)`
to continue with the RPC. `continuation` returns an object that is
both a Call for the RPC and an iterator for response values.
Drawing response values from the returned Call-iterator may
raise RpcError indicating termination of the RPC with non-OK
status.
client_call_details: A ClientCallDetails object describing the
outgoing RPC.
request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of
response values. Drawing response values from the returned
Call-iterator may raise RpcError indicating termination of
the RPC with non-OK status.
"""
raise NotImplementedError()
############ Authentication & Authorization Interfaces & Classes #############
@ -962,6 +1126,34 @@ class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)):
raise NotImplementedError()
#################### Service-Side Interceptor Interfaces #####################
class ServerInterceptor(six.with_metaclass(abc.ABCMeta)):
"""Affords intercepting incoming RPCs on the service-side.
This is an EXPERIMENTAL API.
"""
@abc.abstractmethod
def intercept_service(self, continuation, handler_call_details):
"""Intercepts incoming RPCs before handing them over to a handler.
Args:
continuation: A function that takes a HandlerCallDetails and
proceeds to invoke the next interceptor in the chain, if any,
or the RPC handler lookup logic, with the call details passed
as an argument, and returns an RpcMethodHandler instance if
the RPC is considered serviced, or None otherwise.
handler_call_details: A HandlerCallDetails describing the RPC.
Returns:
An RpcMethodHandler with which the RPC may be serviced if the
interceptor chooses to service this RPC, or None otherwise.
"""
raise NotImplementedError()
############################# Server Interface ###############################
@ -1376,53 +1568,88 @@ def secure_channel(target, credentials, options=None):
credentials._credentials)
def intercept_channel(channel, *interceptors):
"""Intercepts a channel through a set of interceptors.
This is an EXPERIMENTAL API.
Args:
channel: A Channel.
interceptors: Zero or more objects of type
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor, or
StreamStreamClientInterceptor.
Interceptors are given control in the order they are listed.
Returns:
A Channel that intercepts each invocation via the provided interceptors.
Raises:
TypeError: If interceptor does not derive from any of
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor, or
StreamStreamClientInterceptor.
"""
from grpc import _interceptor # pylint: disable=cyclic-import
return _interceptor.intercept_channel(channel, *interceptors)
def server(thread_pool,
handlers=None,
interceptors=None,
options=None,
maximum_concurrent_rpcs=None):
"""Creates a Server with which RPCs can be serviced.
Args:
thread_pool: A futures.ThreadPoolExecutor to be used by the Server
to execute RPC handlers.
handlers: An optional list of GenericRpcHandlers used for executing RPCs.
More handlers may be added by calling add_generic_rpc_handlers any time
before the server is started.
options: An optional list of key-value pairs (channel args in gRPC runtime)
to configure the channel.
maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server
will service before returning RESOURCE_EXHAUSTED status, or None to
indicate no limit.
Args:
thread_pool: A futures.ThreadPoolExecutor to be used by the Server
to execute RPC handlers.
handlers: An optional list of GenericRpcHandlers used for executing RPCs.
More handlers may be added by calling add_generic_rpc_handlers any time
before the server is started.
interceptors: An optional list of ServerInterceptor objects that observe
and optionally manipulate the incoming RPCs before handing them over to
handlers. The interceptors are given control in the order they are
specified. This is an EXPERIMENTAL API.
options: An optional list of key-value pairs (channel args in gRPC runtime)
to configure the channel.
maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server
will service before returning RESOURCE_EXHAUSTED status, or None to
indicate no limit.
Returns:
A Server object.
"""
Returns:
A Server object.
"""
from grpc import _server # pylint: disable=cyclic-import
return _server.Server(thread_pool, () if handlers is None else handlers, ()
if options is None else options,
maximum_concurrent_rpcs)
if interceptors is None else interceptors, () if
options is None else options, maximum_concurrent_rpcs)
################################### __all__ #################################
__all__ = ('FutureTimeoutError', 'FutureCancelledError', 'Future',
'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext',
'Call', 'ChannelCredentials', 'CallCredentials',
'AuthMetadataContext', 'AuthMetadataPluginCallback',
'AuthMetadataPlugin', 'ServerCertificateConfiguration',
'ServerCredentials', 'UnaryUnaryMultiCallable',
'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable',
'StreamStreamMultiCallable', 'Channel', 'ServicerContext',
'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler',
'ServiceRpcHandler', 'Server', 'unary_unary_rpc_method_handler',
'unary_stream_rpc_method_handler', 'stream_unary_rpc_method_handler',
'stream_stream_rpc_method_handler',
'method_handlers_generic_handler', 'ssl_channel_credentials',
'metadata_call_credentials', 'access_token_call_credentials',
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'ssl_server_certificate_configuration',
'dynamic_ssl_server_credentials', 'channel_ready_future',
'insecure_channel', 'secure_channel', 'server',)
__all__ = (
'FutureTimeoutError', 'FutureCancelledError', 'Future',
'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', 'Call',
'ChannelCredentials', 'CallCredentials', 'AuthMetadataContext',
'AuthMetadataPluginCallback', 'AuthMetadataPlugin', 'ClientCallDetails',
'ServerCertificateConfiguration', 'ServerCredentials',
'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable',
'StreamUnaryMultiCallable', 'StreamStreamMultiCallable',
'UnaryUnaryClientInterceptor', 'UnaryStreamClientInterceptor',
'StreamUnaryClientInterceptor', 'StreamStreamClientInterceptor', 'Channel',
'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails',
'GenericRpcHandler', 'ServiceRpcHandler', 'Server', 'ServerInterceptor',
'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler',
'stream_unary_rpc_method_handler', 'stream_stream_rpc_method_handler',
'method_handlers_generic_handler', 'ssl_channel_credentials',
'metadata_call_credentials', 'access_token_call_credentials',
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'ssl_server_certificate_configuration',
'dynamic_ssl_server_credentials', 'channel_ready_future',
'insecure_channel', 'secure_channel', 'intercept_channel', 'server',)
############################### Extension Shims ################################

@ -0,0 +1,318 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gRPC Python interceptors."""
import collections
import sys
import grpc
class _ServicePipeline(object):
def __init__(self, interceptors):
self.interceptors = tuple(interceptors)
def _continuation(self, thunk, index):
return lambda context: self._intercept_at(thunk, index, context)
def _intercept_at(self, thunk, index, context):
if index < len(self.interceptors):
interceptor = self.interceptors[index]
thunk = self._continuation(thunk, index + 1)
return interceptor.intercept_service(thunk, context)
else:
return thunk(context)
def execute(self, thunk, context):
return self._intercept_at(thunk, 0, context)
def service_pipeline(interceptors):
return _ServicePipeline(interceptors) if interceptors else None
class _ClientCallDetails(
collections.namedtuple('_ClientCallDetails',
('method', 'timeout', 'metadata',
'credentials')), grpc.ClientCallDetails):
pass
class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call):
def __init__(self, exception, traceback):
super(_LocalFailure, self).__init__()
self._exception = exception
self._traceback = traceback
def initial_metadata(self):
return None
def trailing_metadata(self):
return None
def code(self):
return grpc.StatusCode.INTERNAL
def details(self):
return 'Exception raised while intercepting the RPC'
def cancel(self):
return False
def cancelled(self):
return False
def running(self):
return False
def done(self):
return True
def result(self, ignored_timeout=None):
raise self._exception
def exception(self, ignored_timeout=None):
return self._exception
def traceback(self, ignored_timeout=None):
return self._traceback
def add_done_callback(self, fn):
fn(self)
def __iter__(self):
return self
def next(self):
raise self._exception
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
call_future = self.future(
request,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result()
def with_call(self, request, timeout=None, metadata=None, credentials=None):
call_future = self.future(
request,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result(), call_future
def future(self, request, timeout=None, metadata=None, credentials=None):
def continuation(client_call_details, request):
return self._thunk(client_call_details.method).future(
request,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_unary_unary(
continuation, client_call_details, request)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self, request, timeout=None, metadata=None, credentials=None):
def continuation(client_call_details, request):
return self._thunk(client_call_details.method)(
request,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_unary_stream(
continuation, client_call_details, request)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
call_future = self.future(
request_iterator,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result()
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
call_future = self.future(
request_iterator,
timeout=timeout,
metadata=metadata,
credentials=credentials)
return call_future.result(), call_future
def future(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
def continuation(client_call_details, request_iterator):
return self._thunk(client_call_details.method).future(
request_iterator,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_stream_unary(
continuation, client_call_details, request_iterator)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
def __init__(self, thunk, method, interceptor):
self._thunk = thunk
self._method = method
self._interceptor = interceptor
def __call__(self,
request_iterator,
timeout=None,
metadata=None,
credentials=None):
def continuation(client_call_details, request_iterator):
return self._thunk(client_call_details.method)(
request_iterator,
timeout=client_call_details.timeout,
metadata=client_call_details.metadata,
credentials=client_call_details.credentials)
client_call_details = _ClientCallDetails(self._method, timeout,
metadata, credentials)
try:
return self._interceptor.intercept_stream_stream(
continuation, client_call_details, request_iterator)
except Exception as exception: # pylint:disable=broad-except
return _LocalFailure(exception, sys.exc_info()[2])
class _Channel(grpc.Channel):
def __init__(self, channel, interceptor):
self._channel = channel
self._interceptor = interceptor
def subscribe(self, *args, **kwargs):
self._channel.subscribe(*args, **kwargs)
def unsubscribe(self, *args, **kwargs):
self._channel.unsubscribe(*args, **kwargs)
def unary_unary(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.unary_unary(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor):
return _UnaryUnaryMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def unary_stream(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.unary_stream(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor):
return _UnaryStreamMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def stream_unary(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.stream_unary(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor):
return _StreamUnaryMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def stream_stream(self,
method,
request_serializer=None,
response_deserializer=None):
thunk = lambda m: self._channel.stream_stream(m, request_serializer, response_deserializer)
if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor):
return _StreamStreamMultiCallable(thunk, method, self._interceptor)
else:
return thunk(method)
def intercept_channel(channel, *interceptors):
for interceptor in reversed(list(interceptors)):
if not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor) and \
not isinstance(interceptor, grpc.UnaryStreamClientInterceptor) and \
not isinstance(interceptor, grpc.StreamUnaryClientInterceptor) and \
not isinstance(interceptor, grpc.StreamStreamClientInterceptor):
raise TypeError('interceptor must be '
'grpc.UnaryUnaryClientInterceptor or '
'grpc.UnaryStreamClientInterceptor or '
'grpc.StreamUnaryClientInterceptor or '
'grpc.StreamStreamClientInterceptor or ')
channel = _Channel(channel, interceptor)
return channel

@ -23,6 +23,7 @@ import six
import grpc
from grpc import _common
from grpc import _interceptor
from grpc._cython import cygrpc
from grpc.framework.foundation import callable_util
@ -541,17 +542,25 @@ def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
method_handler.request_deserializer, method_handler.response_serializer)
def _find_method_handler(rpc_event, generic_handlers):
for generic_handler in generic_handlers:
method_handler = generic_handler.service(
_HandlerCallDetails(
_common.decode(rpc_event.request_call_details.method),
rpc_event.request_metadata))
if method_handler is not None:
return method_handler
else:
def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
def query_handlers(handler_call_details):
for generic_handler in generic_handlers:
method_handler = generic_handler.service(handler_call_details)
if method_handler is not None:
return method_handler
return None
handler_call_details = _HandlerCallDetails(
_common.decode(rpc_event.request_call_details.method),
rpc_event.request_metadata)
if interceptor_pipeline is not None:
return interceptor_pipeline.execute(query_handlers,
handler_call_details)
else:
return query_handlers(handler_call_details)
def _reject_rpc(rpc_event, status, details):
operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS),
@ -587,13 +596,14 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
method_handler, thread_pool)
def _handle_call(rpc_event, generic_handlers, thread_pool,
def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
concurrency_exceeded):
if not rpc_event.success:
return None, None
if rpc_event.request_call_details.method is not None:
try:
method_handler = _find_method_handler(rpc_event, generic_handlers)
method_handler = _find_method_handler(rpc_event, generic_handlers,
interceptor_pipeline)
except Exception as exception: # pylint: disable=broad-except
details = 'Exception servicing handler: {}'.format(exception)
logging.exception(details)
@ -621,12 +631,14 @@ class _ServerStage(enum.Enum):
class _ServerState(object):
def __init__(self, completion_queue, server, generic_handlers, thread_pool,
maximum_concurrent_rpcs):
# pylint: disable=too-many-arguments
def __init__(self, completion_queue, server, generic_handlers,
interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
self.lock = threading.Lock()
self.completion_queue = completion_queue
self.server = server
self.generic_handlers = list(generic_handlers)
self.interceptor_pipeline = interceptor_pipeline
self.thread_pool = thread_pool
self.stage = _ServerStage.STOPPED
self.shutdown_events = None
@ -691,8 +703,8 @@ def _serve(state):
state.maximum_concurrent_rpcs is not None and
state.active_rpc_count >= state.maximum_concurrent_rpcs)
rpc_state, rpc_future = _handle_call(
event, state.generic_handlers, state.thread_pool,
concurrency_exceeded)
event, state.generic_handlers, state.interceptor_pipeline,
state.thread_pool, concurrency_exceeded)
if rpc_state is not None:
state.rpc_states.add(rpc_state)
if rpc_future is not None:
@ -780,12 +792,14 @@ def _start(state):
class Server(grpc.Server):
def __init__(self, thread_pool, generic_handlers, options,
# pylint: disable=too-many-arguments
def __init__(self, thread_pool, generic_handlers, interceptors, options,
maximum_concurrent_rpcs):
completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_common.channel_args(options))
server.register_completion_queue(completion_queue)
self._state = _ServerState(completion_queue, server, generic_handlers,
_interceptor.service_pipeline(interceptors),
thread_pool, maximum_concurrent_rpcs)
def add_generic_rpc_handlers(self, generic_rpc_handlers):

@ -39,6 +39,7 @@
"unit._cython.cygrpc_test.TypeSmokeTest",
"unit._empty_message_test.EmptyMessageTest",
"unit._exit_test.ExitTest",
"unit._interceptor_test.InterceptorTest",
"unit._invalid_metadata_test.InvalidMetadataTest",
"unit._invocation_defects_test.InvocationDefectsTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest",

@ -33,18 +33,21 @@ class AllTest(unittest.TestCase):
'AuthMetadataPlugin', 'ServerCertificateConfiguration',
'ServerCredentials', 'UnaryUnaryMultiCallable',
'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable',
'StreamStreamMultiCallable', 'Channel', 'ServicerContext',
'StreamStreamMultiCallable', 'UnaryUnaryClientInterceptor',
'UnaryStreamClientInterceptor', 'StreamUnaryClientInterceptor',
'StreamStreamClientInterceptor', 'Channel', 'ServicerContext',
'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler',
'ServiceRpcHandler', 'Server', 'unary_unary_rpc_method_handler',
'unary_stream_rpc_method_handler',
'stream_unary_rpc_method_handler',
'ServiceRpcHandler', 'Server', 'ServerInterceptor',
'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler',
'stream_unary_rpc_method_handler', 'ClientCallDetails',
'stream_stream_rpc_method_handler',
'method_handlers_generic_handler', 'ssl_channel_credentials',
'metadata_call_credentials', 'access_token_call_credentials',
'composite_call_credentials', 'composite_channel_credentials',
'ssl_server_credentials', 'ssl_server_certificate_configuration',
'dynamic_ssl_server_credentials', 'channel_ready_future',
'insecure_channel', 'secure_channel', 'server',)
'insecure_channel', 'secure_channel', 'intercept_channel',
'server',)
six.assertCountEqual(self, expected_grpc_code_elements,
_from_grpc_import_star.GRPC_ELEMENTS)

@ -0,0 +1,571 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test of gRPC Python interceptors."""
import collections
import itertools
import threading
import unittest
from concurrent import futures
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
class _Callback(object):
def __init__(self):
self._condition = threading.Condition()
self._value = None
self._called = False
def __call__(self, value):
with self._condition:
self._value = value
self._called = True
self._condition.notify_all()
def value(self):
with self._condition:
while not self._called:
self._condition.wait()
return self._value
class _Handler(object):
def __init__(self, control):
self._control = control
def handle_unary_unary(self, request, servicer_context):
self._control.control()
if servicer_context is not None:
servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
return request
def handle_unary_stream(self, request, servicer_context):
for _ in range(test_constants.STREAM_LENGTH):
self._control.control()
yield request
self._control.control()
if servicer_context is not None:
servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
def handle_stream_unary(self, request_iterator, servicer_context):
if servicer_context is not None:
servicer_context.invocation_metadata()
self._control.control()
response_elements = []
for request in request_iterator:
self._control.control()
response_elements.append(request)
self._control.control()
if servicer_context is not None:
servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
return b''.join(response_elements)
def handle_stream_stream(self, request_iterator, servicer_context):
self._control.control()
if servicer_context is not None:
servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
for request in request_iterator:
self._control.control()
yield request
self._control.control()
class _MethodHandler(grpc.RpcMethodHandler):
def __init__(self, request_streaming, response_streaming,
request_deserializer, response_serializer, unary_unary,
unary_stream, stream_unary, stream_stream):
self.request_streaming = request_streaming
self.response_streaming = response_streaming
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.unary_unary = unary_unary
self.unary_stream = unary_stream
self.stream_unary = stream_unary
self.stream_stream = stream_stream
class _GenericHandler(grpc.GenericRpcHandler):
def __init__(self, handler):
self._handler = handler
def service(self, handler_call_details):
if handler_call_details.method == _UNARY_UNARY:
return _MethodHandler(False, False, None, None,
self._handler.handle_unary_unary, None, None,
None)
elif handler_call_details.method == _UNARY_STREAM:
return _MethodHandler(False, True, _DESERIALIZE_REQUEST,
_SERIALIZE_RESPONSE, None,
self._handler.handle_unary_stream, None, None)
elif handler_call_details.method == _STREAM_UNARY:
return _MethodHandler(True, False, _DESERIALIZE_REQUEST,
_SERIALIZE_RESPONSE, None, None,
self._handler.handle_stream_unary, None)
elif handler_call_details.method == _STREAM_STREAM:
return _MethodHandler(True, True, None, None, None, None, None,
self._handler.handle_stream_stream)
else:
return None
def _unary_unary_multi_callable(channel):
return channel.unary_unary(_UNARY_UNARY)
def _unary_stream_multi_callable(channel):
return channel.unary_stream(
_UNARY_STREAM,
request_serializer=_SERIALIZE_REQUEST,
response_deserializer=_DESERIALIZE_RESPONSE)
def _stream_unary_multi_callable(channel):
return channel.stream_unary(
_STREAM_UNARY,
request_serializer=_SERIALIZE_REQUEST,
response_deserializer=_DESERIALIZE_RESPONSE)
def _stream_stream_multi_callable(channel):
return channel.stream_stream(_STREAM_STREAM)
class _ClientCallDetails(
collections.namedtuple('_ClientCallDetails',
('method', 'timeout', 'metadata',
'credentials')), grpc.ClientCallDetails):
pass
class _GenericClientInterceptor(
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
def __init__(self, interceptor_function):
self._fn = interceptor_function
def intercept_unary_unary(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_unary_stream(self, continuation, client_call_details,
request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it
class _LoggingInterceptor(
grpc.ServerInterceptor, grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor):
def __init__(self, tag, record):
self.tag = tag
self.record = record
def intercept_service(self, continuation, handler_call_details):
self.record.append(self.tag + ':intercept_service')
return continuation(handler_call_details)
def intercept_unary_unary(self, continuation, client_call_details, request):
self.record.append(self.tag + ':intercept_unary_unary')
return continuation(client_call_details, request)
def intercept_unary_stream(self, continuation, client_call_details,
request):
self.record.append(self.tag + ':intercept_unary_stream')
return continuation(client_call_details, request)
def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
self.record.append(self.tag + ':intercept_stream_unary')
return continuation(client_call_details, request_iterator)
def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
self.record.append(self.tag + ':intercept_stream_stream')
return continuation(client_call_details, request_iterator)
class _DefectiveClientInterceptor(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, ignored_continuation,
ignored_client_call_details, ignored_request):
raise test_control.Defect()
def _wrap_request_iterator_stream_interceptor(wrapper):
def intercept_call(client_call_details, request_iterator, request_streaming,
ignored_response_streaming):
if request_streaming:
return client_call_details, wrapper(request_iterator), None
else:
return client_call_details, request_iterator, None
return _GenericClientInterceptor(intercept_call)
def _append_request_header_interceptor(header, value):
def intercept_call(client_call_details, request_iterator,
ignored_request_streaming, ignored_response_streaming):
metadata = []
if client_call_details.metadata:
metadata = list(client_call_details.metadata)
metadata.append((header, value,))
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
return client_call_details, request_iterator, None
return _GenericClientInterceptor(intercept_call)
class _GenericServerInterceptor(grpc.ServerInterceptor):
def __init__(self, fn):
self._fn = fn
def intercept_service(self, continuation, handler_call_details):
return self._fn(continuation, handler_call_details)
def _filter_server_interceptor(condition, interceptor):
def intercept_service(continuation, handler_call_details):
if condition(handler_call_details):
return interceptor.intercept_service(continuation,
handler_call_details)
return continuation(handler_call_details)
return _GenericServerInterceptor(intercept_service)
class InterceptorTest(unittest.TestCase):
def setUp(self):
self._control = test_control.PauseFailControl()
self._handler = _Handler(self._control)
self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
self._record = []
conditional_interceptor = _filter_server_interceptor(
lambda x: ('secret', '42') in x.invocation_metadata,
_LoggingInterceptor('s3', self._record))
self._server = grpc.server(
self._server_pool,
interceptors=(_LoggingInterceptor('s1', self._record),
conditional_interceptor,
_LoggingInterceptor('s2', self._record),))
port = self._server.add_insecure_port('[::]:0')
self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
self._server.start()
self._channel = grpc.insecure_channel('localhost:%d' % port)
def tearDown(self):
self._server.stop(None)
self._server_pool.shutdown(wait=True)
def testTripleRequestMessagesClientInterceptor(self):
def triple(request_iterator):
while True:
try:
item = next(request_iterator)
yield item
yield item
yield item
except StopIteration:
break
interceptor = _wrap_request_iterator_stream_interceptor(triple)
channel = grpc.intercept_channel(self._channel, interceptor)
requests = tuple(b'\x07\x08'
for _ in range(test_constants.STREAM_LENGTH))
multi_callable = _stream_stream_multi_callable(channel)
response_iterator = multi_callable(
iter(requests),
metadata=(
('test',
'InterceptedStreamRequestBlockingUnaryResponseWithCall'),))
responses = tuple(response_iterator)
self.assertEqual(len(responses), 3 * test_constants.STREAM_LENGTH)
multi_callable = _stream_stream_multi_callable(self._channel)
response_iterator = multi_callable(
iter(requests),
metadata=(
('test',
'InterceptedStreamRequestBlockingUnaryResponseWithCall'),))
responses = tuple(response_iterator)
self.assertEqual(len(responses), test_constants.STREAM_LENGTH)
def testDefectiveClientInterceptor(self):
interceptor = _DefectiveClientInterceptor()
defective_channel = grpc.intercept_channel(self._channel, interceptor)
request = b'\x07\x08'
multi_callable = _unary_unary_multi_callable(defective_channel)
call_future = multi_callable.future(
request,
metadata=(
('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),))
self.assertIsNotNone(call_future.exception())
self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL)
def testInterceptedHeaderManipulationWithServerSideVerification(self):
request = b'\x07\x08'
channel = grpc.intercept_channel(
self._channel, _append_request_header_interceptor('secret', '42'))
channel = grpc.intercept_channel(
channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
self._record[:] = []
multi_callable = _unary_unary_multi_callable(channel)
multi_callable.with_call(
request,
metadata=(
('test',
'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
's1:intercept_service', 's3:intercept_service',
's2:intercept_service'
])
def testInterceptedUnaryRequestBlockingUnaryResponse(self):
request = b'\x07\x08'
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _unary_unary_multi_callable(channel)
multi_callable(
request,
metadata=(
('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self):
request = b'\x07\x08'
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
self._record[:] = []
multi_callable = _unary_unary_multi_callable(channel)
multi_callable.with_call(
request,
metadata=(
('test',
'InterceptedUnaryRequestBlockingUnaryResponseWithCall'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedUnaryRequestFutureUnaryResponse(self):
request = b'\x07\x08'
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _unary_unary_multi_callable(channel)
response_future = multi_callable.future(
request,
metadata=(('test', 'InterceptedUnaryRequestFutureUnaryResponse'),))
response_future.result()
self.assertSequenceEqual(self._record, [
'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedUnaryRequestStreamResponse(self):
request = b'\x37\x58'
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _unary_stream_multi_callable(channel)
response_iterator = multi_callable(
request,
metadata=(('test', 'InterceptedUnaryRequestStreamResponse'),))
tuple(response_iterator)
self.assertSequenceEqual(self._record, [
'c1:intercept_unary_stream', 'c2:intercept_unary_stream',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedStreamRequestBlockingUnaryResponse(self):
requests = tuple(b'\x07\x08'
for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _stream_unary_multi_callable(channel)
multi_callable(
request_iterator,
metadata=(
('test', 'InterceptedStreamRequestBlockingUnaryResponse'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self):
requests = tuple(b'\x07\x08'
for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _stream_unary_multi_callable(channel)
multi_callable.with_call(
request_iterator,
metadata=(
('test',
'InterceptedStreamRequestBlockingUnaryResponseWithCall'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedStreamRequestFutureUnaryResponse(self):
requests = tuple(b'\x07\x08'
for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _stream_unary_multi_callable(channel)
response_future = multi_callable.future(
request_iterator,
metadata=(('test', 'InterceptedStreamRequestFutureUnaryResponse'),))
response_future.result()
self.assertSequenceEqual(self._record, [
'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
's1:intercept_service', 's2:intercept_service'
])
def testInterceptedStreamRequestStreamResponse(self):
requests = tuple(b'\x77\x58'
for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
channel = grpc.intercept_channel(
self._channel,
_LoggingInterceptor('c1', self._record),
_LoggingInterceptor('c2', self._record))
multi_callable = _stream_stream_multi_callable(channel)
response_iterator = multi_callable(
request_iterator,
metadata=(('test', 'InterceptedStreamRequestStreamResponse'),))
tuple(response_iterator)
self.assertSequenceEqual(self._record, [
'c1:intercept_stream_stream', 'c2:intercept_stream_stream',
's1:intercept_service', 's2:intercept_service'
])
if __name__ == '__main__':
unittest.main(verbosity=2)
Loading…
Cancel
Save