mirror of https://github.com/grpc/grpc.git
[Python fix-it][Aysnc Example] Adding examples for aysnc interceptor context propagaton (#32840)
### Description Fix https://github.com/grpc/grpc/issues/24470. Adding one example which demonstrate the following use cases: * Generate RPC ID on client side and propagate to server. * Context propagation from client to server. * Context propagation between different server interceptors and the server handler. ## Use: 1. Start server: `python3 -m async_greeter_server_with_interceptor` 2. Start client: `python3 -m async_greeter_client` ### Expected Logs: * On client side: ``` Sending request with rpc id: 73bb98beff10c2dd7b9f2252a1e2039e Greeter client received: Hello, you! ``` * On server side: ``` INFO:root:Starting server on [::]:50051 INFO:root:Interceptor1 called with rpc_id: default INFO:root:Interceptor2 called with rpc_id: Interceptor1-default INFO:root:Handle rpc with id Interceptor2-Interceptor1-73bb98beff10c2dd7b9f2252a1e2039e in server handler. ```pull/33019/head
parent
0e7cc360eb
commit
40f20c0f48
8 changed files with 286 additions and 2 deletions
@ -0,0 +1,43 @@ |
|||||||
|
# gRPC Python Async Interceptor Example |
||||||
|
|
||||||
|
This example demonstrate the usage of Async interceptors and context propagation using [contextvars](https://docs.python.org/3/library/contextvars.html#module-contextvars). |
||||||
|
|
||||||
|
## When to use contextvars |
||||||
|
|
||||||
|
`Contextvars` can be used to propagate context in a same thread or coroutine, some example usage include: |
||||||
|
|
||||||
|
1. Propagate from interceptor to another interceptor. |
||||||
|
2. Propagate from interceptor to the server handler. |
||||||
|
3. Propagate from client to server. |
||||||
|
|
||||||
|
## How does this example works |
||||||
|
|
||||||
|
This example have the following steps: |
||||||
|
1. Generate RPC ID on client side and propagate to server using `metadata`. |
||||||
|
* `contextvars` can be used here if client and server is running in a same coroutine (or same thead for Sync). |
||||||
|
2. Server interceptor1 intercept the request, it checks `rpc_id_var` and decorate it with it's tag `Interceptor1`. |
||||||
|
3. Server interceptor2 intercept the request, it checks `rpc_id_var` and decorate it with it's tag `Interceptor2`. |
||||||
|
4. Server handler receives the request with `rpc_id_var` decorated by both interceptor1 and interceptor2. |
||||||
|
|
||||||
|
## How to run this example |
||||||
|
|
||||||
|
1. Start server: `python3 -m async_greeter_server_with_interceptor` |
||||||
|
2. Start client: `python3 -m async_greeter_client` |
||||||
|
|
||||||
|
### Expected outcome |
||||||
|
|
||||||
|
* On client side, you should see logs similar to: |
||||||
|
|
||||||
|
``` |
||||||
|
Sending request with rpc id: 59ac966558b3d7d11a06bd45f1a0f89d |
||||||
|
Greeter client received: Hello, you! |
||||||
|
``` |
||||||
|
|
||||||
|
* On server side, you should see logs similar to: |
||||||
|
|
||||||
|
``` |
||||||
|
INFO:root:Starting server on [::]:50051 |
||||||
|
INFO:root:Interceptor1 called with rpc_id: default |
||||||
|
INFO:root:Interceptor2 called with rpc_id: Interceptor1-59ac966558b3d7d11a06bd45f1a0f89d |
||||||
|
INFO:root:Handle rpc with id Interceptor2-Interceptor1-59ac966558b3d7d11a06bd45f1a0f89d in server handler. |
||||||
|
``` |
@ -0,0 +1,41 @@ |
|||||||
|
# Copyright 2023 gRPC authors. |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
"""The Python AsyncIO implementation of the GRPC helloworld.Greeter client.""" |
||||||
|
|
||||||
|
import asyncio |
||||||
|
import contextvars |
||||||
|
import logging |
||||||
|
import random |
||||||
|
|
||||||
|
import grpc |
||||||
|
import helloworld_pb2 |
||||||
|
import helloworld_pb2_grpc |
||||||
|
|
||||||
|
test_var = contextvars.ContextVar('test', default='test') |
||||||
|
|
||||||
|
|
||||||
|
async def run() -> None: |
||||||
|
async with grpc.aio.insecure_channel('localhost:50051') as channel: |
||||||
|
stub = helloworld_pb2_grpc.GreeterStub(channel) |
||||||
|
rpc_id = '{:032x}'.format(random.getrandbits(128)) |
||||||
|
metadata = grpc.aio.Metadata(('client-rpc-id', rpc_id),) |
||||||
|
print(f"Sending request with rpc id: {rpc_id}") |
||||||
|
response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'), |
||||||
|
metadata=metadata) |
||||||
|
print("Greeter client received: " + response.message) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
logging.basicConfig() |
||||||
|
asyncio.run(run()) |
@ -0,0 +1,83 @@ |
|||||||
|
# Copyright 2023 gRPC authors. |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
"""The Python AsyncIO implementation of the GRPC helloworld.Greeter server.""" |
||||||
|
|
||||||
|
import asyncio |
||||||
|
import contextvars |
||||||
|
import logging |
||||||
|
from typing import Awaitable, Callable, Optional |
||||||
|
|
||||||
|
import grpc |
||||||
|
import helloworld_pb2 |
||||||
|
import helloworld_pb2_grpc |
||||||
|
|
||||||
|
rpc_id_var = contextvars.ContextVar('rpc_id', default='default') |
||||||
|
|
||||||
|
|
||||||
|
class RPCIdInterceptor(grpc.aio.ServerInterceptor): |
||||||
|
|
||||||
|
def __init__(self, tag: str, rpc_id: Optional[str] = None) -> None: |
||||||
|
self.tag = tag |
||||||
|
self.rpc_id = rpc_id |
||||||
|
|
||||||
|
async def intercept_service( |
||||||
|
self, continuation: Callable[[grpc.HandlerCallDetails], |
||||||
|
Awaitable[grpc.RpcMethodHandler]], |
||||||
|
handler_call_details: grpc.HandlerCallDetails |
||||||
|
) -> grpc.RpcMethodHandler: |
||||||
|
""" |
||||||
|
This interceptor prepends its tag to the rpc_id. |
||||||
|
If two of these interceptors are chained together, the resulting rpc_id |
||||||
|
will be something like this: Interceptor2-Interceptor1-RPC_ID. |
||||||
|
""" |
||||||
|
logging.info("%s called with rpc_id: %s", self.tag, rpc_id_var.get()) |
||||||
|
if rpc_id_var.get() == 'default': |
||||||
|
_metadata = dict(handler_call_details.invocation_metadata) |
||||||
|
rpc_id_var.set(self.decorate(_metadata['client-rpc-id'])) |
||||||
|
else: |
||||||
|
rpc_id_var.set(self.decorate(rpc_id_var.get())) |
||||||
|
return await continuation(handler_call_details) |
||||||
|
|
||||||
|
def decorate(self, rpc_id: str): |
||||||
|
return f"{self.tag}-{rpc_id}" |
||||||
|
|
||||||
|
|
||||||
|
class Greeter(helloworld_pb2_grpc.GreeterServicer): |
||||||
|
|
||||||
|
async def SayHello( |
||||||
|
self, request: helloworld_pb2.HelloRequest, |
||||||
|
context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply: |
||||||
|
logging.info("Handle rpc with id %s in server handler.", |
||||||
|
rpc_id_var.get()) |
||||||
|
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) |
||||||
|
|
||||||
|
|
||||||
|
async def serve() -> None: |
||||||
|
interceptors = [ |
||||||
|
RPCIdInterceptor('Interceptor1'), |
||||||
|
RPCIdInterceptor('Interceptor2') |
||||||
|
] |
||||||
|
|
||||||
|
server = grpc.aio.server(interceptors=interceptors) |
||||||
|
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) |
||||||
|
listen_addr = '[::]:50051' |
||||||
|
server.add_insecure_port(listen_addr) |
||||||
|
logging.info("Starting server on %s", listen_addr) |
||||||
|
await server.start() |
||||||
|
await server.wait_for_termination() |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__': |
||||||
|
logging.basicConfig(level=logging.INFO) |
||||||
|
asyncio.run(serve()) |
@ -0,0 +1,30 @@ |
|||||||
|
# -*- coding: utf-8 -*- |
||||||
|
# Generated by the protocol buffer compiler. DO NOT EDIT! |
||||||
|
# source: helloworld.proto |
||||||
|
"""Generated protocol buffer code.""" |
||||||
|
from google.protobuf.internal import builder as _builder |
||||||
|
from google.protobuf import descriptor as _descriptor |
||||||
|
from google.protobuf import descriptor_pool as _descriptor_pool |
||||||
|
from google.protobuf import symbol_database as _symbol_database |
||||||
|
# @@protoc_insertion_point(imports) |
||||||
|
|
||||||
|
_sym_db = _symbol_database.Default() |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3') |
||||||
|
|
||||||
|
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) |
||||||
|
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'helloworld_pb2', globals()) |
||||||
|
if _descriptor._USE_C_DESCRIPTORS == False: |
||||||
|
|
||||||
|
DESCRIPTOR._options = None |
||||||
|
DESCRIPTOR._serialized_options = b'\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW' |
||||||
|
_HELLOREQUEST._serialized_start=32 |
||||||
|
_HELLOREQUEST._serialized_end=60 |
||||||
|
_HELLOREPLY._serialized_start=62 |
||||||
|
_HELLOREPLY._serialized_end=91 |
||||||
|
_GREETER._serialized_start=93 |
||||||
|
_GREETER._serialized_end=166 |
||||||
|
# @@protoc_insertion_point(module_scope) |
@ -0,0 +1,17 @@ |
|||||||
|
from google.protobuf import descriptor as _descriptor |
||||||
|
from google.protobuf import message as _message |
||||||
|
from typing import ClassVar as _ClassVar, Optional as _Optional |
||||||
|
|
||||||
|
DESCRIPTOR: _descriptor.FileDescriptor |
||||||
|
|
||||||
|
class HelloReply(_message.Message): |
||||||
|
__slots__ = ["message"] |
||||||
|
MESSAGE_FIELD_NUMBER: _ClassVar[int] |
||||||
|
message: str |
||||||
|
def __init__(self, message: _Optional[str] = ...) -> None: ... |
||||||
|
|
||||||
|
class HelloRequest(_message.Message): |
||||||
|
__slots__ = ["name"] |
||||||
|
NAME_FIELD_NUMBER: _ClassVar[int] |
||||||
|
name: str |
||||||
|
def __init__(self, name: _Optional[str] = ...) -> None: ... |
@ -0,0 +1,70 @@ |
|||||||
|
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! |
||||||
|
"""Client and server classes corresponding to protobuf-defined services.""" |
||||||
|
import grpc |
||||||
|
|
||||||
|
import helloworld_pb2 as helloworld__pb2 |
||||||
|
|
||||||
|
|
||||||
|
class GreeterStub(object): |
||||||
|
"""The greeting service definition. |
||||||
|
""" |
||||||
|
|
||||||
|
def __init__(self, channel): |
||||||
|
"""Constructor. |
||||||
|
|
||||||
|
Args: |
||||||
|
channel: A grpc.Channel. |
||||||
|
""" |
||||||
|
self.SayHello = channel.unary_unary( |
||||||
|
'/helloworld.Greeter/SayHello', |
||||||
|
request_serializer=helloworld__pb2.HelloRequest.SerializeToString, |
||||||
|
response_deserializer=helloworld__pb2.HelloReply.FromString, |
||||||
|
) |
||||||
|
|
||||||
|
|
||||||
|
class GreeterServicer(object): |
||||||
|
"""The greeting service definition. |
||||||
|
""" |
||||||
|
|
||||||
|
def SayHello(self, request, context): |
||||||
|
"""Sends a greeting |
||||||
|
""" |
||||||
|
context.set_code(grpc.StatusCode.UNIMPLEMENTED) |
||||||
|
context.set_details('Method not implemented!') |
||||||
|
raise NotImplementedError('Method not implemented!') |
||||||
|
|
||||||
|
|
||||||
|
def add_GreeterServicer_to_server(servicer, server): |
||||||
|
rpc_method_handlers = { |
||||||
|
'SayHello': grpc.unary_unary_rpc_method_handler( |
||||||
|
servicer.SayHello, |
||||||
|
request_deserializer=helloworld__pb2.HelloRequest.FromString, |
||||||
|
response_serializer=helloworld__pb2.HelloReply.SerializeToString, |
||||||
|
), |
||||||
|
} |
||||||
|
generic_handler = grpc.method_handlers_generic_handler( |
||||||
|
'helloworld.Greeter', rpc_method_handlers) |
||||||
|
server.add_generic_rpc_handlers((generic_handler,)) |
||||||
|
|
||||||
|
|
||||||
|
# This class is part of an EXPERIMENTAL API. |
||||||
|
class Greeter(object): |
||||||
|
"""The greeting service definition. |
||||||
|
""" |
||||||
|
|
||||||
|
@staticmethod |
||||||
|
def SayHello(request, |
||||||
|
target, |
||||||
|
options=(), |
||||||
|
channel_credentials=None, |
||||||
|
call_credentials=None, |
||||||
|
insecure=False, |
||||||
|
compression=None, |
||||||
|
wait_for_ready=None, |
||||||
|
timeout=None, |
||||||
|
metadata=None): |
||||||
|
return grpc.experimental.unary_unary(request, target, '/helloworld.Greeter/SayHello', |
||||||
|
helloworld__pb2.HelloRequest.SerializeToString, |
||||||
|
helloworld__pb2.HelloReply.FromString, |
||||||
|
options, channel_credentials, |
||||||
|
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) |
Loading…
Reference in new issue