mirror of https://github.com/grpc/grpc.git
Merge pull request #24668 from lidizheng/example-async-bidi
Add async bidi-streaming examplepull/24988/head
commit
c515666a73
6 changed files with 645 additions and 0 deletions
@ -0,0 +1,50 @@ |
||||
# gRPC Python Non-Blocking Streaming RPC Client Example |
||||
|
||||
The goal of this example is to demonstrate how to handle streaming responses |
||||
without blocking the current thread. Effectively, this can be achieved by |
||||
converting the gRPC Python streaming API into callback-based. |
||||
|
||||
In this example, the RPC service `Phone` simulates the life cycle of virtual |
||||
phone calls. It requires one thread to handle the phone-call session state |
||||
changes, and another thread to process the audio stream. In this case, the |
||||
normal blocking style API could not fulfill the need easily. Hence, we should |
||||
asynchronously execute the streaming RPC. |
||||
|
||||
## Steps to run this example |
||||
|
||||
Start the server in one session |
||||
``` |
||||
python3 server.py |
||||
``` |
||||
|
||||
Start the client in another session |
||||
``` |
||||
python3 client.py |
||||
``` |
||||
|
||||
## Example Output |
||||
``` |
||||
$ python3 server.py |
||||
INFO:root:Server serving at [::]:50051 |
||||
INFO:root:Received a phone call request for number [1415926535] |
||||
INFO:root:Created a call session [{ |
||||
"sessionId": "0", |
||||
"media": "https://link.to.audio.resources" |
||||
}] |
||||
INFO:root:Call finished [1415926535] |
||||
INFO:root:Call session cleaned [{ |
||||
"sessionId": "0", |
||||
"media": "https://link.to.audio.resources" |
||||
}] |
||||
``` |
||||
|
||||
``` |
||||
$ python3 client.py |
||||
INFO:root:Waiting for peer to connect [1415926535]... |
||||
INFO:root:Call toward [1415926535] enters [NEW] state |
||||
INFO:root:Call toward [1415926535] enters [ACTIVE] state |
||||
INFO:root:Consuming audio resource [https://link.to.audio.resources] |
||||
INFO:root:Call toward [1415926535] enters [ENDED] state |
||||
INFO:root:Audio session finished [https://link.to.audio.resources] |
||||
INFO:root:Call finished! |
||||
``` |
@ -0,0 +1,119 @@ |
||||
# Copyright 2020 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
import logging |
||||
import threading |
||||
from typing import Iterator |
||||
from concurrent.futures import ThreadPoolExecutor |
||||
|
||||
import grpc |
||||
|
||||
import phone_pb2 |
||||
import phone_pb2_grpc |
||||
|
||||
|
||||
class CallMaker: |
||||
|
||||
def __init__(self, executor: ThreadPoolExecutor, channel: grpc.Channel, |
||||
phone_number: str) -> None: |
||||
self._executor = executor |
||||
self._channel = channel |
||||
self._stub = phone_pb2_grpc.PhoneStub(self._channel) |
||||
self._phone_number = phone_number |
||||
self._session_id = None |
||||
self._audio_session_link = None |
||||
self._call_state = None |
||||
self._peer_responded = threading.Event() |
||||
self._call_finished = threading.Event() |
||||
self._consumer_future = None |
||||
|
||||
def _response_watcher( |
||||
self, |
||||
response_iterator: Iterator[phone_pb2.StreamCallResponse]) -> None: |
||||
try: |
||||
for response in response_iterator: |
||||
# NOTE: All fields in Proto3 are optional. This is the recommended way |
||||
# to check if a field is present or not, or to exam which one-of field is |
||||
# fulfilled by this message. |
||||
if response.HasField("call_info"): |
||||
self._on_call_info(response.call_info) |
||||
elif response.HasField("call_state"): |
||||
self._on_call_state(response.call_state.state) |
||||
else: |
||||
raise RuntimeError( |
||||
"Received StreamCallResponse without call_info and call_state" |
||||
) |
||||
except Exception as e: |
||||
self._peer_responded.set() |
||||
raise |
||||
|
||||
def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None: |
||||
self._session_id = call_info.session_id |
||||
self._audio_session_link = call_info.media |
||||
|
||||
def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None: |
||||
logging.info("Call toward [%s] enters [%s] state", self._phone_number, |
||||
phone_pb2.CallState.State.Name(call_state)) |
||||
self._call_state = call_state |
||||
if call_state is phone_pb2.CallState.State.ACTIVE: |
||||
self._peer_responded.set() |
||||
if call_state is phone_pb2.CallState.State.ENDED: |
||||
self._peer_responded.set() |
||||
self._call_finished.set() |
||||
|
||||
def call(self) -> None: |
||||
request = phone_pb2.StreamCallRequest() |
||||
request.phone_number = self._phone_number |
||||
response_iterator = self._stub.StreamCall(iter((request,))) |
||||
# Instead of consuming the response on current thread, spawn a consumption thread. |
||||
self._consumer_future = self._executor.submit(self._response_watcher, |
||||
response_iterator) |
||||
|
||||
def wait_peer(self) -> None: |
||||
logging.info("Waiting for peer to connect [%s]...", self._phone_number) |
||||
self._peer_responded.wait(timeout=None) |
||||
if self._consumer_future.done(): |
||||
# If the future raises, forwards the exception here |
||||
self._consumer_future.result() |
||||
return self._call_state is phone_pb2.CallState.State.ACTIVE |
||||
|
||||
def audio_session(self) -> None: |
||||
assert self._audio_session_link is not None |
||||
logging.info("Consuming audio resource [%s]", self._audio_session_link) |
||||
self._call_finished.wait(timeout=None) |
||||
logging.info("Audio session finished [%s]", self._audio_session_link) |
||||
|
||||
|
||||
def process_call(executor: ThreadPoolExecutor, channel: grpc.Channel, |
||||
phone_number: str) -> None: |
||||
call_maker = CallMaker(executor, channel, phone_number) |
||||
call_maker.call() |
||||
if call_maker.wait_peer(): |
||||
call_maker.audio_session() |
||||
logging.info("Call finished!") |
||||
else: |
||||
logging.info("Call failed: peer didn't answer") |
||||
|
||||
|
||||
def run(): |
||||
executor = ThreadPoolExecutor() |
||||
with grpc.insecure_channel("localhost:50051") as channel: |
||||
future = executor.submit(process_call, executor, channel, |
||||
"555-0100-XXXX") |
||||
future.result() |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
logging.basicConfig(level=logging.INFO) |
||||
run() |
@ -0,0 +1,52 @@ |
||||
// Copyright 2020 The gRPC Authors |
||||
// |
||||
// Licensed under the Apache License, Version 2.0 (the "License"); |
||||
// you may not use this file except in compliance with the License. |
||||
// You may obtain a copy of the License at |
||||
// |
||||
// http://www.apache.org/licenses/LICENSE-2.0 |
||||
// |
||||
// Unless required by applicable law or agreed to in writing, software |
||||
// distributed under the License is distributed on an "AS IS" BASIS, |
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
// See the License for the specific language governing permissions and |
||||
// limitations under the License. |
||||
|
||||
syntax = "proto3"; |
||||
|
||||
package grpc.testing; |
||||
|
||||
message CallInfo { |
||||
string session_id = 1; |
||||
string media = 2; |
||||
} |
||||
|
||||
message CallState { |
||||
enum State { |
||||
// The default state. |
||||
UNDEFINED = 0; |
||||
// The call is newly created. |
||||
NEW = 1; |
||||
// The call is connected. |
||||
ACTIVE = 6; |
||||
// The call is finished. |
||||
ENDED = 7; |
||||
} |
||||
State state = 2; |
||||
} |
||||
|
||||
message StreamCallRequest { |
||||
string phone_number = 1; |
||||
} |
||||
|
||||
message StreamCallResponse { |
||||
oneof stream_call_response { |
||||
CallInfo call_info = 1; |
||||
CallState call_state = 2; |
||||
} |
||||
} |
||||
|
||||
service Phone { |
||||
// Makes a phone call and communicate states via a stream. |
||||
rpc StreamCall(stream StreamCallRequest) returns (stream StreamCallResponse); |
||||
} |
@ -0,0 +1,267 @@ |
||||
# -*- coding: utf-8 -*- |
||||
# Generated by the protocol buffer compiler. DO NOT EDIT! |
||||
# source: phone.proto |
||||
|
||||
from google.protobuf import descriptor as _descriptor |
||||
from google.protobuf import message as _message |
||||
from google.protobuf import reflection as _reflection |
||||
from google.protobuf import symbol_database as _symbol_database |
||||
# @@protoc_insertion_point(imports) |
||||
|
||||
_sym_db = _symbol_database.Default() |
||||
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor.FileDescriptor( |
||||
name='phone.proto', |
||||
package='grpc.testing', |
||||
syntax='proto3', |
||||
serialized_options=None, |
||||
serialized_pb=b'\n\x0bphone.proto\x12\x0cgrpc.testing\"-\n\x08\x43\x61llInfo\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\r\n\x05media\x18\x02 \x01(\t\"q\n\tCallState\x12,\n\x05state\x18\x02 \x01(\x0e\x32\x1d.grpc.testing.CallState.State\"6\n\x05State\x12\r\n\tUNDEFINED\x10\x00\x12\x07\n\x03NEW\x10\x01\x12\n\n\x06\x41\x43TIVE\x10\x06\x12\t\n\x05\x45NDED\x10\x07\")\n\x11StreamCallRequest\x12\x14\n\x0cphone_number\x18\x01 \x01(\t\"\x88\x01\n\x12StreamCallResponse\x12+\n\tcall_info\x18\x01 \x01(\x0b\x32\x16.grpc.testing.CallInfoH\x00\x12-\n\ncall_state\x18\x02 \x01(\x0b\x32\x17.grpc.testing.CallStateH\x00\x42\x16\n\x14stream_call_response2\\\n\x05Phone\x12S\n\nStreamCall\x12\x1f.grpc.testing.StreamCallRequest\x1a .grpc.testing.StreamCallResponse(\x01\x30\x01\x62\x06proto3' |
||||
) |
||||
|
||||
|
||||
|
||||
_CALLSTATE_STATE = _descriptor.EnumDescriptor( |
||||
name='State', |
||||
full_name='grpc.testing.CallState.State', |
||||
filename=None, |
||||
file=DESCRIPTOR, |
||||
values=[ |
||||
_descriptor.EnumValueDescriptor( |
||||
name='UNDEFINED', index=0, number=0, |
||||
serialized_options=None, |
||||
type=None), |
||||
_descriptor.EnumValueDescriptor( |
||||
name='NEW', index=1, number=1, |
||||
serialized_options=None, |
||||
type=None), |
||||
_descriptor.EnumValueDescriptor( |
||||
name='ACTIVE', index=2, number=6, |
||||
serialized_options=None, |
||||
type=None), |
||||
_descriptor.EnumValueDescriptor( |
||||
name='ENDED', index=3, number=7, |
||||
serialized_options=None, |
||||
type=None), |
||||
], |
||||
containing_type=None, |
||||
serialized_options=None, |
||||
serialized_start=135, |
||||
serialized_end=189, |
||||
) |
||||
_sym_db.RegisterEnumDescriptor(_CALLSTATE_STATE) |
||||
|
||||
|
||||
_CALLINFO = _descriptor.Descriptor( |
||||
name='CallInfo', |
||||
full_name='grpc.testing.CallInfo', |
||||
filename=None, |
||||
file=DESCRIPTOR, |
||||
containing_type=None, |
||||
fields=[ |
||||
_descriptor.FieldDescriptor( |
||||
name='session_id', full_name='grpc.testing.CallInfo.session_id', index=0, |
||||
number=1, type=9, cpp_type=9, label=1, |
||||
has_default_value=False, default_value=b"".decode('utf-8'), |
||||
message_type=None, enum_type=None, containing_type=None, |
||||
is_extension=False, extension_scope=None, |
||||
serialized_options=None, file=DESCRIPTOR), |
||||
_descriptor.FieldDescriptor( |
||||
name='media', full_name='grpc.testing.CallInfo.media', index=1, |
||||
number=2, type=9, cpp_type=9, label=1, |
||||
has_default_value=False, default_value=b"".decode('utf-8'), |
||||
message_type=None, enum_type=None, containing_type=None, |
||||
is_extension=False, extension_scope=None, |
||||
serialized_options=None, file=DESCRIPTOR), |
||||
], |
||||
extensions=[ |
||||
], |
||||
nested_types=[], |
||||
enum_types=[ |
||||
], |
||||
serialized_options=None, |
||||
is_extendable=False, |
||||
syntax='proto3', |
||||
extension_ranges=[], |
||||
oneofs=[ |
||||
], |
||||
serialized_start=29, |
||||
serialized_end=74, |
||||
) |
||||
|
||||
|
||||
_CALLSTATE = _descriptor.Descriptor( |
||||
name='CallState', |
||||
full_name='grpc.testing.CallState', |
||||
filename=None, |
||||
file=DESCRIPTOR, |
||||
containing_type=None, |
||||
fields=[ |
||||
_descriptor.FieldDescriptor( |
||||
name='state', full_name='grpc.testing.CallState.state', index=0, |
||||
number=2, type=14, cpp_type=8, label=1, |
||||
has_default_value=False, default_value=0, |
||||
message_type=None, enum_type=None, containing_type=None, |
||||
is_extension=False, extension_scope=None, |
||||
serialized_options=None, file=DESCRIPTOR), |
||||
], |
||||
extensions=[ |
||||
], |
||||
nested_types=[], |
||||
enum_types=[ |
||||
_CALLSTATE_STATE, |
||||
], |
||||
serialized_options=None, |
||||
is_extendable=False, |
||||
syntax='proto3', |
||||
extension_ranges=[], |
||||
oneofs=[ |
||||
], |
||||
serialized_start=76, |
||||
serialized_end=189, |
||||
) |
||||
|
||||
|
||||
_STREAMCALLREQUEST = _descriptor.Descriptor( |
||||
name='StreamCallRequest', |
||||
full_name='grpc.testing.StreamCallRequest', |
||||
filename=None, |
||||
file=DESCRIPTOR, |
||||
containing_type=None, |
||||
fields=[ |
||||
_descriptor.FieldDescriptor( |
||||
name='phone_number', full_name='grpc.testing.StreamCallRequest.phone_number', index=0, |
||||
number=1, type=9, cpp_type=9, label=1, |
||||
has_default_value=False, default_value=b"".decode('utf-8'), |
||||
message_type=None, enum_type=None, containing_type=None, |
||||
is_extension=False, extension_scope=None, |
||||
serialized_options=None, file=DESCRIPTOR), |
||||
], |
||||
extensions=[ |
||||
], |
||||
nested_types=[], |
||||
enum_types=[ |
||||
], |
||||
serialized_options=None, |
||||
is_extendable=False, |
||||
syntax='proto3', |
||||
extension_ranges=[], |
||||
oneofs=[ |
||||
], |
||||
serialized_start=191, |
||||
serialized_end=232, |
||||
) |
||||
|
||||
|
||||
_STREAMCALLRESPONSE = _descriptor.Descriptor( |
||||
name='StreamCallResponse', |
||||
full_name='grpc.testing.StreamCallResponse', |
||||
filename=None, |
||||
file=DESCRIPTOR, |
||||
containing_type=None, |
||||
fields=[ |
||||
_descriptor.FieldDescriptor( |
||||
name='call_info', full_name='grpc.testing.StreamCallResponse.call_info', index=0, |
||||
number=1, type=11, cpp_type=10, label=1, |
||||
has_default_value=False, default_value=None, |
||||
message_type=None, enum_type=None, containing_type=None, |
||||
is_extension=False, extension_scope=None, |
||||
serialized_options=None, file=DESCRIPTOR), |
||||
_descriptor.FieldDescriptor( |
||||
name='call_state', full_name='grpc.testing.StreamCallResponse.call_state', index=1, |
||||
number=2, type=11, cpp_type=10, label=1, |
||||
has_default_value=False, default_value=None, |
||||
message_type=None, enum_type=None, containing_type=None, |
||||
is_extension=False, extension_scope=None, |
||||
serialized_options=None, file=DESCRIPTOR), |
||||
], |
||||
extensions=[ |
||||
], |
||||
nested_types=[], |
||||
enum_types=[ |
||||
], |
||||
serialized_options=None, |
||||
is_extendable=False, |
||||
syntax='proto3', |
||||
extension_ranges=[], |
||||
oneofs=[ |
||||
_descriptor.OneofDescriptor( |
||||
name='stream_call_response', full_name='grpc.testing.StreamCallResponse.stream_call_response', |
||||
index=0, containing_type=None, fields=[]), |
||||
], |
||||
serialized_start=235, |
||||
serialized_end=371, |
||||
) |
||||
|
||||
_CALLSTATE.fields_by_name['state'].enum_type = _CALLSTATE_STATE |
||||
_CALLSTATE_STATE.containing_type = _CALLSTATE |
||||
_STREAMCALLRESPONSE.fields_by_name['call_info'].message_type = _CALLINFO |
||||
_STREAMCALLRESPONSE.fields_by_name['call_state'].message_type = _CALLSTATE |
||||
_STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'].fields.append( |
||||
_STREAMCALLRESPONSE.fields_by_name['call_info']) |
||||
_STREAMCALLRESPONSE.fields_by_name['call_info'].containing_oneof = _STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'] |
||||
_STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'].fields.append( |
||||
_STREAMCALLRESPONSE.fields_by_name['call_state']) |
||||
_STREAMCALLRESPONSE.fields_by_name['call_state'].containing_oneof = _STREAMCALLRESPONSE.oneofs_by_name['stream_call_response'] |
||||
DESCRIPTOR.message_types_by_name['CallInfo'] = _CALLINFO |
||||
DESCRIPTOR.message_types_by_name['CallState'] = _CALLSTATE |
||||
DESCRIPTOR.message_types_by_name['StreamCallRequest'] = _STREAMCALLREQUEST |
||||
DESCRIPTOR.message_types_by_name['StreamCallResponse'] = _STREAMCALLRESPONSE |
||||
_sym_db.RegisterFileDescriptor(DESCRIPTOR) |
||||
|
||||
CallInfo = _reflection.GeneratedProtocolMessageType('CallInfo', (_message.Message,), { |
||||
'DESCRIPTOR' : _CALLINFO, |
||||
'__module__' : 'phone_pb2' |
||||
# @@protoc_insertion_point(class_scope:grpc.testing.CallInfo) |
||||
}) |
||||
_sym_db.RegisterMessage(CallInfo) |
||||
|
||||
CallState = _reflection.GeneratedProtocolMessageType('CallState', (_message.Message,), { |
||||
'DESCRIPTOR' : _CALLSTATE, |
||||
'__module__' : 'phone_pb2' |
||||
# @@protoc_insertion_point(class_scope:grpc.testing.CallState) |
||||
}) |
||||
_sym_db.RegisterMessage(CallState) |
||||
|
||||
StreamCallRequest = _reflection.GeneratedProtocolMessageType('StreamCallRequest', (_message.Message,), { |
||||
'DESCRIPTOR' : _STREAMCALLREQUEST, |
||||
'__module__' : 'phone_pb2' |
||||
# @@protoc_insertion_point(class_scope:grpc.testing.StreamCallRequest) |
||||
}) |
||||
_sym_db.RegisterMessage(StreamCallRequest) |
||||
|
||||
StreamCallResponse = _reflection.GeneratedProtocolMessageType('StreamCallResponse', (_message.Message,), { |
||||
'DESCRIPTOR' : _STREAMCALLRESPONSE, |
||||
'__module__' : 'phone_pb2' |
||||
# @@protoc_insertion_point(class_scope:grpc.testing.StreamCallResponse) |
||||
}) |
||||
_sym_db.RegisterMessage(StreamCallResponse) |
||||
|
||||
|
||||
|
||||
_PHONE = _descriptor.ServiceDescriptor( |
||||
name='Phone', |
||||
full_name='grpc.testing.Phone', |
||||
file=DESCRIPTOR, |
||||
index=0, |
||||
serialized_options=None, |
||||
serialized_start=373, |
||||
serialized_end=465, |
||||
methods=[ |
||||
_descriptor.MethodDescriptor( |
||||
name='StreamCall', |
||||
full_name='grpc.testing.Phone.StreamCall', |
||||
index=0, |
||||
containing_service=None, |
||||
input_type=_STREAMCALLREQUEST, |
||||
output_type=_STREAMCALLRESPONSE, |
||||
serialized_options=None, |
||||
), |
||||
]) |
||||
_sym_db.RegisterServiceDescriptor(_PHONE) |
||||
|
||||
DESCRIPTOR.services_by_name['Phone'] = _PHONE |
||||
|
||||
# @@protoc_insertion_point(module_scope) |
@ -0,0 +1,65 @@ |
||||
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! |
||||
import grpc |
||||
|
||||
import phone_pb2 as phone__pb2 |
||||
|
||||
|
||||
class PhoneStub(object): |
||||
"""Missing associated documentation comment in .proto file""" |
||||
|
||||
def __init__(self, channel): |
||||
"""Constructor. |
||||
|
||||
Args: |
||||
channel: A grpc.Channel. |
||||
""" |
||||
self.StreamCall = channel.stream_stream( |
||||
'/grpc.testing.Phone/StreamCall', |
||||
request_serializer=phone__pb2.StreamCallRequest.SerializeToString, |
||||
response_deserializer=phone__pb2.StreamCallResponse.FromString, |
||||
) |
||||
|
||||
|
||||
class PhoneServicer(object): |
||||
"""Missing associated documentation comment in .proto file""" |
||||
|
||||
def StreamCall(self, request_iterator, context): |
||||
"""Makes a phone call and communicate states via a stream. |
||||
""" |
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED) |
||||
context.set_details('Method not implemented!') |
||||
raise NotImplementedError('Method not implemented!') |
||||
|
||||
|
||||
def add_PhoneServicer_to_server(servicer, server): |
||||
rpc_method_handlers = { |
||||
'StreamCall': grpc.stream_stream_rpc_method_handler( |
||||
servicer.StreamCall, |
||||
request_deserializer=phone__pb2.StreamCallRequest.FromString, |
||||
response_serializer=phone__pb2.StreamCallResponse.SerializeToString, |
||||
), |
||||
} |
||||
generic_handler = grpc.method_handlers_generic_handler( |
||||
'grpc.testing.Phone', rpc_method_handlers) |
||||
server.add_generic_rpc_handlers((generic_handler,)) |
||||
|
||||
|
||||
# This class is part of an EXPERIMENTAL API. |
||||
class Phone(object): |
||||
"""Missing associated documentation comment in .proto file""" |
||||
|
||||
@staticmethod |
||||
def StreamCall(request_iterator, |
||||
target, |
||||
options=(), |
||||
channel_credentials=None, |
||||
call_credentials=None, |
||||
compression=None, |
||||
wait_for_ready=None, |
||||
timeout=None, |
||||
metadata=None): |
||||
return grpc.experimental.stream_stream(request_iterator, target, '/grpc.testing.Phone/StreamCall', |
||||
phone__pb2.StreamCallRequest.SerializeToString, |
||||
phone__pb2.StreamCallResponse.FromString, |
||||
options, channel_credentials, |
||||
call_credentials, compression, wait_for_ready, timeout, metadata) |
@ -0,0 +1,92 @@ |
||||
# Copyright 2020 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
import logging |
||||
import time |
||||
from concurrent.futures import ThreadPoolExecutor |
||||
from typing import Iterable |
||||
import threading |
||||
|
||||
import grpc |
||||
from google.protobuf.json_format import MessageToJson |
||||
|
||||
import phone_pb2 |
||||
import phone_pb2_grpc |
||||
|
||||
|
||||
def create_state_response(call_state: phone_pb2.CallState.State |
||||
) -> phone_pb2.StreamCallResponse: |
||||
response = phone_pb2.StreamCallResponse() |
||||
response.call_state.state = call_state |
||||
return response |
||||
|
||||
|
||||
class Phone(phone_pb2_grpc.PhoneServicer): |
||||
|
||||
def __init__(self): |
||||
self._id_counter = 0 |
||||
self._lock = threading.RLock() |
||||
|
||||
def _create_call_session(self) -> phone_pb2.CallInfo: |
||||
call_info = phone_pb2.CallInfo() |
||||
with self._lock: |
||||
call_info.session_id = str(self._id_counter) |
||||
self._id_counter += 1 |
||||
call_info.media = "https://link.to.audio.resources" |
||||
logging.info("Created a call session [%s]", MessageToJson(call_info)) |
||||
return call_info |
||||
|
||||
def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None: |
||||
logging.info("Call session cleaned [%s]", MessageToJson(call_info)) |
||||
|
||||
def StreamCall(self, |
||||
request_iterator: Iterable[phone_pb2.StreamCallRequest], |
||||
context: grpc.ServicerContext |
||||
) -> Iterable[phone_pb2.StreamCallResponse]: |
||||
try: |
||||
request = next(request_iterator) |
||||
logging.info("Received a phone call request for number [%s]", |
||||
request.phone_number) |
||||
except StopIteration: |
||||
raise RuntimeError("Failed to receive call request") |
||||
# Simulate the acceptance of call request |
||||
time.sleep(1) |
||||
yield create_state_response(phone_pb2.CallState.NEW) |
||||
# Simulate the start of the call session |
||||
time.sleep(1) |
||||
call_info = self._create_call_session() |
||||
context.add_callback(lambda: self._clean_call_session(call_info)) |
||||
response = phone_pb2.StreamCallResponse() |
||||
response.call_info.session_id = call_info.session_id |
||||
response.call_info.media = call_info.media |
||||
yield response |
||||
yield create_state_response(phone_pb2.CallState.ACTIVE) |
||||
# Simulate the end of the call |
||||
time.sleep(2) |
||||
yield create_state_response(phone_pb2.CallState.ENDED) |
||||
logging.info("Call finished [%s]", request.phone_number) |
||||
|
||||
|
||||
def serve(address: str) -> None: |
||||
server = grpc.server(ThreadPoolExecutor()) |
||||
phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server) |
||||
server.add_insecure_port(address) |
||||
server.start() |
||||
logging.info("Server serving at %s", address) |
||||
server.wait_for_termination() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
logging.basicConfig(level=logging.INFO) |
||||
serve("[::]:50051") |
Loading…
Reference in new issue