diff --git a/examples/python/debug/BUILD.bazel b/examples/python/debug/BUILD.bazel index 0459647b945..9cc8e1672f6 100644 --- a/examples/python/debug/BUILD.bazel +++ b/examples/python/debug/BUILD.bazel @@ -14,33 +14,35 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement") +package(default_testonly = 1) + py_binary( name = "debug_server", - testonly = 1, srcs = ["debug_server.py"], + data = ["helloworld.proto"], + imports = ["."], + python_version = "PY3", deps = [ - "//examples/protos:helloworld_py_pb2", - "//examples/protos:helloworld_py_pb2_grpc", "//src/python/grpcio/grpc:grpcio", "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//tools/distrib/python/grpcio_tools:grpc_tools", ], ) py_binary( name = "send_message", - testonly = 1, srcs = ["send_message.py"], + data = ["helloworld.proto"], + imports = ["."], python_version = "PY3", deps = [ - "//examples/protos:helloworld_py_pb2", - "//examples/protos:helloworld_py_pb2_grpc", "//src/python/grpcio/grpc:grpcio", + "//tools/distrib/python/grpcio_tools:grpc_tools", ], ) py_binary( name = "get_stats", - testonly = 1, srcs = ["get_stats.py"], python_version = "PY3", deps = [ @@ -49,17 +51,52 @@ py_binary( ], ) +py_binary( + name = "asyncio_debug_server", + srcs = ["asyncio_debug_server.py"], + data = ["helloworld.proto"], + imports = ["."], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//tools/distrib/python/grpcio_tools:grpc_tools", + ], +) + +py_binary( + name = "asyncio_send_message", + srcs = ["asyncio_send_message.py"], + data = ["helloworld.proto"], + imports = ["."], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//tools/distrib/python/grpcio_tools:grpc_tools", + ], +) + +py_binary( + name = "asyncio_get_stats", + srcs = ["asyncio_get_stats.py"], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + ], +) + py_test( name = "_debug_example_test", srcs = ["test/_debug_example_test.py"], + imports = ["."], python_version = "PY3", deps = [ + ":asyncio_debug_server", + ":asyncio_get_stats", + ":asyncio_send_message", ":debug_server", ":get_stats", ":send_message", - "//examples/protos:helloworld_py_pb2", - "//examples/protos:helloworld_py_pb2_grpc", - "//src/python/grpcio/grpc:grpcio", - "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", ], ) diff --git a/examples/python/debug/asyncio_debug_server.py b/examples/python/debug/asyncio_debug_server.py new file mode 100644 index 00000000000..09b6e2dbda6 --- /dev/null +++ b/examples/python/debug/asyncio_debug_server.py @@ -0,0 +1,83 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python AsyncIO example of utilizing Channelz feature.""" + +import asyncio +import argparse +import logging +import random + +import grpc + +helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services( + "helloworld.proto") + +# TODO: Suppress until the macOS segfault fix rolled out +from grpc_channelz.v1 import channelz # pylint: disable=wrong-import-position + +_LOGGER = logging.getLogger(__name__) +_LOGGER.setLevel(logging.INFO) + +_RANDOM_FAILURE_RATE = 0.3 + + +class FaultInjectGreeter(helloworld_pb2_grpc.GreeterServicer): + + def __init__(self, failure_rate): + self._failure_rate = failure_rate + + async def SayHello(self, request: helloworld_pb2.HelloRequest, + context: grpc.aio.ServicerContext + ) -> helloworld_pb2.HelloReply: + if random.random() < self._failure_rate: + context.abort(grpc.StatusCode.UNAVAILABLE, + 'Randomly injected failure.') + return helloworld_pb2.HelloReply(message=f'Hello, {request.name}!') + + +def create_server(addr: str, failure_rate: float) -> grpc.aio.Server: + server = grpc.aio.server() + helloworld_pb2_grpc.add_GreeterServicer_to_server( + FaultInjectGreeter(failure_rate), server) + + # Add Channelz Servicer to the gRPC server + channelz.add_channelz_servicer(server) + + server.add_insecure_port(addr) + return server + + +async def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--addr', + nargs=1, + type=str, + default='[::]:50051', + help='the address to listen on') + parser.add_argument( + '--failure_rate', + nargs=1, + type=float, + default=0.3, + help='a float indicates the percentage of failed message injections') + args = parser.parse_args() + + server = create_server(addr=args.addr, failure_rate=args.failure_rate) + await server.start() + await server.wait_for_termination() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/python/debug/asyncio_get_stats.py b/examples/python/debug/asyncio_get_stats.py new file mode 100644 index 00000000000..a421f8aa9b6 --- /dev/null +++ b/examples/python/debug/asyncio_get_stats.py @@ -0,0 +1,46 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Poll statistics from the server.""" + +import asyncio +import logging +import argparse +import grpc + +from grpc_channelz.v1 import channelz_pb2 +from grpc_channelz.v1 import channelz_pb2_grpc + + +async def run(addr: str) -> None: + async with grpc.aio.insecure_channel(addr) as channel: + channelz_stub = channelz_pb2_grpc.ChannelzStub(channel) + response = await channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + print('Info for all servers: %s' % response) + + +async def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--addr', + nargs=1, + type=str, + default='[::]:50051', + help='the address to request') + args = parser.parse_args() + run(addr=args.addr) + + +if __name__ == '__main__': + logging.basicConfig() + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/python/debug/asyncio_send_message.py b/examples/python/debug/asyncio_send_message.py new file mode 100644 index 00000000000..c3923990a2c --- /dev/null +++ b/examples/python/debug/asyncio_send_message.py @@ -0,0 +1,61 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Send multiple greeting messages to the backend.""" + +import asyncio +import logging +import argparse +import grpc + +helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services( + "helloworld.proto") + + +async def process(stub: helloworld_pb2_grpc.GreeterStub, + request: helloworld_pb2.HelloRequest) -> None: + try: + response = await stub.SayHello(request) + except grpc.aio.AioRpcError as rpc_error: + print(f'Received error: {rpc_error}') + else: + print(f'Received message: {response}') + + +async def run(addr: str, n: int) -> None: + async with grpc.aio.insecure_channel(addr) as channel: + stub = helloworld_pb2_grpc.GreeterStub(channel) + request = helloworld_pb2.HelloRequest(name='you') + for _ in range(n): + await process(stub, request) + + +async def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument('--addr', + nargs=1, + type=str, + default='[::]:50051', + help='the address to request') + parser.add_argument('-n', + nargs=1, + type=int, + default=10, + help='an integer for number of messages to sent') + args = parser.parse_args() + await run(addr=args.addr, n=args.n) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/python/debug/debug_server.py b/examples/python/debug/debug_server.py index 9e47a83895a..f627d3b3917 100644 --- a/examples/python/debug/debug_server.py +++ b/examples/python/debug/debug_server.py @@ -23,10 +23,12 @@ from concurrent import futures import random import grpc -from grpc_channelz.v1 import channelz -from examples.protos import helloworld_pb2 -from examples.protos import helloworld_pb2_grpc +helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services( + "helloworld.proto") + +# TODO: Suppress until the macOS segfault fix rolled out +from grpc_channelz.v1 import channelz # pylint: disable=wrong-import-position _LOGGER = logging.getLogger(__name__) _LOGGER.setLevel(logging.INFO) diff --git a/examples/python/debug/get_stats.py b/examples/python/debug/get_stats.py index 2888182f843..398e1c014bd 100644 --- a/examples/python/debug/get_stats.py +++ b/examples/python/debug/get_stats.py @@ -28,9 +28,11 @@ from grpc_channelz.v1 import channelz_pb2_grpc def run(addr): with grpc.insecure_channel(addr) as channel: channelz_stub = channelz_pb2_grpc.ChannelzStub(channel) - response = channelz_stub.GetServers( - channelz_pb2.GetServersRequest(start_server_id=0)) - print('Info for all servers: %s' % response) + # This RPC pulls server-level metrics, like sent/received messages, + # succeeded/failed RPCs. For more info see: + # https://github.com/grpc/grpc/blob/master/src/proto/grpc/channelz/channelz.proto + response = channelz_stub.GetServers(channelz_pb2.GetServersRequest()) + print(f'Info for all servers: {response}') def main(): diff --git a/examples/python/debug/helloworld.proto b/examples/python/debug/helloworld.proto new file mode 120000 index 00000000000..a052c1c1956 --- /dev/null +++ b/examples/python/debug/helloworld.proto @@ -0,0 +1 @@ +../../protos/helloworld.proto \ No newline at end of file diff --git a/examples/python/debug/send_message.py b/examples/python/debug/send_message.py index 9ca85375d16..3f0219954bf 100644 --- a/examples/python/debug/send_message.py +++ b/examples/python/debug/send_message.py @@ -20,8 +20,9 @@ from __future__ import print_function import logging import argparse import grpc -from examples.protos import helloworld_pb2 -from examples.protos import helloworld_pb2_grpc + +helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services( + "helloworld.proto") def process(stub, request): diff --git a/examples/python/debug/test/_debug_example_test.py b/examples/python/debug/test/_debug_example_test.py index 26a40ae5a43..efd7dbe104d 100644 --- a/examples/python/debug/test/_debug_example_test.py +++ b/examples/python/debug/test/_debug_example_test.py @@ -13,16 +13,16 @@ # limitations under the License. """Test for gRPC Python debug example.""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - +import asyncio import logging import unittest from examples.python.debug import debug_server +from examples.python.debug import asyncio_debug_server from examples.python.debug import send_message +from examples.python.debug import asyncio_send_message from examples.python.debug import get_stats +from examples.python.debug import asyncio_get_stats _LOGGER = logging.getLogger(__name__) _LOGGER.setLevel(logging.INFO) @@ -47,7 +47,23 @@ class DebugExampleTest(unittest.TestCase): server.stop(None) # No unhandled exception raised, test passed! + def test_asyncio_channelz_example(self): + + async def body(): + server = asyncio_debug_server.create_server( + addr='[::]:0', failure_rate=_FAILURE_RATE) + port = server.add_insecure_port('[::]:0') + await server.start() + address = _ADDR_TEMPLATE % port + + await asyncio_send_message.run(addr=address, n=_NUMBER_OF_MESSAGES) + await asyncio_get_stats.run(addr=address) + await server.stop(None) + # No unhandled exception raised, test passed! + + asyncio.get_event_loop().run_until_complete(body()) + if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/examples/python/route_guide/asyncio_route_guide_client.py b/examples/python/route_guide/asyncio_route_guide_client.py new file mode 100644 index 00000000000..3606f4837f0 --- /dev/null +++ b/examples/python/route_guide/asyncio_route_guide_client.py @@ -0,0 +1,129 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python AsyncIO implementation of the gRPC route guide client.""" + +import asyncio +import random +import logging +from typing import List, Iterable + +import grpc + +import route_guide_pb2 +import route_guide_pb2_grpc +import route_guide_resources + + +def make_route_note(message: str, latitude: int, + longitude: int) -> route_guide_pb2.RouteNote: + return route_guide_pb2.RouteNote( + message=message, + location=route_guide_pb2.Point(latitude=latitude, longitude=longitude)) + + +# Performs an unary call +async def guide_get_one_feature(stub: route_guide_pb2_grpc.RouteGuideStub, + point: route_guide_pb2.Point) -> None: + feature = await stub.GetFeature(point) + if not feature.location: + print("Server returned incomplete feature") + return + + if feature.name: + print(f"Feature called {feature.name} at {feature.location}") + else: + print(f"Found no feature at {feature.location}") + + +async def guide_get_feature(stub: route_guide_pb2_grpc.RouteGuideStub) -> None: + await guide_get_one_feature( + stub, route_guide_pb2.Point(latitude=409146138, longitude=-746188906)) + await guide_get_one_feature(stub, + route_guide_pb2.Point(latitude=0, longitude=0)) + + +# Performs a server-streaming call +async def guide_list_features(stub: route_guide_pb2_grpc.RouteGuideStub + ) -> None: + rectangle = route_guide_pb2.Rectangle( + lo=route_guide_pb2.Point(latitude=400000000, longitude=-750000000), + hi=route_guide_pb2.Point(latitude=420000000, longitude=-730000000)) + print("Looking for features between 40, -75 and 42, -73") + + features = stub.ListFeatures(rectangle) + + async for feature in features: + print(f"Feature called {feature.name} at {feature.location}") + + +def generate_route(feature_list: List[route_guide_pb2.Feature] + ) -> Iterable[route_guide_pb2.Point]: + for _ in range(0, 10): + random_feature = random.choice(feature_list) + print(f"Visiting point {random_feature.location}") + yield random_feature.location + + +# Performs a client-streaming call +async def guide_record_route(stub: route_guide_pb2_grpc.RouteGuideStub) -> None: + feature_list = route_guide_resources.read_route_guide_database() + route_iterator = generate_route(feature_list) + + # gRPC AsyncIO client-streaming RPC API accepts both synchronous iterables + # and async iterables. + route_summary = await stub.RecordRoute(route_iterator) + print(f"Finished trip with {route_summary.point_count} points") + print(f"Passed {route_summary.feature_count} features") + print(f"Travelled {route_summary.distance} meters") + print(f"It took {route_summary.elapsed_time} seconds") + + +def generate_messages() -> Iterable[route_guide_pb2.RouteNote]: + messages = [ + make_route_note("First message", 0, 0), + make_route_note("Second message", 0, 1), + make_route_note("Third message", 1, 0), + make_route_note("Fourth message", 0, 0), + make_route_note("Fifth message", 1, 0), + ] + for msg in messages: + print(f"Sending {msg.message} at {msg.location}") + yield msg + + +# Performs a bidi-streaming call +async def guide_route_chat(stub: route_guide_pb2_grpc.RouteGuideStub) -> None: + # gRPC AsyncIO bidi-streaming RPC API accepts both synchronous iterables + # and async iterables. + call = stub.RouteChat(generate_messages()) + async for response in call: + print(f"Received message {response.message} at {response.location}") + + +async def main() -> None: + async with grpc.aio.insecure_channel('localhost:50051') as channel: + stub = route_guide_pb2_grpc.RouteGuideStub(channel) + print("-------------- GetFeature --------------") + await guide_get_feature(stub) + print("-------------- ListFeatures --------------") + await guide_list_features(stub) + print("-------------- RecordRoute --------------") + await guide_record_route(stub) + print("-------------- RouteChat --------------") + await guide_route_chat(stub) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/python/route_guide/asyncio_route_guide_server.py b/examples/python/route_guide/asyncio_route_guide_server.py new file mode 100644 index 00000000000..22c1908eded --- /dev/null +++ b/examples/python/route_guide/asyncio_route_guide_server.py @@ -0,0 +1,134 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python AsyncIO implementation of the gRPC route guide server.""" + +import asyncio +import time +import math +import logging +from typing import AsyncIterable, Iterable + +import grpc + +import route_guide_pb2 +import route_guide_pb2_grpc +import route_guide_resources + + +def get_feature(feature_db: Iterable[route_guide_pb2.Feature], + point: route_guide_pb2.Point) -> route_guide_pb2.Feature: + """Returns Feature at given location or None.""" + for feature in feature_db: + if feature.location == point: + return feature + return None + + +def get_distance(start: route_guide_pb2.Point, + end: route_guide_pb2.Point) -> float: + """Distance between two points.""" + coord_factor = 10000000.0 + lat_1 = start.latitude / coord_factor + lat_2 = end.latitude / coord_factor + lon_1 = start.longitude / coord_factor + lon_2 = end.longitude / coord_factor + lat_rad_1 = math.radians(lat_1) + lat_rad_2 = math.radians(lat_2) + delta_lat_rad = math.radians(lat_2 - lat_1) + delta_lon_rad = math.radians(lon_2 - lon_1) + + # Formula is based on http://mathforum.org/library/drmath/view/51879.html + a = (pow(math.sin(delta_lat_rad / 2), 2) + + (math.cos(lat_rad_1) * math.cos(lat_rad_2) * + pow(math.sin(delta_lon_rad / 2), 2))) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + R = 6371000 + # metres + return R * c + + +class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer): + """Provides methods that implement functionality of route guide server.""" + + def __init__(self) -> None: + self.db = route_guide_resources.read_route_guide_database() + + def GetFeature(self, request: route_guide_pb2.Point, + unused_context) -> route_guide_pb2.Feature: + feature = get_feature(self.db, request) + if feature is None: + return route_guide_pb2.Feature(name="", location=request) + else: + return feature + + async def ListFeatures(self, request: route_guide_pb2.Rectangle, + unused_context + ) -> AsyncIterable[route_guide_pb2.Feature]: + left = min(request.lo.longitude, request.hi.longitude) + right = max(request.lo.longitude, request.hi.longitude) + top = max(request.lo.latitude, request.hi.latitude) + bottom = min(request.lo.latitude, request.hi.latitude) + for feature in self.db: + if (feature.location.longitude >= left and + feature.location.longitude <= right and + feature.location.latitude >= bottom and + feature.location.latitude <= top): + yield feature + + async def RecordRoute( + self, request_iterator: AsyncIterable[route_guide_pb2.Point], + unused_context) -> route_guide_pb2.RouteSummary: + point_count = 0 + feature_count = 0 + distance = 0.0 + prev_point = None + + start_time = time.time() + async for point in request_iterator: + point_count += 1 + if get_feature(self.db, point): + feature_count += 1 + if prev_point: + distance += get_distance(prev_point, point) + prev_point = point + + elapsed_time = time.time() - start_time + return route_guide_pb2.RouteSummary(point_count=point_count, + feature_count=feature_count, + distance=int(distance), + elapsed_time=int(elapsed_time)) + + async def RouteChat( + self, request_iterator: AsyncIterable[route_guide_pb2.RouteNote], + unused_context) -> AsyncIterable[route_guide_pb2.RouteNote]: + prev_notes = [] + async for new_note in request_iterator: + for prev_note in prev_notes: + if prev_note.location == new_note.location: + yield prev_note + prev_notes.append(new_note) + + +async def serve() -> None: + server = grpc.aio.server() + route_guide_pb2_grpc.add_RouteGuideServicer_to_server( + RouteGuideServicer(), server) + server.add_insecure_port('[::]:50051') + await server.start() + await server.wait_for_termination() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + asyncio.get_event_loop().run_until_complete(serve()) diff --git a/examples/python/wait_for_ready/BUILD.bazel b/examples/python/wait_for_ready/BUILD.bazel index d9a278457c9..e783af88dce 100644 --- a/examples/python/wait_for_ready/BUILD.bazel +++ b/examples/python/wait_for_ready/BUILD.bazel @@ -14,14 +14,29 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement") -py_library( +package(default_testonly = 1) + +py_binary( name = "wait_for_ready_example", - testonly = 1, srcs = ["wait_for_ready_example.py"], + data = ["helloworld.proto"], + imports = ["."], + python_version = "PY3", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//tools/distrib/python/grpcio_tools:grpc_tools", + ], +) + +py_binary( + name = "asyncio_wait_for_ready_example", + srcs = ["asyncio_wait_for_ready_example.py"], + data = ["helloworld.proto"], + imports = ["."], + python_version = "PY3", deps = [ - "//examples/protos:helloworld_py_pb2", - "//examples/protos:helloworld_py_pb2_grpc", "//src/python/grpcio/grpc:grpcio", + "//tools/distrib/python/grpcio_tools:grpc_tools", ], ) @@ -30,5 +45,8 @@ py_test( size = "small", srcs = ["test/_wait_for_ready_example_test.py"], python_version = "PY3", - deps = [":wait_for_ready_example"], + deps = [ + ":asyncio_wait_for_ready_example", + ":wait_for_ready_example", + ], ) diff --git a/examples/python/wait_for_ready/asyncio_wait_for_ready_example.py b/examples/python/wait_for_ready/asyncio_wait_for_ready_example.py new file mode 100644 index 00000000000..f4499090852 --- /dev/null +++ b/examples/python/wait_for_ready/asyncio_wait_for_ready_example.py @@ -0,0 +1,109 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python example of utilizing wait-for-ready flag.""" + +import asyncio +import logging +from contextlib import contextmanager +import socket +from typing import Iterable + +import grpc + +helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services( + "helloworld.proto") + +_LOGGER = logging.getLogger(__name__) +_LOGGER.setLevel(logging.INFO) + + +@contextmanager +def get_free_loopback_tcp_port() -> Iterable[str]: + if socket.has_ipv6: + tcp_socket = socket.socket(socket.AF_INET6) + else: + tcp_socket = socket.socket(socket.AF_INET) + tcp_socket.bind(('', 0)) + address_tuple = tcp_socket.getsockname() + yield f"localhost:{address_tuple[1]}" + tcp_socket.close() + + +class Greeter(helloworld_pb2_grpc.GreeterServicer): + + async def SayHello(self, request: helloworld_pb2.HelloRequest, + unused_context) -> helloworld_pb2.HelloReply: + return helloworld_pb2.HelloReply(message=f'Hello, {request.name}!') + + +def create_server(server_address: str): + server = grpc.aio.server() + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + bound_port = server.add_insecure_port(server_address) + assert bound_port == int(server_address.split(':')[-1]) + return server + + +async def process(stub: helloworld_pb2_grpc.GreeterStub, + wait_for_ready: bool = None) -> None: + try: + response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'), + wait_for_ready=wait_for_ready) + message = response.message + except grpc.aio.AioRpcError as rpc_error: + assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE + assert not wait_for_ready + message = rpc_error + else: + assert wait_for_ready + _LOGGER.info("Wait-for-ready %s, client received: %s", + "enabled" if wait_for_ready else "disabled", message) + + +async def main() -> None: + # Pick a random free port + with get_free_loopback_tcp_port() as server_address: + # Create gRPC channel + channel = grpc.aio.insecure_channel(server_address) + stub = helloworld_pb2_grpc.GreeterStub(channel) + + # Fire an RPC without wait_for_ready + fail_fast_task = asyncio.get_event_loop().create_task( + process(stub, wait_for_ready=False)) + # Fire an RPC with wait_for_ready + wait_for_ready_task = asyncio.get_event_loop().create_task( + process(stub, wait_for_ready=True)) + + # Wait for the channel entering TRANSIENT FAILURE state. + state = channel.get_state() + while state != grpc.ChannelConnectivity.TRANSIENT_FAILURE: + await channel.wait_for_state_change(state) + state = channel.get_state() + + # Start the server to handle the RPC + server = create_server(server_address) + await server.start() + + # Expected to fail with StatusCode.UNAVAILABLE. + await fail_fast_task + # Expected to success. + await wait_for_ready_task + + await server.stop(None) + await channel.close() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/python/wait_for_ready/helloworld.proto b/examples/python/wait_for_ready/helloworld.proto new file mode 120000 index 00000000000..a052c1c1956 --- /dev/null +++ b/examples/python/wait_for_ready/helloworld.proto @@ -0,0 +1 @@ +../../protos/helloworld.proto \ No newline at end of file diff --git a/examples/python/wait_for_ready/test/_wait_for_ready_example_test.py b/examples/python/wait_for_ready/test/_wait_for_ready_example_test.py index 03e83a12c56..24d9865b431 100644 --- a/examples/python/wait_for_ready/test/_wait_for_ready_example_test.py +++ b/examples/python/wait_for_ready/test/_wait_for_ready_example_test.py @@ -13,10 +13,12 @@ # limitations under the License. """Tests of the wait-for-ready example.""" +import asyncio import unittest import logging from examples.python.wait_for_ready import wait_for_ready_example +from examples.python.wait_for_ready import asyncio_wait_for_ready_example class WaitForReadyExampleTest(unittest.TestCase): @@ -25,7 +27,12 @@ class WaitForReadyExampleTest(unittest.TestCase): wait_for_ready_example.main() # No unhandled exception raised, no deadlock, test passed! + def test_asyncio_wait_for_ready_example(self): + asyncio.get_event_loop().run_until_complete( + asyncio_wait_for_ready_example.main()) + # No unhandled exception raised, no deadlock, test passed! + if __name__ == '__main__': - logging.basicConfig() + logging.basicConfig(level=logging.DEBUG) unittest.main(verbosity=2) diff --git a/examples/python/wait_for_ready/wait_for_ready_example.py b/examples/python/wait_for_ready/wait_for_ready_example.py index bddb0ac9205..6b6da869b4a 100644 --- a/examples/python/wait_for_ready/wait_for_ready_example.py +++ b/examples/python/wait_for_ready/wait_for_ready_example.py @@ -13,7 +13,6 @@ # limitations under the License. """The Python example of utilizing wait-for-ready flag.""" -from __future__ import print_function import logging from concurrent import futures from contextlib import contextmanager @@ -22,8 +21,8 @@ import threading import grpc -from examples.protos import helloworld_pb2 -from examples.protos import helloworld_pb2_grpc +helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services( + "helloworld.proto") _LOGGER = logging.getLogger(__name__) _LOGGER.setLevel(logging.INFO)