Add async bidi-streaming example

pull/24668/head
Lidi Zheng 4 years ago
parent a951c7632a
commit f2d06b8458
  1. 50
      examples/python/async_streaming/README.md
  2. 119
      examples/python/async_streaming/client.py
  3. 52
      examples/python/async_streaming/phone.proto
  4. 267
      examples/python/async_streaming/phone_pb2.py
  5. 65
      examples/python/async_streaming/phone_pb2_grpc.py
  6. 92
      examples/python/async_streaming/server.py

@ -0,0 +1,50 @@
# gRPC Python Non-Blocking Streaming RPC Client Example
The goal of this example is to demonstrate how to handle streaming responses
without blocking the current thread. Effectively, this can be achieved by
converting the gRPC Python streaming API into callback-based.
In this example, the RPC service `Phone` simulates the life cycle of virtual
phone calls. It requires one thread to handle the phone-call session state
changes, and another thread to process the audio stream. In this case, the
normal blocking style API could not fulfill the need easily. Hence, we should
asynchronously execute the streaming RPC.
## Steps to run this example
Start the server in one session
```
python3 server.py
```
Start the client in another session
```
python3 client.py
```
## Example Output
```
$ python3 server.py
INFO:root:Server serving at [::]:50051
INFO:root:Received a phone call request for number [1415926535]
INFO:root:Created a call session [{
"sessionId": "0",
"media": "https://link.to.audio.resources"
}]
INFO:root:Call finished [1415926535]
INFO:root:Call session cleaned [{
"sessionId": "0",
"media": "https://link.to.audio.resources"
}]
```
```
$ python3 client.py
INFO:root:Waiting for peer to connect [1415926535]...
INFO:root:Call toward [1415926535] enters [NEW] state
INFO:root:Call toward [1415926535] enters [ACTIVE] state
INFO:root:Consuming audio resource [https://link.to.audio.resources]
INFO:root:Call toward [1415926535] enters [ENDED] state
INFO:root:Audio session finished [https://link.to.audio.resources]
INFO:root:Call finished!
```

@ -0,0 +1,119 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from typing import Iterator
from concurrent.futures import ThreadPoolExecutor
import grpc
import phone_pb2
import phone_pb2_grpc
class CallMaker:
def __init__(self, executor: ThreadPoolExecutor, channel: grpc.Channel,
phone_number: str) -> None:
self._executor = executor
self._channel = channel
self._stub = phone_pb2_grpc.PhoneStub(self._channel)
self._phone_number = phone_number
self._session_id = None
self._audio_session_link = None
self._call_state = None
self._peer_responded = threading.Event()
self._call_finished = threading.Event()
self._consumer_future = None
def _response_watcher(
self,
response_iterator: Iterator[phone_pb2.StreamCallResponse]) -> None:
try:
for response in response_iterator:
# NOTE: All fields in Proto3 are optional. This is the recommended way
# to check if a field is present or not, or to exam which one-of field is
# fulfilled by this message.
if response.HasField("call_info"):
self._on_call_info(response.call_info)
elif response.HasField("call_state"):
self._on_call_state(response.call_state.state)
else:
raise RuntimeError(
"Received StreamCallResponse without call_info and call_state"
)
except Exception as e:
self._peer_responded.set()
raise
def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None:
self._session_id = call_info.session_id
self._audio_session_link = call_info.media
def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None:
logging.info("Call toward [%s] enters [%s] state", self._phone_number,
phone_pb2.CallState.State.Name(call_state))
self._call_state = call_state
if call_state is phone_pb2.CallState.State.ACTIVE:
self._peer_responded.set()
if call_state is phone_pb2.CallState.State.ENDED:
self._peer_responded.set()
self._call_finished.set()
def call(self) -> None:
request = phone_pb2.StreamCallRequest()
request.phone_number = self._phone_number
response_iterator = self._stub.StreamCall(iter((request,)))
# Instead of consuming the response on current thread, spawn a consumption thread.
self._consumer_future = self._executor.submit(self._response_watcher,
response_iterator)
def wait_peer(self) -> None:
logging.info("Waiting for peer to connect [%s]...", self._phone_number)
self._peer_responded.wait(timeout=None)
if self._consumer_future.done():
# If the future raises, forwards the exception here
self._consumer_future.result()
return self._call_state is phone_pb2.CallState.State.ACTIVE
def audio_session(self) -> None:
assert self._audio_session_link is not None
logging.info("Consuming audio resource [%s]", self._audio_session_link)
self._call_finished.wait(timeout=None)
logging.info("Audio session finished [%s]", self._audio_session_link)
def process_call(executor: ThreadPoolExecutor, channel: grpc.Channel,
phone_number: str) -> None:
call_maker = CallMaker(executor, channel, phone_number)
call_maker.call()
if call_maker.wait_peer():
call_maker.audio_session()
logging.info("Call finished!")
else:
logging.info("Call failed: peer didn't answer")
def run():
executor = ThreadPoolExecutor()
with grpc.insecure_channel("localhost:50051") as channel:
future = executor.submit(process_call, executor, channel,
"555-0100-XXXX")
future.result()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
run()

