From 82aa4068c79ebb3ba37341199b078c9b6f910f8c Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 11:02:10 -0700 Subject: [PATCH] Elaborate on cancelling streaming RPCs --- examples/python/cancellation/README.md | 64 ++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 08aea49e2c9..e7f6b42106e 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -8,7 +8,7 @@ A client may cancel an RPC for several reasons. Perhaps the data it requested has been made irrelevant. Perhaps you, as the client, want to be a good citizen of the server and are conserving compute resources. -##### Cancelling a Client-Side Unary RPC +##### Cancelling a Server-Side Unary RPC from the Client The default RPC methods on a stub will simply return the result of an RPC. @@ -50,14 +50,72 @@ while True: ``` Here, we repeatedly block on a result for up to `_TIMEOUT_SECONDS`. Doing so -gives us a chance for the signal handlers to run. In the case that out timeout +gives the signal handlers a chance to run. In the case that our timeout was reached, we simply continue on in the loop. In the case that the RPC was cancelled (by our user's ctrl+c), we break out of the loop cleanly. Finally, if we received the result of the RPC, we print it out for the user and exit the loop. -##### Cancelling a Client-Side Streaming RPC +##### Cancelling a Server-Side Streaming RPC from the Client + +Cancelling a Server-side streaming RPC is even simpler from the perspective of +the gRPC API. The default stub method is already an instance of `grpc.Future`, +so the methods outlined above still apply. It is also a generator, so we may +iterate over it to yield the results of our RPC. + +```python +stub = hash_name_pb2_grpc.HashFinderStub(channel) +result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name)) +def cancel_request(unused_signum, unused_frame): + result_generator.cancel() +signal.signal(signal.SIGINT, cancel_request) +``` + +However, the streaming case is complicated by the fact that there is no way to +propagate a timeout to Python generators. As a result, simply iterating over the +results of the RPC can block indefinitely and the signal handler may never run. +Instead, we iterate over the generator on another thread and retrieve the +results on the main thread with a synchronized `Queue`. + +```python +result_queue = Queue() +def iterate_responses(result_generator, result_queue): + try: + for result in result_generator: + result_queue.put(result) + except grpc.RpcError as rpc_error: + if rpc_error.code() != grpc.StatusCode.CANCELLED: + result_queue.put(None) + raise rpc_error + result_queue.put(None) + print("RPC complete") +response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) +response_thread.daemon = True +response_thread.start() +``` + +While this thread iterating over the results may block indefinitely, we can +structure the code running on our main thread in such a way that signal handlers +are guaranteed to be run at least every `_TIMEOUT_SECONDS` seconds. + +```python +while result_generator.running(): + try: + result = result_queue.get(timeout=_TIMEOUT_SECONDS) + except QueueEmpty: + continue + if result is None: + break + print("Got result: {}".format(result)) +``` + +Similarly to the unary example above, we continue in a loop waiting for results, +taking care to block for intervals of `_TIMEOUT_SECONDS` at the longest. +Finally, we use `None` as a sentinel value to signal the end of the stream. + +Using this scheme, our process responds nicely to `SIGINT`s while also +explicitly cancelling its RPCs. #### Cancellation on the Server Side