diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index b085c9bc016..d5f9a844fcc 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -166,3 +166,18 @@ secret = _find_secret(stop_event) Initiating a cancellation from the server side is simpler. Just call `ServicerContext.cancel()`. + +In our example, we ensure that no single client is monopolizing the server by +cancelling after a configurable number of hashes have been checked. + +```python +try: + for candidate in secret_generator: + yield candidate +except ResourceLimitExceededError: + print("Cancelling RPC due to exhausted resources.") + context.cancel() +``` + +In this type of situation, you may also consider returning a more specific error +using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package. diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index f57867a9ae6..f97a8c05e50 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -47,11 +47,9 @@ _TIMEOUT_SECONDS = 0.05 def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - print("Sending request") future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, ideal_hamming_distance=ideal_distance)) def cancel_request(unused_signum, unused_frame): - print("Cancelling request.") future.cancel() signal.signal(signal.SIGINT, cancel_request) while True: @@ -61,19 +59,17 @@ def run_unary_client(server_target, name, ideal_distance): continue except grpc.FutureCancelledError: break - print("Got response: \n{}".format(result)) + print(result) break def run_streaming_client(server_target, name, ideal_distance, interesting_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - print("Initiating RPC") result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name, ideal_hamming_distance=ideal_distance, interesting_hamming_distance=interesting_distance)) def cancel_request(unused_signum, unused_frame): - print("Cancelling request.") result_generator.cancel() signal.signal(signal.SIGINT, cancel_request) result_queue = Queue() @@ -81,7 +77,6 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan def iterate_responses(result_generator, result_queue): try: for result in result_generator: - print("Result: {}".format(result)) result_queue.put(result) except grpc.RpcError as rpc_error: if rpc_error.code() != grpc.StatusCode.CANCELLED: @@ -89,7 +84,6 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan raise rpc_error # Enqueue a sentinel to signal the end of the stream. result_queue.put(None) - print("RPC complete") response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) response_thread.daemon = True response_thread.start() @@ -101,7 +95,7 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan continue if result is None: break - print("Got result: {}".format(result)) + print(result) def main(): parser = argparse.ArgumentParser(description=_DESCRIPTION) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 3eb5f0bd45b..a2e8e947746 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -32,9 +32,6 @@ import grpc from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc -# TODO(rbellevi): Actually use the logger. -# TODO(rbellevi): Enforce per-user quotas with cancellation - _BYTE_MAX = 255 _LOGGER = logging.getLogger(__name__) @@ -134,7 +131,6 @@ def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesti length = 1 total_hashes = 0 while True: - print("Checking strings of length {}.".format(length)) last_hashes_computed = 0 for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance): last_hashes_computed = hashes_computed @@ -146,7 +142,6 @@ def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesti # Terminate the generator if the RPC has been cancelled. raise StopIteration() total_hashes += last_hashes_computed - print("Incrementing length") length += 1 @@ -159,15 +154,15 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def Find(self, request, context): stop_event = threading.Event() def on_rpc_done(): - print("Attempting to regain servicer thread.") + _LOGGER.debug("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) try: candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes)) except ResourceLimitExceededError: - print("Cancelling RPC due to exhausted resources.") + _LOGGER.info("Cancelling RPC due to exhausted resources.") context.cancel() - print("Servicer thread returning.") + _LOGGER.debug("Servicer thread returning.") if not candidates: return hash_name_pb2.HashNameResponse() return candidates[-1] @@ -176,7 +171,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def FindRange(self, request, context): stop_event = threading.Event() def on_rpc_done(): - print("Attempting to regain servicer thread.") + _LOGGER.debug("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) secret_generator = _find_secret(request.desired_name, @@ -188,9 +183,9 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): for candidate in secret_generator: yield candidate except ResourceLimitExceededError: - print("Cancelling RPC due to exhausted resources.") - context.cancel - print("Regained servicer thread.") + _LOGGER.info("Cancelling RPC due to exhausted resources.") + context.cancel() + _LOGGER.debug("Regained servicer thread.") def _run_server(port, maximum_hashes):