From f2d06b845867b65310250b314fe266be0eba6d60 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 4 Nov 2020 14:33:08 -0800 Subject: [PATCH] Add async bidi-streaming example --- examples/python/async_streaming/README.md | 50 ++++ examples/python/async_streaming/client.py | 119 ++++++++ examples/python/async_streaming/phone.proto | 52 ++++ examples/python/async_streaming/phone_pb2.py | 267 ++++++++++++++++++ .../python/async_streaming/phone_pb2_grpc.py | 65 +++++ examples/python/async_streaming/server.py | 92 ++++++ 6 files changed, 645 insertions(+) create mode 100644 examples/python/async_streaming/README.md create mode 100644 examples/python/async_streaming/client.py create mode 100644 examples/python/async_streaming/phone.proto create mode 100644 examples/python/async_streaming/phone_pb2.py create mode 100644 examples/python/async_streaming/phone_pb2_grpc.py create mode 100644 examples/python/async_streaming/server.py diff --git a/examples/python/async_streaming/README.md b/examples/python/async_streaming/README.md new file mode 100644 index 00000000000..4dac53b0db4 --- /dev/null +++ b/examples/python/async_streaming/README.md @@ -0,0 +1,50 @@ +# gRPC Python Non-Blocking Streaming RPC Client Example + +The goal of this example is to demonstrate how to handle streaming responses +without blocking the current thread. Effectively, this can be achieved by +converting the gRPC Python streaming API into callback-based. + +In this example, the RPC service `Phone` simulates the life cycle of virtual +phone calls. It requires one thread to handle the phone-call session state +changes, and another thread to process the audio stream. In this case, the +normal blocking style API could not fulfill the need easily. Hence, we should +asynchronously execute the streaming RPC. + +## Steps to run this example + +Start the server in one session +``` +python3 server.py +``` + +Start the client in another session +``` +python3 client.py +``` + +## Example Output +``` +$ python3 server.py +INFO:root:Server serving at [::]:50051 +INFO:root:Received a phone call request for number [1415926535] +INFO:root:Created a call session [{ + "sessionId": "0", + "media": "https://link.to.audio.resources" +}] +INFO:root:Call finished [1415926535] +INFO:root:Call session cleaned [{ + "sessionId": "0", + "media": "https://link.to.audio.resources" +}] +``` + +``` +$ python3 client.py +INFO:root:Waiting for peer to connect [1415926535]... +INFO:root:Call toward [1415926535] enters [NEW] state +INFO:root:Call toward [1415926535] enters [ACTIVE] state +INFO:root:Consuming audio resource [https://link.to.audio.resources] +INFO:root:Call toward [1415926535] enters [ENDED] state +INFO:root:Audio session finished [https://link.to.audio.resources] +INFO:root:Call finished! +``` diff --git a/examples/python/async_streaming/client.py b/examples/python/async_streaming/client.py new file mode 100644 index 00000000000..25db69359b6 --- /dev/null +++ b/examples/python/async_streaming/client.py @@ -0,0 +1,119 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +from typing import Iterator +from concurrent.futures import ThreadPoolExecutor + +import grpc + +import phone_pb2 +import phone_pb2_grpc + + +class CallMaker: + + def __init__(self, executor: ThreadPoolExecutor, channel: grpc.Channel, + phone_number: str) -> None: + self._executor = executor + self._channel = channel + self._stub = phone_pb2_grpc.PhoneStub(self._channel) + self._phone_number = phone_number + self._session_id = None + self._audio_session_link = None + self._call_state = None + self._peer_responded = threading.Event() + self._call_finished = threading.Event() + self._consumer_future = None + + def _response_watcher( + self, + response_iterator: Iterator[phone_pb2.StreamCallResponse]) -> None: + try: + for response in response_iterator: + # NOTE: All fields in Proto3 are optional. This is the recommended way + # to check if a field is present or not, or to exam which one-of field is + # fulfilled by this message. + if response.HasField("call_info"): + self._on_call_info(response.call_info) + elif response.HasField("call_state"): + self._on_call_state(response.call_state.state) + else: + raise RuntimeError( + "Received StreamCallResponse without call_info and call_state" + ) + except Exception as e: + self._peer_responded.set() + raise + + def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None: + self._session_id = call_info.session_id + self._audio_session_link = call_info.media + + def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None: + logging.info("Call toward [%s] enters [%s] state", self._phone_number, + phone_pb2.CallState.State.Name(call_state)) + self._call_state = call_state + if call_state is phone_pb2.CallState.State.ACTIVE: + self._peer_responded.set() + if call_state is phone_pb2.CallState.State.ENDED: + self._peer_responded.set() + self._call_finished.set() + + def call(self) -> None: + request = phone_pb2.StreamCallRequest() + request.phone_number = self._phone_number + response_iterator = self._stub.StreamCall(iter((request,))) + # Instead of consuming the response on current thread, spawn a consumption thread. + self._consumer_future = self._executor.submit(self._response_watcher, + response_iterator) + + def wait_peer(self) -> None: + logging.info("Waiting for peer to connect [%s]...", self._phone_number) + self._peer_responded.wait(timeout=None) + if self._consumer_future.done(): + # If the future raises, forwards the exception here + self._consumer_future.result() + return self._call_state is phone_pb2.CallState.State.ACTIVE + + def audio_session(self) -> None: + assert self._audio_session_link is not None + logging.info("Consuming audio resource [%s]", self._audio_session_link) + self._call_finished.wait(timeout=None) + logging.info("Audio session finished [%s]", self._audio_session_link) + + +def process_call(executor: ThreadPoolExecutor, channel: grpc.Channel, + phone_number: str) -> None: + call_maker = CallMaker(executor, channel, phone_number) + call_maker.call() + if call_maker.wait_peer(): + call_maker.audio_session() + logging.info("Call finished!") + else: + logging.info("Call failed: peer didn't answer") + + +def run(): + executor = ThreadPoolExecutor() + with grpc.insecure_channel("localhost:50051") as channel: + future = executor.submit(process_call, executor, channel, + "555-0100-XXXX") + future.result() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + run() diff --git a/examples/python/async_streaming/phone.proto b/examples/python/async_streaming/phone.proto new file mode 100644 index 00000000000..64c5999316b --- /dev/null +++ b/examples/python/async_streaming/phone.proto @@ -0,0 +1,52 @@ +// Copyright 2020 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +message CallInfo { + string session_id = 1; + string media = 2; +} + +message CallState { + enum State { + // The default state. + UNDEFINED = 0; + // The call is newly created. + NEW = 1; + // The call is connected. + ACTIVE = 6; + // The call is finished. + ENDED = 7; + } + State state = 2; +} + +message StreamCallRequest { + string phone_number = 1; +} + +message StreamCallResponse { + oneof stream_call_response { + CallInfo call_info = 1; + CallState call_state = 2; + } +} + +service Phone { + // Makes a phone call and communicate states via a stream. + rpc StreamCall(stream StreamCallRequest) returns (stream StreamCallResponse); +} diff --git a/examples/python/async_streaming/phone_pb2.py b/examples/python/async_streaming/phone_pb2.py new file mode 100644 index 00000000000..9afdfec6ef9 --- /dev/null +++ b/examples/python/async_streaming/phone_pb2.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: phone.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='phone.proto', + package='grpc.testing', + syntax='proto3', + serialized_options=None, + serialized_pb=b'\n\x0bphone.proto\x12\x0cgrpc.testing\"-\n\x08\x43\x61llInfo\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\r\n\x05media\x18\x02 \x01(\t\"q\n\tCallState\x12,\n\x05state\x18\x02 \x01(\x0e\x32\x1d.grpc.testing.CallState.State\"6\n\x05State\x12\r\n\tUNDEFINED\x10\x00\x12\x07\n\x03NEW\x10\x01\x12\n\n\x06\x41\x43TIVE\x10\x06\x12\t\n\x05\x45NDED\x10\x07\")\n\x11StreamCallRequest\x12\x14\n\x0cphone_number\x18\x01 \x01(\t\"\x88\x01\n\x12StreamCallResponse\x12+\n\tcall_info\x18\x01 \x01(\x0b\x32\x16.grpc.testing.CallInfoH\x00\x12-\n\ncall_state\x18\x02 \x01(\x0b\x32\x17.grpc.testing.CallStateH\x00\x42\x16\n\x14stream_call_response2\\\n\x05Phone\x12S\n\nStreamCall\x12\x1f.grpc.testing.StreamCallRequest\x1a .grpc.testing.StreamCallResponse(\x01\x30\x01\x62\x06proto3' +) + + + +_CALLSTATE_STATE = _descriptor.EnumDescriptor( + name='State', + full_name='grpc.testing.CallState.State', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='UNDEFINED', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='NEW', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ACTIVE', index=2, number=6, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ENDED', index=3, number=7, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=135, + serialized_end=189, +) +_sym_db.RegisterEnumDescriptor(_CALLSTATE_STATE) + + +_CALLINFO = _descriptor.Descriptor( + name='CallInfo', + full_name='grpc.testing.CallInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='session_id', full_name='grpc.testing.CallInfo.session_id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='media', full_name='grpc.testing.CallInfo.media', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=29, + serialized_end=74, +) + + +_CALLSTATE = _descriptor.Descriptor( + name='CallState', + full_name='grpc.testing.CallState', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='state', full_name='grpc.testing.CallState.state', index=0, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _CALLSTATE_STATE, + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=76, + serialized_end=189, +) + + +_STREAMCALLREQUEST = _descriptor.Descriptor( + name='StreamCallRequest', + full_name='grpc.testing.StreamCallRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='phone_number', full_name='grpc.testing.StreamCallRequest.phone_number', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=191, + serialized_end=232, +) + + +_STREAMCALLRESPONSE = _descriptor.Descriptor( + name='StreamCallResponse', + full_name='grpc.testing.StreamCallResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='call_info', full_name='grpc.testing.StreamCallResponse.call_info', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='call_state', full_name='grpc.testing.StreamCallResponse.call_state', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + _descriptor.OneofDescriptor( + name='stream_call_response', full_name='grpc.testing.StreamCallResponse.stream_call_response', + index=0, containing_type=None, fields=[]), + ], + serialized_start=235, + serialized_end=371, +) + +_CALLSTATE.fields_by_name['state'].enum_type = _CALLSTATE_STATE +_CALLSTATE_STATE.containing_type = _CALLSTATE +_STREAMCALLRESPONSE.fields_by_name['call_info'].message_type = _CALLINFO +_STREAMCALLRESPONSE.fields_by_name['call_state'].message_type = _CALLSTATE +_STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'].fields.append( + _STREAMCALLRESPONSE.fields_by_name['call_info']) +_STREAMCALLRESPONSE.fields_by_name['call_info'].containing_oneof = _STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'] +_STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'].fields.append( + _STREAMCALLRESPONSE.fields_by_name['call_state']) +_STREAMCALLRESPONSE.fields_by_name['call_state'].containing_oneof = _STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'] +DESCRIPTOR.message_types_by_name['CallInfo'] = _CALLINFO +DESCRIPTOR.message_types_by_name['CallState'] = _CALLSTATE +DESCRIPTOR.message_types_by_name['StreamCallRequest'] = _STREAMCALLREQUEST +DESCRIPTOR.message_types_by_name['StreamCallResponse'] = _STREAMCALLRESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +CallInfo = _reflection.GeneratedProtocolMessageType('CallInfo', (_message.Message,), { + 'DESCRIPTOR' : _CALLINFO, + '__module__' : 'phone_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.CallInfo) + }) +_sym_db.RegisterMessage(CallInfo) + +CallState = _reflection.GeneratedProtocolMessageType('CallState', (_message.Message,), { + 'DESCRIPTOR' : _CALLSTATE, + '__module__' : 'phone_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.CallState) + }) +_sym_db.RegisterMessage(CallState) + +StreamCallRequest = _reflection.GeneratedProtocolMessageType('StreamCallRequest', (_message.Message,), { + 'DESCRIPTOR' : _STREAMCALLREQUEST, + '__module__' : 'phone_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamCallRequest) + }) +_sym_db.RegisterMessage(StreamCallRequest) + +StreamCallResponse = _reflection.GeneratedProtocolMessageType('StreamCallResponse', (_message.Message,), { + 'DESCRIPTOR' : _STREAMCALLRESPONSE, + '__module__' : 'phone_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamCallResponse) + }) +_sym_db.RegisterMessage(StreamCallResponse) + + + +_PHONE = _descriptor.ServiceDescriptor( + name='Phone', + full_name='grpc.testing.Phone', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=373, + serialized_end=465, + methods=[ + _descriptor.MethodDescriptor( + name='StreamCall', + full_name='grpc.testing.Phone.StreamCall', + index=0, + containing_service=None, + input_type=_STREAMCALLREQUEST, + output_type=_STREAMCALLRESPONSE, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_PHONE) + +DESCRIPTOR.services_by_name['Phone'] = _PHONE + +# @@protoc_insertion_point(module_scope) diff --git a/examples/python/async_streaming/phone_pb2_grpc.py b/examples/python/async_streaming/phone_pb2_grpc.py new file mode 100644 index 00000000000..39bc58d47af --- /dev/null +++ b/examples/python/async_streaming/phone_pb2_grpc.py @@ -0,0 +1,65 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import phone_pb2 as phone__pb2 + + +class PhoneStub(object): + """Missing associated documentation comment in .proto file""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.StreamCall = channel.stream_stream( + '/grpc.testing.Phone/StreamCall', + request_serializer=phone__pb2.StreamCallRequest.SerializeToString, + response_deserializer=phone__pb2.StreamCallResponse.FromString, + ) + + +class PhoneServicer(object): + """Missing associated documentation comment in .proto file""" + + def StreamCall(self, request_iterator, context): + """Makes a phone call and communicate states via a stream. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_PhoneServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StreamCall': grpc.stream_stream_rpc_method_handler( + servicer.StreamCall, + request_deserializer=phone__pb2.StreamCallRequest.FromString, + response_serializer=phone__pb2.StreamCallResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'grpc.testing.Phone', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class Phone(object): + """Missing associated documentation comment in .proto file""" + + @staticmethod + def StreamCall(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/grpc.testing.Phone/StreamCall', + phone__pb2.StreamCallRequest.SerializeToString, + phone__pb2.StreamCallResponse.FromString, + options, channel_credentials, + call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/examples/python/async_streaming/server.py b/examples/python/async_streaming/server.py new file mode 100644 index 00000000000..96ff822b33d --- /dev/null +++ b/examples/python/async_streaming/server.py @@ -0,0 +1,92 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Iterable +import threading + +import grpc +from google.protobuf.json_format import MessageToJson + +import phone_pb2 +import phone_pb2_grpc + + +def create_state_response(call_state: phone_pb2.CallState.State + ) -> phone_pb2.StreamCallResponse: + response = phone_pb2.StreamCallResponse() + response.call_state.state = call_state + return response + + +class Phone(phone_pb2_grpc.PhoneServicer): + + def __init__(self): + self._id_counter = 0 + self._lock = threading.RLock() + + def _create_call_session(self) -> phone_pb2.CallInfo: + call_info = phone_pb2.CallInfo() + with self._lock: + call_info.session_id = str(self._id_counter) + self._id_counter += 1 + call_info.media = "https://link.to.audio.resources" + logging.info("Created a call session [%s]", MessageToJson(call_info)) + return call_info + + def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None: + logging.info("Call session cleaned [%s]", MessageToJson(call_info)) + + def StreamCall(self, + request_iterator: Iterable[phone_pb2.StreamCallRequest], + context: grpc.ServicerContext + ) -> Iterable[phone_pb2.StreamCallResponse]: + try: + request = next(request_iterator) + logging.info("Received a phone call request for number [%s]", + request.phone_number) + except StopIteration: + raise RuntimeError("Failed to receive call request") + # Simulate the acceptance of call request + time.sleep(1) + yield create_state_response(phone_pb2.CallState.NEW) + # Simulate the start of the call session + time.sleep(1) + call_info = self._create_call_session() + context.add_callback(lambda: self._clean_call_session(call_info)) + response = phone_pb2.StreamCallResponse() + response.call_info.session_id = call_info.session_id + response.call_info.media = call_info.media + yield response + yield create_state_response(phone_pb2.CallState.ACTIVE) + # Simulate the end of the call + time.sleep(2) + yield create_state_response(phone_pb2.CallState.ENDED) + logging.info("Call finished [%s]", request.phone_number) + + +def serve(address: str) -> None: + server = grpc.server(ThreadPoolExecutor()) + phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server) + server.add_insecure_port(address) + server.start() + logging.info("Server serving at %s", address) + server.wait_for_termination() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + serve("[::]:50051")