@ -0,0 +1,52 @@
// Copyright 2020 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.testing;
message CallInfo {
string session_id = 1;
string media = 2;
}
message CallState {
enum State {
// The default state.
UNDEFINED = 0;
// The call is newly created.
NEW = 1;
// The call is connected.
ACTIVE = 6;
// The call is finished.
ENDED = 7;
}
State state = 2;
}
message StreamCallRequest {
string phone_number = 1;
}
message StreamCallResponse {
oneof stream_call_response {
CallInfo call_info = 1;
CallState call_state = 2;
}
}
service Phone {
// Makes a phone call and communicate states via a stream.
rpc StreamCall(stream StreamCallRequest) returns (stream StreamCallResponse);
}

@ -0,0 +1,267 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: phone.proto
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='phone.proto',
package='grpc.testing',
syntax='proto3',
serialized_options=None,
serialized_pb=b'\n\x0bphone.proto\x12\x0cgrpc.testing\"-\n\x08\x43\x61llInfo\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\r\n\x05media\x18\x02 \x01(\t\"q\n\tCallState\x12,\n\x05state\x18\x02 \x01(\x0e\x32\x1d.grpc.testing.CallState.State\"6\n\x05State\x12\r\n\tUNDEFINED\x10\x00\x12\x07\n\x03NEW\x10\x01\x12\n\n\x06\x41\x43TIVE\x10\x06\x12\t\n\x05\x45NDED\x10\x07\")\n\x11StreamCallRequest\x12\x14\n\x0cphone_number\x18\x01 \x01(\t\"\x88\x01\n\x12StreamCallResponse\x12+\n\tcall_info\x18\x01 \x01(\x0b\x32\x16.grpc.testing.CallInfoH\x00\x12-\n\ncall_state\x18\x02 \x01(\x0b\x32\x17.grpc.testing.CallStateH\x00\x42\x16\n\x14stream_call_response2\\\n\x05Phone\x12S\n\nStreamCall\x12\x1f.grpc.testing.StreamCallRequest\x1a .grpc.testing.StreamCallResponse(\x01\x30\x01\x62\x06proto3'
)
_CALLSTATE_STATE = _descriptor.EnumDescriptor(
name='State',
full_name='grpc.testing.CallState.State',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='UNDEFINED', index=0, number=0,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='NEW', index=1, number=1,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ACTIVE', index=2, number=6,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ENDED', index=3, number=7,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=135,
serialized_end=189,
)
_sym_db.RegisterEnumDescriptor(_CALLSTATE_STATE)
_CALLINFO = _descriptor.Descriptor(
name='CallInfo',
full_name='grpc.testing.CallInfo',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='session_id', full_name='grpc.testing.CallInfo.session_id', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='media', full_name='grpc.testing.CallInfo.media', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=29,
serialized_end=74,
)
_CALLSTATE = _descriptor.Descriptor(
name='CallState',
full_name='grpc.testing.CallState',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='state', full_name='grpc.testing.CallState.state', index=0,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
_CALLSTATE_STATE,
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=76,
serialized_end=189,
)
_STREAMCALLREQUEST = _descriptor.Descriptor(
name='StreamCallRequest',
full_name='grpc.testing.StreamCallRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='phone_number', full_name='grpc.testing.StreamCallRequest.phone_number', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=191,
serialized_end=232,
)
_STREAMCALLRESPONSE = _descriptor.Descriptor(
name='StreamCallResponse',
full_name='grpc.testing.StreamCallResponse',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='call_info', full_name='grpc.testing.StreamCallResponse.call_info', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='call_state', full_name='grpc.testing.StreamCallResponse.call_state', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
_descriptor.OneofDescriptor(
name='stream_call_response', full_name='grpc.testing.StreamCallResponse.stream_call_response',
index=0, containing_type=None, fields=[]),
],
serialized_start=235,
serialized_end=371,
)
_CALLSTATE.fields_by_name['state'].enum_type = _CALLSTATE_STATE
_CALLSTATE_STATE.containing_type = _CALLSTATE
_STREAMCALLRESPONSE.fields_by_name['call_info'].message_type = _CALLINFO
_STREAMCALLRESPONSE.fields_by_name['call_state'].message_type = _CALLSTATE
_STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'].fields.append(
_STREAMCALLRESPONSE.fields_by_name['call_info'])
_STREAMCALLRESPONSE.fields_by_name['call_info'].containing_oneof = _STREAMCALLRESPONSE.oneofs_by_name['stream_call_response']
_STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'].fields.append(
_STREAMCALLRESPONSE.fields_by_name['call_state'])
_STREAMCALLRESPONSE.fields_by_name['call_state'].containing_oneof = _STREAMCALLRESPONSE.oneofs_by_name['stream_call_response']
DESCRIPTOR.message_types_by_name['CallInfo'] = _CALLINFO
DESCRIPTOR.message_types_by_name['CallState'] = _CALLSTATE
DESCRIPTOR.message_types_by_name['StreamCallRequest'] = _STREAMCALLREQUEST
DESCRIPTOR.message_types_by_name['StreamCallResponse'] = _STREAMCALLRESPONSE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
CallInfo = _reflection.GeneratedProtocolMessageType('CallInfo', (_message.Message,), {
'DESCRIPTOR' : _CALLINFO,
'__module__' : 'phone_pb2'
# @@protoc_insertion_point(class_scope:grpc.testing.CallInfo)
})
_sym_db.RegisterMessage(CallInfo)
CallState = _reflection.GeneratedProtocolMessageType('CallState', (_message.Message,), {
'DESCRIPTOR' : _CALLSTATE,
'__module__' : 'phone_pb2'
# @@protoc_insertion_point(class_scope:grpc.testing.CallState)
})
_sym_db.RegisterMessage(CallState)
StreamCallRequest = _reflection.GeneratedProtocolMessageType('StreamCallRequest', (_message.Message,), {
'DESCRIPTOR' : _STREAMCALLREQUEST,
'__module__' : 'phone_pb2'
# @@protoc_insertion_point(class_scope:grpc.testing.StreamCallRequest)
})
_sym_db.RegisterMessage(StreamCallRequest)
StreamCallResponse = _reflection.GeneratedProtocolMessageType('StreamCallResponse', (_message.Message,), {
'DESCRIPTOR' : _STREAMCALLRESPONSE,
'__module__' : 'phone_pb2'
# @@protoc_insertion_point(class_scope:grpc.testing.StreamCallResponse)
})
_sym_db.RegisterMessage(StreamCallResponse)
_PHONE = _descriptor.ServiceDescriptor(
name='Phone',
full_name='grpc.testing.Phone',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=373,
serialized_end=465,
methods=[
_descriptor.MethodDescriptor(
name='StreamCall',
full_name='grpc.testing.Phone.StreamCall',
index=0,
containing_service=None,
input_type=_STREAMCALLREQUEST,
output_type=_STREAMCALLRESPONSE,
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_PHONE)
DESCRIPTOR.services_by_name['Phone'] = _PHONE
# @@protoc_insertion_point(module_scope)

@ -0,0 +1,65 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import phone_pb2 as phone__pb2
class PhoneStub(object):
"""Missing associated documentation comment in .proto file"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.StreamCall = channel.stream_stream(
'/grpc.testing.Phone/StreamCall',
request_serializer=phone__pb2.StreamCallRequest.SerializeToString,
response_deserializer=phone__pb2.StreamCallResponse.FromString,
)
class PhoneServicer(object):
"""Missing associated documentation comment in .proto file"""
def StreamCall(self, request_iterator, context):
"""Makes a phone call and communicate states via a stream.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PhoneServicer_to_server(servicer, server):
rpc_method_handlers = {
'StreamCall': grpc.stream_stream_rpc_method_handler(
servicer.StreamCall,
request_deserializer=phone__pb2.StreamCallRequest.FromString,
response_serializer=phone__pb2.StreamCallResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'grpc.testing.Phone', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class Phone(object):
"""Missing associated documentation comment in .proto file"""
@staticmethod
def StreamCall(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(request_iterator, target, '/grpc.testing.Phone/StreamCall',
phone__pb2.StreamCallRequest.SerializeToString,
phone__pb2.StreamCallResponse.FromString,
options, channel_credentials,
call_credentials, compression, wait_for_ready, timeout, metadata)

@ -0,0 +1,92 @@
# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable
import threading
import grpc
from google.protobuf.json_format import MessageToJson
import phone_pb2
import phone_pb2_grpc
def create_state_response(call_state: phone_pb2.CallState.State
) -> phone_pb2.StreamCallResponse:
response = phone_pb2.StreamCallResponse()
response.call_state.state = call_state
return response
class Phone(phone_pb2_grpc.PhoneServicer):
def __init__(self):
self._id_counter = 0
self._lock = threading.RLock()
def _create_call_session(self) -> phone_pb2.CallInfo:
call_info = phone_pb2.CallInfo()
with self._lock:
call_info.session_id = str(self._id_counter)
self._id_counter += 1
call_info.media = "https://link.to.audio.resources"
logging.info("Created a call session [%s]", MessageToJson(call_info))
return call_info
def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None:
logging.info("Call session cleaned [%s]", MessageToJson(call_info))
def StreamCall(self,
request_iterator: Iterable[phone_pb2.StreamCallRequest],
context: grpc.ServicerContext
) -> Iterable[phone_pb2.StreamCallResponse]:
try:
request = next(request_iterator)
logging.info("Received a phone call request for number [%s]",
request.phone_number)
except StopIteration:
raise RuntimeError("Failed to receive call request")
# Simulate the acceptance of call request
time.sleep(1)
yield create_state_response(phone_pb2.CallState.NEW)
# Simulate the start of the call session
time.sleep(1)
call_info = self._create_call_session()
context.add_callback(lambda: self._clean_call_session(call_info))
response = phone_pb2.StreamCallResponse()
response.call_info.session_id = call_info.session_id
response.call_info.media = call_info.media
yield response
yield create_state_response(phone_pb2.CallState.ACTIVE)
# Simulate the end of the call
time.sleep(2)
yield create_state_response(phone_pb2.CallState.ENDED)
logging.info("Call finished [%s]", request.phone_number)
def serve(address: str) -> None:
server = grpc.server(ThreadPoolExecutor())
phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server)
server.add_insecure_port(address)
server.start()
logging.info("Server serving at %s", address)
server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
serve("[::]:50051")
Loading…
Cancel
Save