diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 8d3e4767b9c..64ffa3cf1c7 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -8,6 +8,34 @@ A client may cancel an RPC for several reasons. Perhaps the data it requested has been made irrelevant. Perhaps you, as the client, want to be a good citizen of the server and are conserving compute resources. +##### Cancelling a Client-Side Unary RPC + +The default RPC methods on a stub will simply return the result of an RPC. + +```python +>>> stub = hash_name_pb2_grpc.HashFinderStub(channel) +>>> stub.Find(hash_name_pb2.HashNameRequest(desired_name=name)) + +``` + +But you may use the `future()` method to receive an instance of `grpc.Future`. +This interface allows you to wait on a response with a timeout, add a callback +to be executed when the RPC completes, or to cancel the RPC before it has +completed. + +In the example, we use this interface to cancel our in-progress RPC when the +user interrupts the process with ctrl-c. + +```python +stub = hash_name_pb2_grpc.HashFinderStub(channel) +future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name)) +def cancel_request(unused_signum, unused_frame): + future.cancel() +signal.signal(signal.SIGINT, cancel_request) +``` + +##### Cancelling a Client-Side Streaming RPC + #### Cancellation on the Server Side A server is reponsible for cancellation in two ways. It must respond in some way diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index f86a32af175..f57867a9ae6 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -23,6 +23,14 @@ import datetime import logging import time import signal +import threading + +try: + from queue import Queue + from queue import Empty as QueueEmpty +except ImportError: + from Queue import Queue + from Queue import Empty as QueueEmpty import grpc @@ -34,6 +42,8 @@ _LOGGER = logging.getLogger(__name__) _TIMEOUT_SECONDS = 0.05 +# TODO(rbellevi): Actually use the logger. + def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) @@ -55,9 +65,43 @@ def run_unary_client(server_target, name, ideal_distance): break -def run_streaming_client(target, name, ideal_distance, interesting_distance): - pass +def run_streaming_client(server_target, name, ideal_distance, interesting_distance): + with grpc.insecure_channel(server_target) as channel: + stub = hash_name_pb2_grpc.HashFinderStub(channel) + print("Initiating RPC") + result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name, + ideal_hamming_distance=ideal_distance, + interesting_hamming_distance=interesting_distance)) + def cancel_request(unused_signum, unused_frame): + print("Cancelling request.") + result_generator.cancel() + signal.signal(signal.SIGINT, cancel_request) + result_queue = Queue() + + def iterate_responses(result_generator, result_queue): + try: + for result in result_generator: + print("Result: {}".format(result)) + result_queue.put(result) + except grpc.RpcError as rpc_error: + if rpc_error.code() != grpc.StatusCode.CANCELLED: + result_queue.put(None) + raise rpc_error + # Enqueue a sentinel to signal the end of the stream. + result_queue.put(None) + print("RPC complete") + response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) + response_thread.daemon = True + response_thread.start() + while result_generator.running(): + try: + result = result_queue.get(timeout=_TIMEOUT_SECONDS) + except QueueEmpty: + continue + if result is None: + break + print("Got result: {}".format(result)) def main(): parser = argparse.ArgumentParser(description=_DESCRIPTION) @@ -79,7 +123,7 @@ def main(): args = parser.parse_args() if args.show_inferior is not None: - run_streaming_client(args.server, args.name, args.ideal_distance, args.interesting_distance) + run_streaming_client(args.server, args.name, args.ideal_distance, args.show_inferior) else: run_unary_client(args.server, args.name, args.ideal_distance) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 575c2fc8e74..12dbc5653ff 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -32,6 +32,8 @@ import grpc from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc +# TODO(rbellevi): Actually use the logger. +# TODO(rbellevi): Enforce per-user quotas with cancellation _BYTE_MAX = 255 @@ -116,11 +118,11 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, interesti digits[i] += 1 -def _find_secret(target, maximum_distance, stop_event): +def _find_secret(target, maximum_distance, stop_event, interesting_hamming_distance=None): length = 1 while True: print("Checking strings of length {}.".format(length)) - for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event): + for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event, interesting_hamming_distance=interesting_hamming_distance): if candidate is not None: yield candidate else: @@ -150,6 +152,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def FindRange(self, request, context): stop_event = threading.Event() def on_rpc_done(): + print("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) secret_generator = _find_secret(request.desired_name, @@ -158,6 +161,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): interesting_hamming_distance=request.interesting_hamming_distance) for candidate in secret_generator: yield candidate + print("Regained servicer thread.") def _run_server(port):