From 4c852bf25f30b618e412c0a1e9dfb2f33cc75478 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 12:50:10 -0700 Subject: [PATCH] Cancel RPCs after a hash limit has been reached --- examples/python/cancellation/README.md | 2 +- examples/python/cancellation/server.py | 61 ++++++++++++++++++++------ 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 7329ed834e8..b085c9bc016 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -162,7 +162,7 @@ context.add_callback(on_rpc_done) secret = _find_secret(stop_event) ``` -##### Initiating a Cancellation from a Servicer +##### Initiating a Cancellation on the Server Side Initiating a cancellation from the server side is simpler. Just call `ServicerContext.cancel()`. diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 12dbc5653ff..3eb5f0bd45b 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -81,13 +81,22 @@ def _get_hash(secret): return base64.b64encode(hasher.digest()) -def _find_secret_of_length(target, ideal_distance, length, stop_event, interesting_hamming_distance=None): +class ResourceLimitExceededError(Exception): + """Signifies the request has exceeded configured limits.""" + +# TODO(rbellevi): Docstring all the things. +# TODO(rbellevi): File issue about indefinite blocking for server-side +# streaming. + + +def _find_secret_of_length(target, ideal_distance, length, stop_event, maximum_hashes, interesting_hamming_distance=None): digits = [0] * length + hashes_computed = 0 while True: if stop_event.is_set(): # Yield a sentinel and stop the generator if the RPC has been # cancelled. - yield None + yield None, hashes_computed raise StopIteration() secret = b''.join(struct.pack('B', i) for i in digits) hash = _get_hash(secret) @@ -96,14 +105,14 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, interesti # Surface interesting candidates, but don't stop. yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), hashed_name=hash, - hamming_distance=distance) + hamming_distance=distance), hashes_computed elif distance <= ideal_distance: # Yield the ideal candidate followed by a sentinel to signal the end # of the stream. yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), hashed_name=hash, - hamming_distance=distance) - yield None + hamming_distance=distance), hashes_computed + yield None, hashes_computed raise StopIteration() digits[-1] += 1 i = length - 1 @@ -116,13 +125,19 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, interesti raise StopIteration() else: digits[i] += 1 + hashes_computed += 1 + if hashes_computed == maximum_hashes: + raise ResourceLimitExceededError() -def _find_secret(target, maximum_distance, stop_event, interesting_hamming_distance=None): +def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesting_hamming_distance=None): length = 1 + total_hashes = 0 while True: print("Checking strings of length {}.".format(length)) - for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event, interesting_hamming_distance=interesting_hamming_distance): + last_hashes_computed = 0 + for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance): + last_hashes_computed = hashes_computed if candidate is not None: yield candidate else: @@ -130,19 +145,28 @@ def _find_secret(target, maximum_distance, stop_event, interesting_hamming_dista if stop_event.is_set(): # Terminate the generator if the RPC has been cancelled. raise StopIteration() + total_hashes += last_hashes_computed print("Incrementing length") length += 1 class HashFinder(hash_name_pb2_grpc.HashFinderServicer): + def __init__(self, maximum_hashes): + super(HashFinder, self).__init__() + self._maximum_hashes = maximum_hashes + def Find(self, request, context): stop_event = threading.Event() def on_rpc_done(): print("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) - candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event)) + try: + candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes)) + except ResourceLimitExceededError: + print("Cancelling RPC due to exhausted resources.") + context.cancel() print("Servicer thread returning.") if not candidates: return hash_name_pb2.HashNameResponse() @@ -158,17 +182,22 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): secret_generator = _find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, + self._maximum_hashes, interesting_hamming_distance=request.interesting_hamming_distance) - for candidate in secret_generator: - yield candidate + try: + for candidate in secret_generator: + yield candidate + except ResourceLimitExceededError: + print("Cancelling RPC due to exhausted resources.") + context.cancel print("Regained servicer thread.") -def _run_server(port): +def _run_server(port, maximum_hashes): server = grpc.server(futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1) hash_name_pb2_grpc.add_HashFinderServicer_to_server( - HashFinder(), server) + HashFinder(maximum_hashes), server) address = '{}:{}'.format(_SERVER_HOST, port) server.add_insecure_port(address) server.start() @@ -188,8 +217,14 @@ def main(): default=50051, nargs='?', help='The port on which the server will listen.') + parser.add_argument( + '--maximum-hashes', + type=int, + default=10000, + nargs='?', + help='The maximum number of hashes to search before cancelling.') args = parser.parse_args() - _run_server(args.port) + _run_server(args.port, args.maximum_hashes) if __name__ == "__main__":