From 32944fdeb2ad7d4a480d347f5b08c07a7f7797f3 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 24 Jun 2019 13:56:56 -0700 Subject: [PATCH 01/31] Lay out bones of example --- examples/python/cancellation/BUILD.bazel | 64 ++++++++++ examples/python/cancellation/README.md | 0 examples/python/cancellation/client.py | 41 ++++++ examples/python/cancellation/hash_name.proto | 32 +++++ examples/python/cancellation/server.py | 117 ++++++++++++++++++ .../test/_cancellation_example_test.py | 0 6 files changed, 254 insertions(+) create mode 100644 examples/python/cancellation/BUILD.bazel create mode 100644 examples/python/cancellation/README.md create mode 100644 examples/python/cancellation/client.py create mode 100644 examples/python/cancellation/hash_name.proto create mode 100644 examples/python/cancellation/server.py create mode 100644 examples/python/cancellation/test/_cancellation_example_test.py diff --git a/examples/python/cancellation/BUILD.bazel b/examples/python/cancellation/BUILD.bazel new file mode 100644 index 00000000000..30bede22f22 --- /dev/null +++ b/examples/python/cancellation/BUILD.bazel @@ -0,0 +1,64 @@ +# gRPC Bazel BUILD file. +# +# Copyright 2019 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("@grpc_python_dependencies//:requirements.bzl", "requirement") +load("//bazel:python_rules.bzl", "py_proto_library") + +proto_library( + name = "hash_name_proto", + srcs = ["hash_name.proto"] +) + +py_proto_library( + name = "hash_name_proto_pb2", + deps = [":hash_name_proto"], + well_known_protos = False, +) + +py_binary( + name = "client", + testonly = 1, + srcs = ["client.py"], + deps = [ + "//src/python/grpcio/grpc:grpcio", + ":hash_name_proto_pb2", + ], + srcs_version = "PY2AND3", +) + +py_binary( + name = "server", + testonly = 1, + srcs = ["server.py"], + deps = [ + "//src/python/grpcio/grpc:grpcio", + ":hash_name_proto_pb2" + ] + select({ + "//conditions:default": [requirement("futures")], + "//:python3": [], + }), + srcs_version = "PY2AND3", +) + +py_test( + name = "test/_cancellation_example_test", + srcs = ["test/_cancellation_example_test.py"], + data = [ + ":client", + ":server" + ], + size = "small", +) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py new file mode 100644 index 00000000000..b76ad0eabb5 --- /dev/null +++ b/examples/python/cancellation/client.py @@ -0,0 +1,41 @@ +# Copyright the 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""An example of cancelling requests in gRPC.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from concurrent import futures +import logging +import time + +import grpc + +from examples.python.cancellation import hash_name_pb2 +from examples.python.cancellation import hash_name_pb2_grpc + +_LOGGER = logging.getLogger(__name__) + +def main(): + # TODO(rbellevi): Fix the connaissance of target. + with grpc.insecure_channel('localhost:50051') as channel: + stub = hash_name_pb2_grpc.HashFinderStub(channel) + response = stub.Find(hash_name_pb2.HashNameRequest(desired_name="doctor", + maximum_hamming_distance=0)) + print(response) + +if __name__ == "__main__": + logging.basicConfig() + main() diff --git a/examples/python/cancellation/hash_name.proto b/examples/python/cancellation/hash_name.proto new file mode 100644 index 00000000000..b56d0f27a1e --- /dev/null +++ b/examples/python/cancellation/hash_name.proto @@ -0,0 +1,32 @@ +// Copyright 2019 the gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package hash_name; + +message HashNameRequest { + string desired_name = 1; + int32 maximum_hamming_distance = 2; +} + +message HashNameResponse { + string secret = 1; + string hashed_name = 2; + int32 hamming_distance = 3; +} + +service HashFinder { + rpc Find (HashNameRequest) returns (HashNameResponse) {} +} diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py new file mode 100644 index 00000000000..6aa3be4d3f2 --- /dev/null +++ b/examples/python/cancellation/server.py @@ -0,0 +1,117 @@ +# Copyright the 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""An example of cancelling requests in gRPC.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from concurrent import futures +from collections import deque +import base64 +import logging +import hashlib +import struct +import time + +import grpc + +from examples.python.cancellation import hash_name_pb2 +from examples.python.cancellation import hash_name_pb2_grpc + + +_LOGGER = logging.getLogger(__name__) +_SERVER_HOST = 'localhost' +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +def _get_hamming_distance(a, b): + """Calculates hamming distance between strings of equal length.""" + assert len(a) == len(b), "'{}', '{}'".format(a, b) + distance = 0 + for char_a, char_b in zip(a, b): + if char_a.lower() != char_b.lower(): + distance += 1 + return distance + + +def _get_substring_hamming_distance(candidate, target): + """Calculates the minimum hamming distance between between the target + and any substring of the candidate. + + Args: + candidate: The string whose substrings will be tested. + target: The target string. + + Returns: + The minimum Hamming distance between candidate and target. + """ + assert len(target) <= len(candidate) + assert len(candidate) != 0 + min_distance = None + for i in range(len(candidate) - len(target) + 1): + distance = _get_hamming_distance(candidate[i:i+len(target)], target) + if min_distance is None or distance < min_distance: + min_distance = distance + return min_distance + + +def _get_hash(secret): + hasher = hashlib.sha256() + hasher.update(secret) + return base64.b64encode(hasher.digest()) + + +class HashFinder(hash_name_pb2_grpc.HashFinderServicer): + + # TODO(rbellevi): Make this use less memory. + def Find(self, request, context): + to_check = deque((i,) for i in range(256)) + count = 0 + while True: + if count % 1000 == 0: + logging.info("Checked {} hashes.".format(count)) + current = to_check.popleft() + for i in range(256): + to_check.append(current + (i,)) + secret = b''.join(struct.pack('B', i) for i in current) + hash = _get_hash(secret) + distance = _get_substring_hamming_distance(hash, request.desired_name) + if distance <= request.maximum_hamming_distance: + return hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), + hashed_name=hash, + hamming_distance=distance) + count += 1 + + + +def main(): + port = 50051 + server = grpc.server(futures.ThreadPoolExecutor()) + hash_name_pb2_grpc.add_HashFinderServicer_to_server( + HashFinder(), server) + address = '{}:{}'.format(_SERVER_HOST, port) + server.add_insecure_port(address) + server.start() + print("Server listening at '{}'".format(address)) + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(None) + pass + +if __name__ == "__main__": + logging.basicConfig() + main() diff --git a/examples/python/cancellation/test/_cancellation_example_test.py b/examples/python/cancellation/test/_cancellation_example_test.py new file mode 100644 index 00000000000..e69de29bb2d From 335e655a7865f347efa92ed747363723c70dd299 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 24 Jun 2019 15:51:16 -0700 Subject: [PATCH 02/31] Free up server thread upon cancellation --- examples/python/cancellation/README.md | 9 +++ examples/python/cancellation/client.py | 27 +++++++- examples/python/cancellation/server.py | 91 +++++++++++++++++++------- 3 files changed, 99 insertions(+), 28 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 
e69de29bb2d..af0c8592625 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -0,0 +1,9 @@ +### Cancelling RPCs + +RPCs may be cancelled by both the client and the server. + +#### Cancellation on the Client Side + + + +#### Cancellation on the Server Side diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index b76ad0eabb5..599a774fefe 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -18,6 +18,7 @@ from __future__ import division from __future__ import print_function from concurrent import futures +import datetime import logging import time @@ -28,13 +29,33 @@ from examples.python.cancellation import hash_name_pb2_grpc _LOGGER = logging.getLogger(__name__) +# Interface: +# Cancel after we have n matches or we have an exact match. + + +# Test whether cancelling cancels a long-running unary RPC (I doubt it). +# Start the server with a single thread. +# Start a request and cancel it soon after. +# Start another request. If it succesfully cancelled, this will block forever. +# Add a bunch of logging so we know what's happening. + def main(): # TODO(rbellevi): Fix the connaissance of target. with grpc.insecure_channel('localhost:50051') as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - response = stub.Find(hash_name_pb2.HashNameRequest(desired_name="doctor", - maximum_hamming_distance=0)) - print(response) + while True: + print("Sending request") + future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name="doctor", + maximum_hamming_distance=0)) + # TODO(rbellevi): Do not leave in a cancellation based on timeout. + # That's best handled by, well.. timeout. + try: + result = future.result(timeout=2.0) + print("Got response: \n{}".format(response)) + except grpc.FutureTimeoutError: + print("Cancelling request") + future.cancel() + if __name__ == "__main__": logging.basicConfig() diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 6aa3be4d3f2..badf5698b4a 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -19,11 +19,13 @@ from __future__ import print_function from concurrent import futures from collections import deque +import argparse import base64 import logging import hashlib import struct import time +import threading import grpc @@ -31,10 +33,14 @@ from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc +_BYTE_MAX = 255 + _LOGGER = logging.getLogger(__name__) _SERVER_HOST = 'localhost' _ONE_DAY_IN_SECONDS = 60 * 60 * 24 +_DESCRIPTION = "A server for finding hashes similar to names." + def _get_hamming_distance(a, b): """Calculates hamming distance between strings of equal length.""" @@ -68,37 +74,61 @@ def _get_substring_hamming_distance(candidate, target): def _get_hash(secret): - hasher = hashlib.sha256() + hasher = hashlib.sha1() hasher.update(secret) return base64.b64encode(hasher.digest()) -class HashFinder(hash_name_pb2_grpc.HashFinderServicer): - - # TODO(rbellevi): Make this use less memory. 
- def Find(self, request, context): - to_check = deque((i,) for i in range(256)) - count = 0 - while True: - if count % 1000 == 0: - logging.info("Checked {} hashes.".format(count)) - current = to_check.popleft() - for i in range(256): - to_check.append(current + (i,)) - secret = b''.join(struct.pack('B', i) for i in current) - hash = _get_hash(secret) - distance = _get_substring_hamming_distance(hash, request.desired_name) - if distance <= request.maximum_hamming_distance: - return hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), - hashed_name=hash, - hamming_distance=distance) - count += 1 +def _find_secret_of_length(target, maximum_distance, length, stop_event): + digits = [0] * length + while True: + if stop_event.is_set(): + return hash_name_pb2.HashNameResponse() + secret = b''.join(struct.pack('B', i) for i in digits) + hash = _get_hash(secret) + distance = _get_substring_hamming_distance(hash, target) + if distance <= maximum_distance: + return hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), + hashed_name=hash, + hamming_distance=distance) + digits[-1] += 1 + i = length - 1 + while digits[i] == _BYTE_MAX + 1: + digits[i] = 0 + i -= 1 + if i == -1: + return None + else: + digits[i] += 1 + + +def _find_secret(target, maximum_distance, stop_event): + length = 1 + while True: + print("Checking strings of length {}.".format(length)) + match = _find_secret_of_length(target, maximum_distance, length, stop_event) + if match is not None: + return match + if stop_event.is_set(): + return hash_name_pb2.HashNameResponse() + length += 1 +class HashFinder(hash_name_pb2_grpc.HashFinderServicer): -def main(): - port = 50051 - server = grpc.server(futures.ThreadPoolExecutor()) + def Find(self, request, context): + stop_event = threading.Event() + def on_rpc_done(): + stop_event.set() + context.add_callback(on_rpc_done) + print("Received request:\n{}".format(request)) + result = _find_secret(request.desired_name, request.maximum_hamming_distance, stop_event) + print("Returning result:\n{}".format(result)) + return result + + +def _run_server(port): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) hash_name_pb2_grpc.add_HashFinderServicer_to_server( HashFinder(), server) address = '{}:{}'.format(_SERVER_HOST, port) @@ -110,7 +140,18 @@ def main(): time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(None) - pass + + +def main(): + parser = argparse.ArgumentParser(description=_DESCRIPTION) + parser.add_argument( + '--port', + type=int, + default=50051, + nargs='?', + help='The port on which the server will listen.') + args = parser.parse_args() + _run_server(args.port) if __name__ == "__main__": logging.basicConfig() From 7dccc07c2ac7e54cd44ed1e81cdd2b6daf944f62 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 24 Jun 2019 16:02:34 -0700 Subject: [PATCH 03/31] Start writing README --- examples/python/cancellation/README.md | 31 +++++++++++++++++++++++++- examples/python/cancellation/client.py | 6 ----- examples/python/cancellation/server.py | 2 -- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index af0c8592625..8d3e4767b9c 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -4,6 +4,35 @@ RPCs may be cancelled by both the client and the server. #### Cancellation on the Client Side - +A client may cancel an RPC for several reasons. 
Perhaps the data it requested +has been made irrelevant. Perhaps you, as the client, want to be a good citizen +of the server and are conserving compute resources. #### Cancellation on the Server Side + +A server is responsible for cancellation in two ways. It must respond in some way +when a client initiates a cancellation; otherwise, long-running computations +could continue indefinitely. + +It may also decide to cancel the RPC for its own reasons. In our example, the +server can be configured to cancel an RPC after a certain number of hashes have +been computed in order to conserve compute resources. + +##### Responding to Cancellations from a Servicer Thread + +It's important to remember that a gRPC Python server is backed by a thread pool +with a fixed size. When an RPC is cancelled, the library does *not* terminate +your servicer thread. It is your responsibility as the application author to +ensure that your servicer thread terminates soon after the RPC has been +cancelled. + +In this example, we use the `ServicerContext.add_callback` method to set a +`threading.Event` object when the RPC is terminated. We pass this `Event` object +down through our hashing algorithm and check that the RPC is still +ongoing before each iteration. + + +##### Initiating a Cancellation from a Servicer + +Initiating a cancellation from the server side is simpler. Just call +`ServicerContext.cancel()`. diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 599a774fefe..93673ad8cc9 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -33,12 +33,6 @@ _LOGGER = logging.getLogger(__name__) # Cancel after we have n matches or we have an exact match. -# Test whether cancelling cancels a long-running unary RPC (I doubt it). -# Start the server with a single thread. -# Start a request and cancel it soon after. -# Start another request. If it succesfully cancelled, this will block forever. -# Add a bunch of logging so we know what's happening. - def main(): # TODO(rbellevi): Fix the connaissance of target.
with grpc.insecure_channel('localhost:50051') as channel: diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index badf5698b4a..6af36809348 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -121,9 +121,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def on_rpc_done(): stop_event.set() context.add_callback(on_rpc_done) - print("Received request:\n{}".format(request)) result = _find_secret(request.desired_name, request.maximum_hamming_distance, stop_event) - print("Returning result:\n{}".format(result)) return result From b31431aea3e0caac3b3741d077e7b4bd692f05c9 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 24 Jun 2019 16:49:35 -0700 Subject: [PATCH 04/31] Switch over to a generator --- examples/python/cancellation/client.py | 6 +- examples/python/cancellation/hash_name.proto | 4 +- examples/python/cancellation/server.py | 61 +++++++++++++++----- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 93673ad8cc9..bd5296a544f 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -40,12 +40,12 @@ def main(): while True: print("Sending request") future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name="doctor", - maximum_hamming_distance=0)) + ideal_hamming_distance=1)) # TODO(rbellevi): Do not leave in a cancellation based on timeout. # That's best handled by, well.. timeout. try: - result = future.result(timeout=2.0) - print("Got response: \n{}".format(response)) + result = future.result(timeout=20.0) + print("Got response: \n{}".format(result)) except grpc.FutureTimeoutError: print("Cancelling request") future.cancel() diff --git a/examples/python/cancellation/hash_name.proto b/examples/python/cancellation/hash_name.proto index b56d0f27a1e..e0a5c8357be 100644 --- a/examples/python/cancellation/hash_name.proto +++ b/examples/python/cancellation/hash_name.proto @@ -18,7 +18,8 @@ package hash_name; message HashNameRequest { string desired_name = 1; - int32 maximum_hamming_distance = 2; + int32 ideal_hamming_distance = 2; + int32 interesting_hamming_distance = 3; } message HashNameResponse { @@ -29,4 +30,5 @@ message HashNameResponse { service HashFinder { rpc Find (HashNameRequest) returns (HashNameResponse) {} + rpc FindRange (HashNameRequest) returns (stream HashNameResponse) {} } diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 6af36809348..334a3770247 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -79,25 +79,39 @@ def _get_hash(secret): return base64.b64encode(hasher.digest()) -def _find_secret_of_length(target, maximum_distance, length, stop_event): +def _find_secret_of_length(target, ideal_distance, length, stop_event, interesting_hamming_distance=None): digits = [0] * length while True: if stop_event.is_set(): - return hash_name_pb2.HashNameResponse() + # Yield a sentinel and stop the generator if the RPC has been + # cancelled. 
+ yield None + raise StopIteration() secret = b''.join(struct.pack('B', i) for i in digits) hash = _get_hash(secret) distance = _get_substring_hamming_distance(hash, target) - if distance <= maximum_distance: - return hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), + if interesting_hamming_distance is not None and distance <= interesting_hamming_distance: + # Surface interesting candidates, but don't stop. + yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), + hashed_name=hash, + hamming_distance=distance) + elif distance <= ideal_distance: + # Yield the ideal candidate followed by a sentinel to signal the end + # of the stream. + yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), hashed_name=hash, hamming_distance=distance) + yield None + raise StopIteration() digits[-1] += 1 i = length - 1 while digits[i] == _BYTE_MAX + 1: digits[i] = 0 i -= 1 if i == -1: - return None + # Terminate the generator since we've run out of strings of + # `length` bytes. + raise StopIteration() else: digits[i] += 1 @@ -106,11 +120,15 @@ def _find_secret(target, maximum_distance, stop_event): length = 1 while True: print("Checking strings of length {}.".format(length)) - match = _find_secret_of_length(target, maximum_distance, length, stop_event) - if match is not None: - return match - if stop_event.is_set(): - return hash_name_pb2.HashNameResponse() + for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event): + if candidate is not None: + yield candidate + else: + raise StopIteration() + if stop_event.is_set(): + # Terminate the generator if the RPC has been cancelled. + raise StopIteration() + print("Incrementing length") length += 1 @@ -121,12 +139,28 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def on_rpc_done(): stop_event.set() context.add_callback(on_rpc_done) - result = _find_secret(request.desired_name, request.maximum_hamming_distance, stop_event) - return result + candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event)) + if not candidates: + return hash_name_pb2.HashNameResponse() + return candidates[-1] + + + def FindRange(self, request, context): + stop_event = threading.Event() + def on_rpc_done(): + stop_event.set() + context.add_callback(on_rpc_done) + secret_generator = _find_secret(request.desired_name, + request.ideal_hamming_distance, + stop_event, + interesting_hamming_distance=request.interesting_hamming_distance) + for candidate in secret_generator: + yield candidate def _run_server(port): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=1), + maximum_concurrent_rpcs=1) hash_name_pb2_grpc.add_HashFinderServicer_to_server( HashFinder(), server) address = '{}:{}'.format(_SERVER_HOST, port) @@ -151,6 +185,7 @@ def main(): args = parser.parse_args() _run_server(args.port) + if __name__ == "__main__": logging.basicConfig() main() From 244279cb3651999a52e8b33f365e2d81819ce77b Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 09:07:46 -0700 Subject: [PATCH 05/31] Add client CLI --- examples/python/cancellation/client.py | 43 +++++++++++++++++++++----- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index bd5296a544f..63891d05842 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -18,6 +18,7 @@ from __future__ import 
division from __future__ import print_function from concurrent import futures +import argparse import datetime import logging import time @@ -27,20 +28,20 @@ import grpc from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc +_DESCRIPTION = "A client for finding hashes similar to names." _LOGGER = logging.getLogger(__name__) # Interface: -# Cancel after we have n matches or we have an exact match. +# Cancel on ctrl+c or an ideal candidate. - -def main(): - # TODO(rbellevi): Fix the connaissance of target. - with grpc.insecure_channel('localhost:50051') as channel: +def run_unary_client(server_target, name, ideal_distance): + # TODO(rbellevi): Cancel on ctrl+c + with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) while True: print("Sending request") - future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name="doctor", - ideal_hamming_distance=1)) + future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, + ideal_hamming_distance=ideal_distance)) # TODO(rbellevi): Do not leave in a cancellation based on timeout. # That's best handled by, well.. timeout. try: @@ -51,6 +52,34 @@ def main(): future.cancel() +def run_streaming_client(target, name, ideal_distance, interesting_distance): + pass + + +def main(): + parser = argparse.ArgumentParser(description=_DESCRIPTION) + parser.add_argument("name", type=str, help='The desired name.') + parser.add_argument("--ideal-distance", default=0, nargs='?', + type=int, help="The desired Hamming distance.") + parser.add_argument( + '--server', + default='localhost:50051', + type=str, + nargs='?', + help='The host-port pair at which to reach the server.') + parser.add_argument( + '--show-inferior', + default=None, + type=int, + nargs='?', + help='Also show candidates with a Hamming distance less than this value.') + + args = parser.parse_args() + if args.show_inferior is not None: + run_streaming_client(args.server, args.name, args.ideal_distance, args.interesting_distance) + else: + run_unary_client(args.server, args.name, args.ideal_distance) + if __name__ == "__main__": logging.basicConfig() main() From b6a5e94f71917046ae3b0c29825ec986e11876ca Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 09:36:49 -0700 Subject: [PATCH 06/31] Respond to ctrl+c on client side --- examples/python/cancellation/client.py | 27 ++++++++++++++------------ examples/python/cancellation/server.py | 2 ++ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 63891d05842..f86a32af175 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -22,6 +22,7 @@ import argparse import datetime import logging import time +import signal import grpc @@ -31,25 +32,27 @@ from examples.python.cancellation import hash_name_pb2_grpc _DESCRIPTION = "A client for finding hashes similar to names." _LOGGER = logging.getLogger(__name__) -# Interface: -# Cancel on ctrl+c or an ideal candidate. 
+_TIMEOUT_SECONDS = 0.05 def run_unary_client(server_target, name, ideal_distance): - # TODO(rbellevi): Cancel on ctrl+c with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) + print("Sending request") + future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, + ideal_hamming_distance=ideal_distance)) + def cancel_request(unused_signum, unused_frame): + print("Cancelling request.") + future.cancel() + signal.signal(signal.SIGINT, cancel_request) while True: - print("Sending request") - future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, - ideal_hamming_distance=ideal_distance)) - # TODO(rbellevi): Do not leave in a cancellation based on timeout. - # That's best handled by, well.. timeout. try: - result = future.result(timeout=20.0) - print("Got response: \n{}".format(result)) + result = future.result(timeout=_TIMEOUT_SECONDS) except grpc.FutureTimeoutError: - print("Cancelling request") - future.cancel() + continue + except grpc.FutureCancelledError: + break + print("Got response: \n{}".format(result)) + break def run_streaming_client(target, name, ideal_distance, interesting_distance): diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 334a3770247..575c2fc8e74 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -137,9 +137,11 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def Find(self, request, context): stop_event = threading.Event() def on_rpc_done(): + print("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event)) + print("Servicer thread returning.") if not candidates: return hash_name_pb2.HashNameResponse() return candidates[-1] From c9e83db6bcaf08024f8e00184990a1e72cfe2b82 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 10:34:48 -0700 Subject: [PATCH 07/31] Implement streaming on the client side --- examples/python/cancellation/README.md | 28 +++++++++++++++ examples/python/cancellation/client.py | 50 ++++++++++++++++++++++++-- examples/python/cancellation/server.py | 8 +++-- 3 files changed, 81 insertions(+), 5 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 8d3e4767b9c..64ffa3cf1c7 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -8,6 +8,34 @@ A client may cancel an RPC for several reasons. Perhaps the data it requested has been made irrelevant. Perhaps you, as the client, want to be a good citizen of the server and are conserving compute resources. +##### Cancelling a Client-Side Unary RPC + +The default RPC methods on a stub will simply return the result of an RPC. + +```python +>>> stub = hash_name_pb2_grpc.HashFinderStub(channel) +>>> stub.Find(hash_name_pb2.HashNameRequest(desired_name=name)) + +``` + +But you may use the `future()` method to receive an instance of `grpc.Future`. +This interface allows you to wait on a response with a timeout, add a callback +to be executed when the RPC completes, or to cancel the RPC before it has +completed. + +In the example, we use this interface to cancel our in-progress RPC when the +user interrupts the process with ctrl-c. 
+ +```python +stub = hash_name_pb2_grpc.HashFinderStub(channel) +future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name)) +def cancel_request(unused_signum, unused_frame): + future.cancel() +signal.signal(signal.SIGINT, cancel_request) +``` + +##### Cancelling a Client-Side Streaming RPC + #### Cancellation on the Server Side A server is reponsible for cancellation in two ways. It must respond in some way diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index f86a32af175..f57867a9ae6 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -23,6 +23,14 @@ import datetime import logging import time import signal +import threading + +try: + from queue import Queue + from queue import Empty as QueueEmpty +except ImportError: + from Queue import Queue + from Queue import Empty as QueueEmpty import grpc @@ -34,6 +42,8 @@ _LOGGER = logging.getLogger(__name__) _TIMEOUT_SECONDS = 0.05 +# TODO(rbellevi): Actually use the logger. + def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) @@ -55,9 +65,43 @@ def run_unary_client(server_target, name, ideal_distance): break -def run_streaming_client(target, name, ideal_distance, interesting_distance): - pass +def run_streaming_client(server_target, name, ideal_distance, interesting_distance): + with grpc.insecure_channel(server_target) as channel: + stub = hash_name_pb2_grpc.HashFinderStub(channel) + print("Initiating RPC") + result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name, + ideal_hamming_distance=ideal_distance, + interesting_hamming_distance=interesting_distance)) + def cancel_request(unused_signum, unused_frame): + print("Cancelling request.") + result_generator.cancel() + signal.signal(signal.SIGINT, cancel_request) + result_queue = Queue() + + def iterate_responses(result_generator, result_queue): + try: + for result in result_generator: + print("Result: {}".format(result)) + result_queue.put(result) + except grpc.RpcError as rpc_error: + if rpc_error.code() != grpc.StatusCode.CANCELLED: + result_queue.put(None) + raise rpc_error + # Enqueue a sentinel to signal the end of the stream. + result_queue.put(None) + print("RPC complete") + response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) + response_thread.daemon = True + response_thread.start() + while result_generator.running(): + try: + result = result_queue.get(timeout=_TIMEOUT_SECONDS) + except QueueEmpty: + continue + if result is None: + break + print("Got result: {}".format(result)) def main(): parser = argparse.ArgumentParser(description=_DESCRIPTION) @@ -79,7 +123,7 @@ def main(): args = parser.parse_args() if args.show_inferior is not None: - run_streaming_client(args.server, args.name, args.ideal_distance, args.interesting_distance) + run_streaming_client(args.server, args.name, args.ideal_distance, args.show_inferior) else: run_unary_client(args.server, args.name, args.ideal_distance) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 575c2fc8e74..12dbc5653ff 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -32,6 +32,8 @@ import grpc from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc +# TODO(rbellevi): Actually use the logger. 
+# TODO(rbellevi): Enforce per-user quotas with cancellation _BYTE_MAX = 255 @@ -116,11 +118,11 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, interesti digits[i] += 1 -def _find_secret(target, maximum_distance, stop_event): +def _find_secret(target, maximum_distance, stop_event, interesting_hamming_distance=None): length = 1 while True: print("Checking strings of length {}.".format(length)) - for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event): + for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event, interesting_hamming_distance=interesting_hamming_distance): if candidate is not None: yield candidate else: @@ -150,6 +152,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def FindRange(self, request, context): stop_event = threading.Event() def on_rpc_done(): + print("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) secret_generator = _find_secret(request.desired_name, @@ -158,6 +161,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): interesting_hamming_distance=request.interesting_hamming_distance) for candidate in secret_generator: yield candidate + print("Regained servicer thread.") def _run_server(port): From 4ee154dd53c7ce6ad8bb2e7831af40abac4f545d Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 10:50:04 -0700 Subject: [PATCH 08/31] Elaborate on unary cancellation --- examples/python/cancellation/README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 64ffa3cf1c7..08aea49e2c9 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -34,6 +34,29 @@ def cancel_request(unused_signum, unused_frame): signal.signal(signal.SIGINT, cancel_request) ``` +It's also important that you not block indefinitely on the RPC. Otherwise, the +signal handler will never have a chance to run. + +```python +while True: + try: + result = future.result(timeout=_TIMEOUT_SECONDS) + except grpc.FutureTimeoutError: + continue + except grpc.FutureCancelledError: + break + print("Got response: \n{}".format(result)) + break +``` + +Here, we repeatedly block on a result for up to `_TIMEOUT_SECONDS`. Doing so +gives us a chance for the signal handlers to run. In the case that out timeout +was reached, we simply continue on in the loop. In the case that the RPC was +cancelled (by our user's ctrl+c), we break out of the loop cleanly. Finally, if +we received the result of the RPC, we print it out for the user and exit the +loop. + + ##### Cancelling a Client-Side Streaming RPC #### Cancellation on the Server Side From 82aa4068c79ebb3ba37341199b078c9b6f910f8c Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 11:02:10 -0700 Subject: [PATCH 09/31] Elaborate on cancelling streaming RPCs --- examples/python/cancellation/README.md | 64 ++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 08aea49e2c9..e7f6b42106e 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -8,7 +8,7 @@ A client may cancel an RPC for several reasons. Perhaps the data it requested has been made irrelevant. Perhaps you, as the client, want to be a good citizen of the server and are conserving compute resources. 
-##### Cancelling a Client-Side Unary RPC +##### Cancelling a Server-Side Unary RPC from the Client The default RPC methods on a stub will simply return the result of an RPC. @@ -50,14 +50,72 @@ while True: ``` Here, we repeatedly block on a result for up to `_TIMEOUT_SECONDS`. Doing so -gives us a chance for the signal handlers to run. In the case that out timeout +gives the signal handlers a chance to run. In the case that our timeout was reached, we simply continue on in the loop. In the case that the RPC was cancelled (by our user's ctrl+c), we break out of the loop cleanly. Finally, if we received the result of the RPC, we print it out for the user and exit the loop. -##### Cancelling a Client-Side Streaming RPC +##### Cancelling a Server-Side Streaming RPC from the Client + +Cancelling a Server-side streaming RPC is even simpler from the perspective of +the gRPC API. The default stub method is already an instance of `grpc.Future`, +so the methods outlined above still apply. It is also a generator, so we may +iterate over it to yield the results of our RPC. + +```python +stub = hash_name_pb2_grpc.HashFinderStub(channel) +result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name)) +def cancel_request(unused_signum, unused_frame): + result_generator.cancel() +signal.signal(signal.SIGINT, cancel_request) +``` + +However, the streaming case is complicated by the fact that there is no way to +propagate a timeout to Python generators. As a result, simply iterating over the +results of the RPC can block indefinitely and the signal handler may never run. +Instead, we iterate over the generator on another thread and retrieve the +results on the main thread with a synchronized `Queue`. + +```python +result_queue = Queue() +def iterate_responses(result_generator, result_queue): + try: + for result in result_generator: + result_queue.put(result) + except grpc.RpcError as rpc_error: + if rpc_error.code() != grpc.StatusCode.CANCELLED: + result_queue.put(None) + raise rpc_error + result_queue.put(None) + print("RPC complete") +response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) +response_thread.daemon = True +response_thread.start() +``` + +While this thread iterating over the results may block indefinitely, we can +structure the code running on our main thread in such a way that signal handlers +are guaranteed to be run at least every `_TIMEOUT_SECONDS` seconds. + +```python +while result_generator.running(): + try: + result = result_queue.get(timeout=_TIMEOUT_SECONDS) + except QueueEmpty: + continue + if result is None: + break + print("Got result: {}".format(result)) +``` + +Similarly to the unary example above, we continue in a loop waiting for results, +taking care to block for intervals of `_TIMEOUT_SECONDS` at the longest. +Finally, we use `None` as a sentinel value to signal the end of the stream. + +Using this scheme, our process responds nicely to `SIGINT`s while also +explicitly cancelling its RPCs. 
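For reference, the pieces above can be assembled into a single sketch. This is condensed from the example's client; the Python 3 spelling of the `queue` import and the literal target and search values at the bottom are illustrative, not prescribed by the example.

```python
# Condensed client-side cancellation pattern: background iterator thread,
# synchronized queue, and a SIGINT handler that cancels the call.
import signal
import threading
from queue import Empty, Queue

import grpc

from examples.python.cancellation import hash_name_pb2
from examples.python.cancellation import hash_name_pb2_grpc

_TIMEOUT_SECONDS = 0.05


def find_range(target, name, ideal_distance, interesting_distance):
    with grpc.insecure_channel(target) as channel:
        stub = hash_name_pb2_grpc.HashFinderStub(channel)
        result_generator = stub.FindRange(
            hash_name_pb2.HashNameRequest(
                desired_name=name,
                ideal_hamming_distance=ideal_distance,
                interesting_hamming_distance=interesting_distance))
        signal.signal(
            signal.SIGINT,
            lambda unused_signum, unused_frame: result_generator.cancel())
        result_queue = Queue()

        def iterate_responses():
            try:
                for result in result_generator:
                    result_queue.put(result)
            except grpc.RpcError as rpc_error:
                if rpc_error.code() != grpc.StatusCode.CANCELLED:
                    result_queue.put(None)
                    raise
            # Sentinel signalling the end of the stream.
            result_queue.put(None)

        consumer = threading.Thread(target=iterate_responses)
        consumer.daemon = True
        consumer.start()

        # Block only briefly at a time so the SIGINT handler gets to run.
        while result_generator.running():
            try:
                result = result_queue.get(timeout=_TIMEOUT_SECONDS)
            except Empty:
                continue
            if result is None:
                break
            print(result)


if __name__ == '__main__':
    find_range('localhost:50051', 'doctor', 0, 5)
```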
#### Cancellation on the Server Side From cdae8ca6ad87c6fb94c7dc5dec7ba49f282b06aa Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 11:11:03 -0700 Subject: [PATCH 10/31] Add intro about algorithm --- examples/python/cancellation/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index e7f6b42106e..c50cc3b34df 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -2,6 +2,19 @@ RPCs may be cancelled by both the client and the server. +#### The Example + +In the example, we implement a silly algorithm. We search for bytestrings whose +hashes are similar to a given search string. For example, say we're looking for +the string "doctor". Our algorithm may find `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or +`RC9/7mlM3ldy4TdoctOc6WzYbO4=`. This is a brute force algorithm, so the server +performing the search must be conscious of the resources it allows to each client +and each client must be conscientious of the resources demanded of the server. + +In particular, we ensure that client processes cancel the stream explicitly +before terminating and we ensure the server cancels RPCs that have gone on longer +than a certain number of iterations. + #### Cancellation on the Client Side A client may cancel an RPC for several reasons. Perhaps the data it requested From b9cc2c210f85f6b6a672b3f0be853e26fcfa9ae3 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 11:15:43 -0700 Subject: [PATCH 11/31] Explain how we take care of servicer threads --- examples/python/cancellation/README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index c50cc3b34df..7329ed834e8 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -153,6 +153,14 @@ In this example, we use the `ServicerContext.add_callback` method to set a `threading.Event` object when the RPC is terminated. We pass this `Event` object down through our hashing algorithm and check that the RPC is still ongoing before each iteration. +```python +stop_event = threading.Event() +def on_rpc_done(): + # Regain servicer thread. + stop_event.set() +context.add_callback(on_rpc_done) +secret = _find_secret(stop_event) +``` ##### Initiating a Cancellation from a Servicer From 4c852bf25f30b618e412c0a1e9dfb2f33cc75478 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 12:50:10 -0700 Subject: [PATCH 12/31] Cancel RPCs after a hash limit has been reached --- examples/python/cancellation/README.md | 2 +- examples/python/cancellation/server.py | 61 ++++++++++++++++++------ 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 7329ed834e8..b085c9bc016 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -162,7 +162,7 @@ context.add_callback(on_rpc_done) secret = _find_secret(stop_event) ``` -##### Initiating a Cancellation from a Servicer +##### Initiating a Cancellation on the Server Side Initiating a cancellation from the server side is simpler. Just call `ServicerContext.cancel()`.
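As a minimal illustration of that call (a sketch only, not part of the example; the `ImpatientHashFinder` name is made up), a servicer can end any RPC it is handling by invoking `cancel()` on its `ServicerContext`:

```python
from examples.python.cancellation import hash_name_pb2
from examples.python.cancellation import hash_name_pb2_grpc


class ImpatientHashFinder(hash_name_pb2_grpc.HashFinderServicer):
    """Sketch: cancels every RPC, just to show the call shape."""

    def Find(self, request, context):
        # Ends the RPC from the server side; the client observes a
        # grpc.RpcError with status code CANCELLED rather than a response.
        context.cancel()
        return hash_name_pb2.HashNameResponse()
```

The example's real servicer, shown in the diff below, only does this after its hashing generator raises `ResourceLimitExceededError`.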
diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 12dbc5653ff..3eb5f0bd45b 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -81,13 +81,22 @@ def _get_hash(secret): return base64.b64encode(hasher.digest()) -def _find_secret_of_length(target, ideal_distance, length, stop_event, interesting_hamming_distance=None): +class ResourceLimitExceededError(Exception): + """Signifies the request has exceeded configured limits.""" + +# TODO(rbellevi): Docstring all the things. +# TODO(rbellevi): File issue about indefinite blocking for server-side +# streaming. + + +def _find_secret_of_length(target, ideal_distance, length, stop_event, maximum_hashes, interesting_hamming_distance=None): digits = [0] * length + hashes_computed = 0 while True: if stop_event.is_set(): # Yield a sentinel and stop the generator if the RPC has been # cancelled. - yield None + yield None, hashes_computed raise StopIteration() secret = b''.join(struct.pack('B', i) for i in digits) hash = _get_hash(secret) @@ -96,14 +105,14 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, interesti # Surface interesting candidates, but don't stop. yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), hashed_name=hash, - hamming_distance=distance) + hamming_distance=distance), hashes_computed elif distance <= ideal_distance: # Yield the ideal candidate followed by a sentinel to signal the end # of the stream. yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), hashed_name=hash, - hamming_distance=distance) - yield None + hamming_distance=distance), hashes_computed + yield None, hashes_computed raise StopIteration() digits[-1] += 1 i = length - 1 @@ -116,13 +125,19 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, interesti raise StopIteration() else: digits[i] += 1 + hashes_computed += 1 + if hashes_computed == maximum_hashes: + raise ResourceLimitExceededError() -def _find_secret(target, maximum_distance, stop_event, interesting_hamming_distance=None): +def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesting_hamming_distance=None): length = 1 + total_hashes = 0 while True: print("Checking strings of length {}.".format(length)) - for candidate in _find_secret_of_length(target, maximum_distance, length, stop_event, interesting_hamming_distance=interesting_hamming_distance): + last_hashes_computed = 0 + for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance): + last_hashes_computed = hashes_computed if candidate is not None: yield candidate else: @@ -130,19 +145,28 @@ def _find_secret(target, maximum_distance, stop_event, interesting_hamming_dista if stop_event.is_set(): # Terminate the generator if the RPC has been cancelled. 
raise StopIteration() + total_hashes += last_hashes_computed print("Incrementing length") length += 1 class HashFinder(hash_name_pb2_grpc.HashFinderServicer): + def __init__(self, maximum_hashes): + super(HashFinder, self).__init__() + self._maximum_hashes = maximum_hashes + def Find(self, request, context): stop_event = threading.Event() def on_rpc_done(): print("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) - candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event)) + try: + candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes)) + except ResourceLimitExceededError: + print("Cancelling RPC due to exhausted resources.") + context.cancel() print("Servicer thread returning.") if not candidates: return hash_name_pb2.HashNameResponse() @@ -158,17 +182,22 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): secret_generator = _find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, + self._maximum_hashes, interesting_hamming_distance=request.interesting_hamming_distance) - for candidate in secret_generator: - yield candidate + try: + for candidate in secret_generator: + yield candidate + except ResourceLimitExceededError: + print("Cancelling RPC due to exhausted resources.") + context.cancel print("Regained servicer thread.") -def _run_server(port): +def _run_server(port, maximum_hashes): server = grpc.server(futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1) hash_name_pb2_grpc.add_HashFinderServicer_to_server( - HashFinder(), server) + HashFinder(maximum_hashes), server) address = '{}:{}'.format(_SERVER_HOST, port) server.add_insecure_port(address) server.start() @@ -188,8 +217,14 @@ def main(): default=50051, nargs='?', help='The port on which the server will listen.') + parser.add_argument( + '--maximum-hashes', + type=int, + default=10000, + nargs='?', + help='The maximum number of hashes to search before cancelling.') args = parser.parse_args() - _run_server(args.port) + _run_server(args.port, args.maximum_hashes) if __name__ == "__main__": From b12299701df6614c1fd5df86d9c46a5bda3d2d66 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 13:00:09 -0700 Subject: [PATCH 13/31] Clean up logging --- examples/python/cancellation/README.md | 15 +++++++++++++++ examples/python/cancellation/client.py | 10 ++-------- examples/python/cancellation/server.py | 19 +++++++------------ 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index b085c9bc016..d5f9a844fcc 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -166,3 +166,18 @@ secret = _find_secret(stop_event) Initiating a cancellation from the server side is simpler. Just call `ServicerContext.cancel()`. + +In our example, we ensure that no single client is monopolizing the server by +cancelling after a configurable number of hashes have been checked. + +```python +try: + for candidate in secret_generator: + yield candidate +except ResourceLimitExceededError: + print("Cancelling RPC due to exhausted resources.") + context.cancel() +``` + +In this type of situation, you may also consider returning a more specific error +using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package. 
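For instance, a sketch of that alternative (not code from this example; it assumes the `grpcio-status` and `googleapis-common-protos` packages are installed) could end the RPC with a `RESOURCE_EXHAUSTED` status instead of silently cancelling it:

```python
from google.rpc import code_pb2
from google.rpc import status_pb2
from grpc_status import rpc_status


def _abort_resource_exhausted(context, maximum_hashes):
    """Ends the RPC with a rich RESOURCE_EXHAUSTED status."""
    rich_status = status_pb2.Status(
        code=code_pb2.RESOURCE_EXHAUSTED,
        message='Exceeded the limit of {} hashes.'.format(maximum_hashes))
    context.abort_with_status(rpc_status.to_status(rich_status))
```

If the extra dependency is unwanted, `context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, ...)` achieves a similar effect with only a status code and a message.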
diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index f57867a9ae6..f97a8c05e50 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -47,11 +47,9 @@ _TIMEOUT_SECONDS = 0.05 def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - print("Sending request") future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, ideal_hamming_distance=ideal_distance)) def cancel_request(unused_signum, unused_frame): - print("Cancelling request.") future.cancel() signal.signal(signal.SIGINT, cancel_request) while True: @@ -61,19 +59,17 @@ def run_unary_client(server_target, name, ideal_distance): continue except grpc.FutureCancelledError: break - print("Got response: \n{}".format(result)) + print(result) break def run_streaming_client(server_target, name, ideal_distance, interesting_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - print("Initiating RPC") result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name, ideal_hamming_distance=ideal_distance, interesting_hamming_distance=interesting_distance)) def cancel_request(unused_signum, unused_frame): - print("Cancelling request.") result_generator.cancel() signal.signal(signal.SIGINT, cancel_request) result_queue = Queue() @@ -81,7 +77,6 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan def iterate_responses(result_generator, result_queue): try: for result in result_generator: - print("Result: {}".format(result)) result_queue.put(result) except grpc.RpcError as rpc_error: if rpc_error.code() != grpc.StatusCode.CANCELLED: @@ -89,7 +84,6 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan raise rpc_error # Enqueue a sentinel to signal the end of the stream. result_queue.put(None) - print("RPC complete") response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) response_thread.daemon = True response_thread.start() @@ -101,7 +95,7 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan continue if result is None: break - print("Got result: {}".format(result)) + print(result) def main(): parser = argparse.ArgumentParser(description=_DESCRIPTION) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 3eb5f0bd45b..a2e8e947746 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -32,9 +32,6 @@ import grpc from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc -# TODO(rbellevi): Actually use the logger. 
-# TODO(rbellevi): Enforce per-user quotas with cancellation - _BYTE_MAX = 255 _LOGGER = logging.getLogger(__name__) @@ -134,7 +131,6 @@ def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesti length = 1 total_hashes = 0 while True: - print("Checking strings of length {}.".format(length)) last_hashes_computed = 0 for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance): last_hashes_computed = hashes_computed @@ -146,7 +142,6 @@ def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesti # Terminate the generator if the RPC has been cancelled. raise StopIteration() total_hashes += last_hashes_computed - print("Incrementing length") length += 1 @@ -159,15 +154,15 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def Find(self, request, context): stop_event = threading.Event() def on_rpc_done(): - print("Attempting to regain servicer thread.") + _LOGGER.debug("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) try: candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes)) except ResourceLimitExceededError: - print("Cancelling RPC due to exhausted resources.") + _LOGGER.info("Cancelling RPC due to exhausted resources.") context.cancel() - print("Servicer thread returning.") + _LOGGER.debug("Servicer thread returning.") if not candidates: return hash_name_pb2.HashNameResponse() return candidates[-1] @@ -176,7 +171,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def FindRange(self, request, context): stop_event = threading.Event() def on_rpc_done(): - print("Attempting to regain servicer thread.") + _LOGGER.debug("Attempting to regain servicer thread.") stop_event.set() context.add_callback(on_rpc_done) secret_generator = _find_secret(request.desired_name, @@ -188,9 +183,9 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): for candidate in secret_generator: yield candidate except ResourceLimitExceededError: - print("Cancelling RPC due to exhausted resources.") - context.cancel - print("Regained servicer thread.") + _LOGGER.info("Cancelling RPC due to exhausted resources.") + context.cancel() + _LOGGER.debug("Regained servicer thread.") def _run_server(port, maximum_hashes): From 8f1bfdab55f08cebdf60d5847634ba894904e62e Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 13:01:22 -0700 Subject: [PATCH 14/31] Yapf --- examples/python/cancellation/client.py | 41 +++++++++++----- examples/python/cancellation/server.py | 65 ++++++++++++++++++-------- 2 files changed, 75 insertions(+), 31 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index f97a8c05e50..288f93d057e 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -44,13 +44,17 @@ _TIMEOUT_SECONDS = 0.05 # TODO(rbellevi): Actually use the logger. 
+ def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, - ideal_hamming_distance=ideal_distance)) + future = stub.Find.future( + hash_name_pb2.HashNameRequest( + desired_name=name, ideal_hamming_distance=ideal_distance)) + def cancel_request(unused_signum, unused_frame): future.cancel() + signal.signal(signal.SIGINT, cancel_request) while True: try: @@ -63,14 +67,19 @@ def run_unary_client(server_target, name, ideal_distance): break -def run_streaming_client(server_target, name, ideal_distance, interesting_distance): +def run_streaming_client(server_target, name, ideal_distance, + interesting_distance): with grpc.insecure_channel(server_target) as channel: stub = hash_name_pb2_grpc.HashFinderStub(channel) - result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name, - ideal_hamming_distance=ideal_distance, - interesting_hamming_distance=interesting_distance)) + result_generator = stub.FindRange( + hash_name_pb2.HashNameRequest( + desired_name=name, + ideal_hamming_distance=ideal_distance, + interesting_hamming_distance=interesting_distance)) + def cancel_request(unused_signum, unused_frame): result_generator.cancel() + signal.signal(signal.SIGINT, cancel_request) result_queue = Queue() @@ -84,7 +93,9 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan raise rpc_error # Enqueue a sentinel to signal the end of the stream. result_queue.put(None) - response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) + + response_thread = threading.Thread( + target=iterate_responses, args=(result_generator, result_queue)) response_thread.daemon = True response_thread.start() @@ -97,11 +108,16 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan break print(result) + def main(): parser = argparse.ArgumentParser(description=_DESCRIPTION) parser.add_argument("name", type=str, help='The desired name.') - parser.add_argument("--ideal-distance", default=0, nargs='?', - type=int, help="The desired Hamming distance.") + parser.add_argument( + "--ideal-distance", + default=0, + nargs='?', + type=int, + help="The desired Hamming distance.") parser.add_argument( '--server', default='localhost:50051', @@ -113,14 +129,17 @@ def main(): default=None, type=int, nargs='?', - help='Also show candidates with a Hamming distance less than this value.') + help='Also show candidates with a Hamming distance less than this value.' 
+ ) args = parser.parse_args() if args.show_inferior is not None: - run_streaming_client(args.server, args.name, args.ideal_distance, args.show_inferior) + run_streaming_client(args.server, args.name, args.ideal_distance, + args.show_inferior) else: run_unary_client(args.server, args.name, args.ideal_distance) + if __name__ == "__main__": logging.basicConfig() main() diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index a2e8e947746..1af07e4a1e4 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -66,7 +66,7 @@ def _get_substring_hamming_distance(candidate, target): assert len(candidate) != 0 min_distance = None for i in range(len(candidate) - len(target) + 1): - distance = _get_hamming_distance(candidate[i:i+len(target)], target) + distance = _get_hamming_distance(candidate[i:i + len(target)], target) if min_distance is None or distance < min_distance: min_distance = distance return min_distance @@ -81,12 +81,18 @@ def _get_hash(secret): class ResourceLimitExceededError(Exception): """Signifies the request has exceeded configured limits.""" + # TODO(rbellevi): Docstring all the things. # TODO(rbellevi): File issue about indefinite blocking for server-side # streaming. -def _find_secret_of_length(target, ideal_distance, length, stop_event, maximum_hashes, interesting_hamming_distance=None): +def _find_secret_of_length(target, + ideal_distance, + length, + stop_event, + maximum_hashes, + interesting_hamming_distance=None): digits = [0] * length hashes_computed = 0 while True: @@ -100,15 +106,17 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, maximum_h distance = _get_substring_hamming_distance(hash, target) if interesting_hamming_distance is not None and distance <= interesting_hamming_distance: # Surface interesting candidates, but don't stop. - yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), - hashed_name=hash, - hamming_distance=distance), hashes_computed + yield hash_name_pb2.HashNameResponse( + secret=base64.b64encode(secret), + hashed_name=hash, + hamming_distance=distance), hashes_computed elif distance <= ideal_distance: # Yield the ideal candidate followed by a sentinel to signal the end # of the stream. 
- yield hash_name_pb2.HashNameResponse(secret=base64.b64encode(secret), - hashed_name=hash, - hamming_distance=distance), hashes_computed + yield hash_name_pb2.HashNameResponse( + secret=base64.b64encode(secret), + hashed_name=hash, + hamming_distance=distance), hashes_computed yield None, hashes_computed raise StopIteration() digits[-1] += 1 @@ -127,12 +135,22 @@ def _find_secret_of_length(target, ideal_distance, length, stop_event, maximum_h raise ResourceLimitExceededError() -def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesting_hamming_distance=None): +def _find_secret(target, + maximum_distance, + stop_event, + maximum_hashes, + interesting_hamming_distance=None): length = 1 total_hashes = 0 while True: last_hashes_computed = 0 - for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance): + for candidate, hashes_computed in _find_secret_of_length( + target, + maximum_distance, + length, + stop_event, + maximum_hashes - total_hashes, + interesting_hamming_distance=interesting_hamming_distance): last_hashes_computed = hashes_computed if candidate is not None: yield candidate @@ -153,12 +171,17 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def Find(self, request, context): stop_event = threading.Event() + def on_rpc_done(): _LOGGER.debug("Attempting to regain servicer thread.") stop_event.set() + context.add_callback(on_rpc_done) try: - candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes)) + candidates = list( + _find_secret(request.desired_name, + request.ideal_hamming_distance, stop_event, + self._maximum_hashes)) except ResourceLimitExceededError: _LOGGER.info("Cancelling RPC due to exhausted resources.") context.cancel() @@ -167,18 +190,20 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): return hash_name_pb2.HashNameResponse() return candidates[-1] - def FindRange(self, request, context): stop_event = threading.Event() + def on_rpc_done(): _LOGGER.debug("Attempting to regain servicer thread.") stop_event.set() + context.add_callback(on_rpc_done) - secret_generator = _find_secret(request.desired_name, - request.ideal_hamming_distance, - stop_event, - self._maximum_hashes, - interesting_hamming_distance=request.interesting_hamming_distance) + secret_generator = _find_secret( + request.desired_name, + request.ideal_hamming_distance, + stop_event, + self._maximum_hashes, + interesting_hamming_distance=request.interesting_hamming_distance) try: for candidate in secret_generator: yield candidate @@ -189,10 +214,10 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def _run_server(port, maximum_hashes): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=1), - maximum_concurrent_rpcs=1) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1) hash_name_pb2_grpc.add_HashFinderServicer_to_server( - HashFinder(maximum_hashes), server) + HashFinder(maximum_hashes), server) address = '{}:{}'.format(_SERVER_HOST, port) server.add_insecure_port(address) server.start() From dc8dba8afeb527f8118d9aca11ad27a5493f1966 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 13:19:02 -0700 Subject: [PATCH 15/31] Add docstrings --- examples/python/cancellation/client.py | 2 - examples/python/cancellation/server.py | 61 +++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 8 
deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 288f93d057e..c3bc226be2c 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -42,8 +42,6 @@ _LOGGER = logging.getLogger(__name__) _TIMEOUT_SECONDS = 0.05 -# TODO(rbellevi): Actually use the logger. - def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 1af07e4a1e4..44b8ee26139 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -21,6 +21,7 @@ from concurrent import futures from collections import deque import argparse import base64 +import contextlib import logging import hashlib import struct @@ -82,7 +83,6 @@ class ResourceLimitExceededError(Exception): """Signifies the request has exceeded configured limits.""" -# TODO(rbellevi): Docstring all the things. # TODO(rbellevi): File issue about indefinite blocking for server-side # streaming. @@ -93,6 +93,28 @@ def _find_secret_of_length(target, stop_event, maximum_hashes, interesting_hamming_distance=None): + """Find a candidate with the given length. + + Args: + target: The search string. + ideal_distance: The desired Hamming distance. + length: The length of secret string to search for. + stop_event: An event indicating whether the RPC should terminate. + maximum_hashes: The maximum number of hashes to check before stopping. + interesting_hamming_distance: If specified, strings with a Hamming + distance from the target below this value will be yielded. + + Yields: + A stream of tuples of type Tuple[Optional[HashNameResponse], int]. The + element of the tuple, if specified, signifies an ideal or interesting + candidate. If this element is None, it signifies that the stream has + ended because an ideal candidate has been found. The second element is + the number of hashes computed up this point. + + Raises: + ResourceLimitExceededError: If the computation exceeds `maximum_hashes` + iterations. + """ digits = [0] * length hashes_computed = 0 while True: @@ -140,6 +162,29 @@ def _find_secret(target, stop_event, maximum_hashes, interesting_hamming_distance=None): + """Find candidate strings. + + Search through the space of all bytestrings, in order of increasing length, + indefinitely, until a hash with a Hamming distance of `maximum_distance` or + less has been found. + + Args: + target: The search string. + maximum_distance: The desired Hamming distance. + stop_event: An event indicating whether the RPC should terminate. + maximum_hashes: The maximum number of hashes to check before stopping. + interesting_hamming_distance: If specified, strings with a Hamming + distance from the target below this value will be yielded. + + Yields: + Instances of HashNameResponse. The final entry in the stream will be of + `maximum_distance` Hamming distance or less from the target string, + while all others will be of less than `interesting_hamming_distance`. + + Raises: + ResourceLimitExceededError: If the computation exceeds `maximum_hashes` + iterations. 
+ """ length = 1 total_hashes = 0 while True: @@ -213,19 +258,21 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): _LOGGER.debug("Regained servicer thread.") -def _run_server(port, maximum_hashes): +@contextlib.contextmanager +def _running_server(port, maximum_hashes): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1) hash_name_pb2_grpc.add_HashFinderServicer_to_server( HashFinder(maximum_hashes), server) address = '{}:{}'.format(_SERVER_HOST, port) - server.add_insecure_port(address) + actual_port = server.add_insecure_port(address) server.start() print("Server listening at '{}'".format(address)) try: - while True: - time.sleep(_ONE_DAY_IN_SECONDS) + yield actual_port except KeyboardInterrupt: + pass + finally: server.stop(None) @@ -244,7 +291,9 @@ def main(): nargs='?', help='The maximum number of hashes to search before cancelling.') args = parser.parse_args() - _run_server(args.port, args.maximum_hashes) + with _running_server(args.port, args.maximum_hashes): + while True: + time.sleep(_ONE_DAY_IN_SECONDS) if __name__ == "__main__": From 93d6344ac60ccb5bb6aec5f12be42c0ba581dcfc Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 13:25:54 -0700 Subject: [PATCH 16/31] Add todo --- examples/python/cancellation/client.py | 2 ++ examples/python/cancellation/server.py | 4 ---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index c3bc226be2c..20b76784622 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -92,6 +92,8 @@ def run_streaming_client(server_target, name, ideal_distance, # Enqueue a sentinel to signal the end of the stream. result_queue.put(None) + # TODO(https://github.com/grpc/grpc/issues/19464): Do everything on the + # main thread. response_thread = threading.Thread( target=iterate_responses, args=(result_generator, result_queue)) response_thread.daemon = True diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 44b8ee26139..a316a7c2c60 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -83,10 +83,6 @@ class ResourceLimitExceededError(Exception): """Signifies the request has exceeded configured limits.""" -# TODO(rbellevi): File issue about indefinite blocking for server-side -# streaming. 
- - def _find_secret_of_length(target, ideal_distance, length, From 786a3acab0f37ce539f4ed5000f3dc388c35b0a5 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 14:20:07 -0700 Subject: [PATCH 17/31] Add test --- examples/python/cancellation/client.py | 10 ++- examples/python/cancellation/server.py | 1 + .../test/_cancellation_example_test.py | 87 +++++++++++++++++++ 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 20b76784622..0621a05d643 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -48,7 +48,8 @@ def run_unary_client(server_target, name, ideal_distance): stub = hash_name_pb2_grpc.HashFinderStub(channel) future = stub.Find.future( hash_name_pb2.HashNameRequest( - desired_name=name, ideal_hamming_distance=ideal_distance)) + desired_name=name, ideal_hamming_distance=ideal_distance), + wait_for_ready=True) def cancel_request(unused_signum, unused_frame): future.cancel() @@ -61,6 +62,10 @@ def run_unary_client(server_target, name, ideal_distance): continue except grpc.FutureCancelledError: break + except grpc.RpcError as rpc_error: + if rpc_error.code() == grpc.StatusCode.CANCELLED: + break + raise rpc_error print(result) break @@ -73,7 +78,8 @@ def run_streaming_client(server_target, name, ideal_distance, hash_name_pb2.HashNameRequest( desired_name=name, ideal_hamming_distance=ideal_distance, - interesting_hamming_distance=interesting_distance)) + interesting_hamming_distance=interesting_distance), + wait_for_ready=True) def cancel_request(unused_signum, unused_frame): result_generator.cancel() diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index a316a7c2c60..98634727aaa 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -218,6 +218,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): stop_event.set() context.add_callback(on_rpc_done) + candidates = [] try: candidates = list( _find_secret(request.desired_name, diff --git a/examples/python/cancellation/test/_cancellation_example_test.py b/examples/python/cancellation/test/_cancellation_example_test.py index e69de29bb2d..45b0ccb400a 100644 --- a/examples/python/cancellation/test/_cancellation_example_test.py +++ b/examples/python/cancellation/test/_cancellation_example_test.py @@ -0,0 +1,87 @@ +# Copyright 2019 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Test for cancellation example.""" + +import contextlib +import os +import signal +import socket +import subprocess +import unittest + +_BINARY_DIR = os.path.realpath( + os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) +_SERVER_PATH = os.path.join(_BINARY_DIR, 'server') +_CLIENT_PATH = os.path.join(_BINARY_DIR, 'client') + + +@contextlib.contextmanager +def _get_port(): + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(('', 0)) + try: + yield sock.getsockname()[1] + finally: + sock.close() + + +def _start_client(server_port, + desired_string, + ideal_distance, + interesting_distance=None): + interesting_distance_args = () if interesting_distance is None else ( + '--show-inferior', interesting_distance) + return subprocess.Popen((_CLIENT_PATH, desired_string, '--server', + 'localhost:{}'.format(server_port), + '--ideal-distance', + str(ideal_distance)) + interesting_distance_args) + + +class CancellationExampleTest(unittest.TestCase): + + def test_successful_run(self): + with _get_port() as test_port: + server_process = subprocess.Popen((_SERVER_PATH, '--port', + str(test_port))) + try: + client_process = _start_client(test_port, 'aa', 0) + client_return_code = client_process.wait() + self.assertEqual(0, client_return_code) + self.assertIsNone(server_process.poll()) + finally: + server_process.kill() + server_process.wait() + + def test_graceful_sigint(self): + with _get_port() as test_port: + server_process = subprocess.Popen((_SERVER_PATH, '--port', + str(test_port))) + try: + client_process1 = _start_client(test_port, 'aaaaaaaaaa', 0) + client_process1.send_signal(signal.SIGINT) + client_process1.wait() + client_process2 = _start_client(test_port, 'aaaaaaaaaa', 0) + client_return_code = client_process2.wait() + self.assertEqual(0, client_return_code) + self.assertIsNone(server_process.poll()) + finally: + server_process.kill() + server_process.wait() + + +if __name__ == '__main__': + unittest.main(verbosity=2) From edbddf25ab99c86f70c961d78e4f85a40aebc5a2 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 15:49:31 -0700 Subject: [PATCH 18/31] Typos --- examples/python/cancellation/README.md | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index d5f9a844fcc..57ea1a30850 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -1,18 +1,14 @@ -### Cancelling RPCs - -RPCs may be cancelled by both the client and the server. - #### The Example In the example, we implement a silly algorithm. We search for bytestrings whose hashes are similar to a given search string. For example, say we're looking for -the string "doctor". Our algorithm may `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or +the string "doctor". Our algorithm may return `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or `RC9/7mlM3ldy4TdoctOc6WzYbO4=`. This is a brute force algorithm, so the server -performing the search must be conscious the resources it allows to each client -and each client must be conscientious of the resources demanded of the server. +performing the search must be conscious of the resources it allows to each client +and each client must be conscientious of the resources it demands of the server. 
In particular, we ensure that client processes cancel the stream explicitly -before terminating and we ensure the server cancels RPCs that have gone on longer +before terminating and we ensure that server processes cancel RPCs that have gone on longer than a certain number of iterations. #### Cancellation on the Client Side From 81f42031c638cec53f72a78d17e3524ced4b9ee9 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 15:58:06 -0700 Subject: [PATCH 19/31] Pylint --- examples/python/cancellation/client.py | 3 --- examples/python/cancellation/server.py | 21 ++++++++++----------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 0621a05d643..9223e7c6ff4 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -17,11 +17,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from concurrent import futures import argparse -import datetime import logging -import time import signal import threading diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 98634727aaa..46e7b88ce51 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -18,7 +18,6 @@ from __future__ import division from __future__ import print_function from concurrent import futures -from collections import deque import argparse import base64 import contextlib @@ -64,7 +63,7 @@ def _get_substring_hamming_distance(candidate, target): The minimum Hamming distance between candidate and target. """ assert len(target) <= len(candidate) - assert len(candidate) != 0 + assert candidate min_distance = None for i in range(len(candidate) - len(target) + 1): distance = _get_hamming_distance(candidate[i:i + len(target)], target) @@ -118,25 +117,25 @@ def _find_secret_of_length(target, # Yield a sentinel and stop the generator if the RPC has been # cancelled. yield None, hashes_computed - raise StopIteration() + raise StopIteration() # pylint: disable=stop-iteration-return secret = b''.join(struct.pack('B', i) for i in digits) - hash = _get_hash(secret) - distance = _get_substring_hamming_distance(hash, target) + candidate_hash = _get_hash(secret) + distance = _get_substring_hamming_distance(candidate_hash, target) if interesting_hamming_distance is not None and distance <= interesting_hamming_distance: # Surface interesting candidates, but don't stop. yield hash_name_pb2.HashNameResponse( secret=base64.b64encode(secret), - hashed_name=hash, + hashed_name=candidate_hash, hamming_distance=distance), hashes_computed elif distance <= ideal_distance: # Yield the ideal candidate followed by a sentinel to signal the end # of the stream. yield hash_name_pb2.HashNameResponse( secret=base64.b64encode(secret), - hashed_name=hash, + hashed_name=candidate_hash, hamming_distance=distance), hashes_computed yield None, hashes_computed - raise StopIteration() + raise StopIteration() # pylint: disable=stop-iteration-return digits[-1] += 1 i = length - 1 while digits[i] == _BYTE_MAX + 1: @@ -145,7 +144,7 @@ def _find_secret_of_length(target, if i == -1: # Terminate the generator since we've run out of strings of # `length` bytes. 
- raise StopIteration() + raise StopIteration() # pylint: disable=stop-iteration-return else: digits[i] += 1 hashes_computed += 1 @@ -196,10 +195,10 @@ def _find_secret(target, if candidate is not None: yield candidate else: - raise StopIteration() + raise StopIteration() # pylint: disable=stop-iteration-return if stop_event.is_set(): # Terminate the generator if the RPC has been cancelled. - raise StopIteration() + raise StopIteration() # pylint: disable=stop-iteration-return total_hashes += last_hashes_computed length += 1 From fed1c629e06f15ed063a44edcb4ce7717bdec569 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 25 Jun 2019 16:07:49 -0700 Subject: [PATCH 20/31] Make compatible with Python 3 --- examples/python/cancellation/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 46e7b88ce51..767337ea3ea 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -75,7 +75,7 @@ def _get_substring_hamming_distance(candidate, target): def _get_hash(secret): hasher = hashlib.sha1() hasher.update(secret) - return base64.b64encode(hasher.digest()) + return base64.b64encode(hasher.digest()).decode('ascii') class ResourceLimitExceededError(Exception): From 2bf4d502c1943abb93e4349d36035fb1d574157c Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:06:07 -0700 Subject: [PATCH 21/31] Factor out simpler generator --- examples/python/cancellation/server.py | 41 +++++++++++++++++--------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 767337ea3ea..cb69e249d12 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -82,6 +82,32 @@ class ResourceLimitExceededError(Exception): """Signifies the request has exceeded configured limits.""" +def _bytestrings_of_length(length): + """Generates a stream containing all bytestrings of a given length. + + Args: + length: A non-negative integer length. + + Yields: + All bytestrings of length `length`. + """ + digits = [0] * length + hashes_computed = 0 + while True: + yield b''.join(struct.pack('B', i) for i in digits) + digits[-1] += 1 + i = length - 1 + while digits[i] == _BYTE_MAX + 1: + digits[i] = 0 + i -= 1 + if i == -1: + # Terminate the generator since we've run out of strings of + # `length` bytes. + raise StopIteration() # pylint: disable=stop-iteration-return + else: + digits[i] += 1 + + def _find_secret_of_length(target, ideal_distance, length, @@ -110,15 +136,13 @@ def _find_secret_of_length(target, ResourceLimitExceededError: If the computation exceeds `maximum_hashes` iterations. """ - digits = [0] * length hashes_computed = 0 - while True: + for secret in _bytestrings_of_length(length): if stop_event.is_set(): # Yield a sentinel and stop the generator if the RPC has been # cancelled. 
yield None, hashes_computed raise StopIteration() # pylint: disable=stop-iteration-return - secret = b''.join(struct.pack('B', i) for i in digits) candidate_hash = _get_hash(secret) distance = _get_substring_hamming_distance(candidate_hash, target) if interesting_hamming_distance is not None and distance <= interesting_hamming_distance: @@ -136,17 +160,6 @@ def _find_secret_of_length(target, hamming_distance=distance), hashes_computed yield None, hashes_computed raise StopIteration() # pylint: disable=stop-iteration-return - digits[-1] += 1 - i = length - 1 - while digits[i] == _BYTE_MAX + 1: - digits[i] = 0 - i -= 1 - if i == -1: - # Terminate the generator since we've run out of strings of - # `length` bytes. - raise StopIteration() # pylint: disable=stop-iteration-return - else: - digits[i] += 1 hashes_computed += 1 if hashes_computed == maximum_hashes: raise ResourceLimitExceededError() From 42b2fe154a4f2320c38206705ba9a7e90739ddab Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:21:29 -0700 Subject: [PATCH 22/31] Simplify search implementation --- examples/python/cancellation/server.py | 108 ++++++++----------------- 1 file changed, 33 insertions(+), 75 deletions(-) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index cb69e249d12..5d72b003dc6 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -86,13 +86,12 @@ def _bytestrings_of_length(length): """Generates a stream containing all bytestrings of a given length. Args: - length: A non-negative integer length. + length: A positive integer length. Yields: All bytestrings of length `length`. """ digits = [0] * length - hashes_computed = 0 while True: yield b''.join(struct.pack('B', i) for i in digits) digits[-1] += 1 @@ -108,40 +107,52 @@ def _bytestrings_of_length(length): digits[i] += 1 -def _find_secret_of_length(target, - ideal_distance, - length, - stop_event, - maximum_hashes, - interesting_hamming_distance=None): - """Find a candidate with the given length. +def _all_bytestrings(): + """Generates a stream containing all possible bytestrings. + + This generator does not terminate. + + Yields: + All bytestrings in ascending order of length. + """ + length = 1 + while True: + for bytestring in _bytestrings_of_length(length): + yield bytestring + length += 1 + + +def _find_secret(target, + ideal_distance, + stop_event, + maximum_hashes, + interesting_hamming_distance=None): + """Find candidate strings. + + Search through the space of all bytestrings, in order of increasing length, + indefinitely, until a hash with a Hamming distance of `maximum_distance` or + less has been found. Args: target: The search string. ideal_distance: The desired Hamming distance. - length: The length of secret string to search for. stop_event: An event indicating whether the RPC should terminate. maximum_hashes: The maximum number of hashes to check before stopping. interesting_hamming_distance: If specified, strings with a Hamming distance from the target below this value will be yielded. Yields: - A stream of tuples of type Tuple[Optional[HashNameResponse], int]. The - element of the tuple, if specified, signifies an ideal or interesting - candidate. If this element is None, it signifies that the stream has - ended because an ideal candidate has been found. The second element is - the number of hashes computed up this point. + Instances of HashNameResponse. 
The final entry in the stream will be of + `maximum_distance` Hamming distance or less from the target string, + while all others will be of less than `interesting_hamming_distance`. Raises: ResourceLimitExceededError: If the computation exceeds `maximum_hashes` iterations. """ hashes_computed = 0 - for secret in _bytestrings_of_length(length): + for secret in _all_bytestrings(): if stop_event.is_set(): - # Yield a sentinel and stop the generator if the RPC has been - # cancelled. - yield None, hashes_computed raise StopIteration() # pylint: disable=stop-iteration-return candidate_hash = _get_hash(secret) distance = _get_substring_hamming_distance(candidate_hash, target) @@ -150,72 +161,19 @@ def _find_secret_of_length(target, yield hash_name_pb2.HashNameResponse( secret=base64.b64encode(secret), hashed_name=candidate_hash, - hamming_distance=distance), hashes_computed + hamming_distance=distance) elif distance <= ideal_distance: - # Yield the ideal candidate followed by a sentinel to signal the end - # of the stream. + # Yield ideal candidate and end the stream. yield hash_name_pb2.HashNameResponse( secret=base64.b64encode(secret), hashed_name=candidate_hash, - hamming_distance=distance), hashes_computed - yield None, hashes_computed + hamming_distance=distance) raise StopIteration() # pylint: disable=stop-iteration-return hashes_computed += 1 if hashes_computed == maximum_hashes: raise ResourceLimitExceededError() -def _find_secret(target, - maximum_distance, - stop_event, - maximum_hashes, - interesting_hamming_distance=None): - """Find candidate strings. - - Search through the space of all bytestrings, in order of increasing length, - indefinitely, until a hash with a Hamming distance of `maximum_distance` or - less has been found. - - Args: - target: The search string. - maximum_distance: The desired Hamming distance. - stop_event: An event indicating whether the RPC should terminate. - maximum_hashes: The maximum number of hashes to check before stopping. - interesting_hamming_distance: If specified, strings with a Hamming - distance from the target below this value will be yielded. - - Yields: - Instances of HashNameResponse. The final entry in the stream will be of - `maximum_distance` Hamming distance or less from the target string, - while all others will be of less than `interesting_hamming_distance`. - - Raises: - ResourceLimitExceededError: If the computation exceeds `maximum_hashes` - iterations. - """ - length = 1 - total_hashes = 0 - while True: - last_hashes_computed = 0 - for candidate, hashes_computed in _find_secret_of_length( - target, - maximum_distance, - length, - stop_event, - maximum_hashes - total_hashes, - interesting_hamming_distance=interesting_hamming_distance): - last_hashes_computed = hashes_computed - if candidate is not None: - yield candidate - else: - raise StopIteration() # pylint: disable=stop-iteration-return - if stop_event.is_set(): - # Terminate the generator if the RPC has been cancelled. 
- raise StopIteration() # pylint: disable=stop-iteration-return - total_hashes += last_hashes_computed - length += 1 - - class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def __init__(self, maximum_hashes): From 7fa7f932e3debe33d38ecc25be7ae9f1dbf4fe98 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:32:58 -0700 Subject: [PATCH 23/31] Pull search algorithm out into another module --- examples/python/cancellation/BUILD.bazel | 17 ++- examples/python/cancellation/search.py | 158 +++++++++++++++++++++++ examples/python/cancellation/server.py | 151 +--------------------- 3 files changed, 180 insertions(+), 146 deletions(-) create mode 100644 examples/python/cancellation/search.py diff --git a/examples/python/cancellation/BUILD.bazel b/examples/python/cancellation/BUILD.bazel index 30bede22f22..31bba1548fa 100644 --- a/examples/python/cancellation/BUILD.bazel +++ b/examples/python/cancellation/BUILD.bazel @@ -19,12 +19,14 @@ load("//bazel:python_rules.bzl", "py_proto_library") proto_library( name = "hash_name_proto", - srcs = ["hash_name.proto"] + srcs = ["hash_name.proto"], + testonly = 1, ) py_proto_library( name = "hash_name_proto_pb2", deps = [":hash_name_proto"], + testonly = 1, well_known_protos = False, ) @@ -39,13 +41,24 @@ py_binary( srcs_version = "PY2AND3", ) +py_library( + name = "search", + srcs = ["search.py"], + srcs_version = "PY2AND3", + deps = [ + ":hash_name_proto_pb2", + ], + testonly = 1, +) + py_binary( name = "server", testonly = 1, srcs = ["server.py"], deps = [ "//src/python/grpcio/grpc:grpcio", - ":hash_name_proto_pb2" + ":hash_name_proto_pb2", + ":search", ] + select({ "//conditions:default": [requirement("futures")], "//:python3": [], diff --git a/examples/python/cancellation/search.py b/examples/python/cancellation/search.py new file mode 100644 index 00000000000..95e479deffa --- /dev/null +++ b/examples/python/cancellation/search.py @@ -0,0 +1,158 @@ +# Copyright the 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A search algorithm over the space of all bytestrings.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import base64 +import hashlib +import logging +import struct + +from examples.python.cancellation import hash_name_pb2 + +_LOGGER = logging.getLogger(__name__) +_BYTE_MAX = 255 + + +def _get_hamming_distance(a, b): + """Calculates hamming distance between strings of equal length.""" + distance = 0 + for char_a, char_b in zip(a, b): + if char_a.lower() != char_b.lower(): + distance += 1 + return distance + + +def _get_substring_hamming_distance(candidate, target): + """Calculates the minimum hamming distance between between the target + and any substring of the candidate. + + Args: + candidate: The string whose substrings will be tested. + target: The target string. + + Returns: + The minimum Hamming distance between candidate and target. 
+ """ + min_distance = None + for i in range(len(candidate) - len(target) + 1): + distance = _get_hamming_distance(candidate[i:i + len(target)], target) + if min_distance is None or distance < min_distance: + min_distance = distance + return min_distance + + +def _get_hash(secret): + hasher = hashlib.sha1() + hasher.update(secret) + return base64.b64encode(hasher.digest()).decode('ascii') + + +class ResourceLimitExceededError(Exception): + """Signifies the request has exceeded configured limits.""" + + +def _bytestrings_of_length(length): + """Generates a stream containing all bytestrings of a given length. + + Args: + length: A positive integer length. + + Yields: + All bytestrings of length `length`. + """ + digits = [0] * length + while True: + yield b''.join(struct.pack('B', i) for i in digits) + digits[-1] += 1 + i = length - 1 + while digits[i] == _BYTE_MAX + 1: + digits[i] = 0 + i -= 1 + if i == -1: + # Terminate the generator since we've run out of strings of + # `length` bytes. + raise StopIteration() # pylint: disable=stop-iteration-return + else: + digits[i] += 1 + + +def _all_bytestrings(): + """Generates a stream containing all possible bytestrings. + + This generator does not terminate. + + Yields: + All bytestrings in ascending order of length. + """ + length = 1 + while True: + for bytestring in _bytestrings_of_length(length): + yield bytestring + length += 1 + + +def search(target, + ideal_distance, + stop_event, + maximum_hashes, + interesting_hamming_distance=None): + """Find candidate strings. + + Search through the space of all bytestrings, in order of increasing length, + indefinitely, until a hash with a Hamming distance of `maximum_distance` or + less has been found. + + Args: + target: The search string. + ideal_distance: The desired Hamming distance. + stop_event: An event indicating whether the RPC should terminate. + maximum_hashes: The maximum number of hashes to check before stopping. + interesting_hamming_distance: If specified, strings with a Hamming + distance from the target below this value will be yielded. + + Yields: + Instances of HashNameResponse. The final entry in the stream will be of + `maximum_distance` Hamming distance or less from the target string, + while all others will be of less than `interesting_hamming_distance`. + + Raises: + ResourceLimitExceededError: If the computation exceeds `maximum_hashes` + iterations. + """ + hashes_computed = 0 + for secret in _all_bytestrings(): + if stop_event.is_set(): + raise StopIteration() # pylint: disable=stop-iteration-return + candidate_hash = _get_hash(secret) + distance = _get_substring_hamming_distance(candidate_hash, target) + if interesting_hamming_distance is not None and distance <= interesting_hamming_distance: + # Surface interesting candidates, but don't stop. + yield hash_name_pb2.HashNameResponse( + secret=base64.b64encode(secret), + hashed_name=candidate_hash, + hamming_distance=distance) + elif distance <= ideal_distance: + # Yield ideal candidate and end the stream. 
+ yield hash_name_pb2.HashNameResponse( + secret=base64.b64encode(secret), + hashed_name=candidate_hash, + hamming_distance=distance) + raise StopIteration() # pylint: disable=stop-iteration-return + hashes_computed += 1 + if hashes_computed == maximum_hashes: + raise ResourceLimitExceededError() diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 5d72b003dc6..2c715565031 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -19,21 +19,17 @@ from __future__ import print_function from concurrent import futures import argparse -import base64 import contextlib import logging -import hashlib -import struct import time import threading import grpc +import search from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc -_BYTE_MAX = 255 - _LOGGER = logging.getLogger(__name__) _SERVER_HOST = 'localhost' _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -41,139 +37,6 @@ _ONE_DAY_IN_SECONDS = 60 * 60 * 24 _DESCRIPTION = "A server for finding hashes similar to names." -def _get_hamming_distance(a, b): - """Calculates hamming distance between strings of equal length.""" - assert len(a) == len(b), "'{}', '{}'".format(a, b) - distance = 0 - for char_a, char_b in zip(a, b): - if char_a.lower() != char_b.lower(): - distance += 1 - return distance - - -def _get_substring_hamming_distance(candidate, target): - """Calculates the minimum hamming distance between between the target - and any substring of the candidate. - - Args: - candidate: The string whose substrings will be tested. - target: The target string. - - Returns: - The minimum Hamming distance between candidate and target. - """ - assert len(target) <= len(candidate) - assert candidate - min_distance = None - for i in range(len(candidate) - len(target) + 1): - distance = _get_hamming_distance(candidate[i:i + len(target)], target) - if min_distance is None or distance < min_distance: - min_distance = distance - return min_distance - - -def _get_hash(secret): - hasher = hashlib.sha1() - hasher.update(secret) - return base64.b64encode(hasher.digest()).decode('ascii') - - -class ResourceLimitExceededError(Exception): - """Signifies the request has exceeded configured limits.""" - - -def _bytestrings_of_length(length): - """Generates a stream containing all bytestrings of a given length. - - Args: - length: A positive integer length. - - Yields: - All bytestrings of length `length`. - """ - digits = [0] * length - while True: - yield b''.join(struct.pack('B', i) for i in digits) - digits[-1] += 1 - i = length - 1 - while digits[i] == _BYTE_MAX + 1: - digits[i] = 0 - i -= 1 - if i == -1: - # Terminate the generator since we've run out of strings of - # `length` bytes. - raise StopIteration() # pylint: disable=stop-iteration-return - else: - digits[i] += 1 - - -def _all_bytestrings(): - """Generates a stream containing all possible bytestrings. - - This generator does not terminate. - - Yields: - All bytestrings in ascending order of length. - """ - length = 1 - while True: - for bytestring in _bytestrings_of_length(length): - yield bytestring - length += 1 - - -def _find_secret(target, - ideal_distance, - stop_event, - maximum_hashes, - interesting_hamming_distance=None): - """Find candidate strings. - - Search through the space of all bytestrings, in order of increasing length, - indefinitely, until a hash with a Hamming distance of `maximum_distance` or - less has been found. - - Args: - target: The search string. 
- ideal_distance: The desired Hamming distance. - stop_event: An event indicating whether the RPC should terminate. - maximum_hashes: The maximum number of hashes to check before stopping. - interesting_hamming_distance: If specified, strings with a Hamming - distance from the target below this value will be yielded. - - Yields: - Instances of HashNameResponse. The final entry in the stream will be of - `maximum_distance` Hamming distance or less from the target string, - while all others will be of less than `interesting_hamming_distance`. - - Raises: - ResourceLimitExceededError: If the computation exceeds `maximum_hashes` - iterations. - """ - hashes_computed = 0 - for secret in _all_bytestrings(): - if stop_event.is_set(): - raise StopIteration() # pylint: disable=stop-iteration-return - candidate_hash = _get_hash(secret) - distance = _get_substring_hamming_distance(candidate_hash, target) - if interesting_hamming_distance is not None and distance <= interesting_hamming_distance: - # Surface interesting candidates, but don't stop. - yield hash_name_pb2.HashNameResponse( - secret=base64.b64encode(secret), - hashed_name=candidate_hash, - hamming_distance=distance) - elif distance <= ideal_distance: - # Yield ideal candidate and end the stream. - yield hash_name_pb2.HashNameResponse( - secret=base64.b64encode(secret), - hashed_name=candidate_hash, - hamming_distance=distance) - raise StopIteration() # pylint: disable=stop-iteration-return - hashes_computed += 1 - if hashes_computed == maximum_hashes: - raise ResourceLimitExceededError() - - class HashFinder(hash_name_pb2_grpc.HashFinderServicer): def __init__(self, maximum_hashes): @@ -191,10 +54,10 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): candidates = [] try: candidates = list( - _find_secret(request.desired_name, - request.ideal_hamming_distance, stop_event, - self._maximum_hashes)) - except ResourceLimitExceededError: + search.search(request.desired_name, + request.ideal_hamming_distance, stop_event, + self._maximum_hashes)) + except search.ResourceLimitExceededError: _LOGGER.info("Cancelling RPC due to exhausted resources.") context.cancel() _LOGGER.debug("Servicer thread returning.") @@ -210,7 +73,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): stop_event.set() context.add_callback(on_rpc_done) - secret_generator = _find_secret( + secret_generator = search.search( request.desired_name, request.ideal_hamming_distance, stop_event, @@ -219,7 +82,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer): try: for candidate in secret_generator: yield candidate - except ResourceLimitExceededError: + except search.ResourceLimitExceededError: _LOGGER.info("Cancelling RPC due to exhausted resources.") context.cancel() _LOGGER.debug("Regained servicer thread.") From 4100084c78cec4b5d7b5fe3e1b4b517523b45cb2 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:37:33 -0700 Subject: [PATCH 24/31] Use six for compatibility in client --- examples/python/cancellation/BUILD.bazel | 1 + examples/python/cancellation/client.py | 9 +++------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/python/cancellation/BUILD.bazel b/examples/python/cancellation/BUILD.bazel index 31bba1548fa..62bc7b2fa46 100644 --- a/examples/python/cancellation/BUILD.bazel +++ b/examples/python/cancellation/BUILD.bazel @@ -37,6 +37,7 @@ py_binary( deps = [ "//src/python/grpcio/grpc:grpcio", ":hash_name_proto_pb2", + requirement("six"), ], srcs_version = "PY2AND3", ) diff --git 
a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 9223e7c6ff4..2661efc196f 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -20,14 +20,11 @@ from __future__ import print_function import argparse import logging import signal +import six import threading -try: - from queue import Queue - from queue import Empty as QueueEmpty -except ImportError: - from Queue import Queue - from Queue import Empty as QueueEmpty +from six.moves.queue import Queue +from six.moves.queue import Empty as QueueEmpty import grpc From 7486026eb97671f55a6320012d55222cfe3ffcf0 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:42:55 -0700 Subject: [PATCH 25/31] Annotate the proto file --- examples/python/cancellation/hash_name.proto | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/examples/python/cancellation/hash_name.proto b/examples/python/cancellation/hash_name.proto index e0a5c8357be..7b4e47e056f 100644 --- a/examples/python/cancellation/hash_name.proto +++ b/examples/python/cancellation/hash_name.proto @@ -16,19 +16,41 @@ syntax = "proto3"; package hash_name; +// A request for a single secret whose hash is similar to a desired name. message HashNameRequest { + // The string that is desired in the secret's hash. string desired_name = 1; + + // The ideal Hamming distance betwen desired_name and the secret that will + // be searched for. int32 ideal_hamming_distance = 2; + + // A Hamming distance greater than the ideal Hamming distance. Search results + // with a Hamming distance less than this value but greater than the ideal + // distance will be returned back to the client but will not terminate the + // search. int32 interesting_hamming_distance = 3; } message HashNameResponse { + // The search result. string secret = 1; + + // The hash of the search result. A substring of this is of + // ideal_hamming_distance Hamming distance or less from desired_name. string hashed_name = 2; + + // The Hamming distance between hashed_name and desired_name. int32 hamming_distance = 3; } service HashFinder { + + // Search for a single string whose hash is similar to the specified + // desired_name. interesting_hamming_distance is ignored. rpc Find (HashNameRequest) returns (HashNameResponse) {} + + // Search for a string whose hash is similar to the specified desired_name, + // but also stream back less-than-ideal candidates. 
rpc FindRange (HashNameRequest) returns (stream HashNameResponse) {} } From 25f3439c910e0168b4dbe776b77526eed84f5213 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:45:47 -0700 Subject: [PATCH 26/31] Make whole package testonly --- examples/python/cancellation/BUILD.bazel | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/examples/python/cancellation/BUILD.bazel b/examples/python/cancellation/BUILD.bazel index 62bc7b2fa46..81cd3a881b8 100644 --- a/examples/python/cancellation/BUILD.bazel +++ b/examples/python/cancellation/BUILD.bazel @@ -17,22 +17,21 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement") load("//bazel:python_rules.bzl", "py_proto_library") +package(default_testonly = 1) + proto_library( name = "hash_name_proto", srcs = ["hash_name.proto"], - testonly = 1, ) py_proto_library( name = "hash_name_proto_pb2", deps = [":hash_name_proto"], - testonly = 1, well_known_protos = False, ) py_binary( name = "client", - testonly = 1, srcs = ["client.py"], deps = [ "//src/python/grpcio/grpc:grpcio", @@ -49,12 +48,10 @@ py_library( deps = [ ":hash_name_proto_pb2", ], - testonly = 1, ) py_binary( name = "server", - testonly = 1, srcs = ["server.py"], deps = [ "//src/python/grpcio/grpc:grpcio", From 1db141acccf91be59e46b584a0d1b883bf686ab9 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 09:51:44 -0700 Subject: [PATCH 27/31] Change section title --- examples/python/cancellation/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index 57ea1a30850..ed85fbe53cb 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -1,4 +1,4 @@ -#### The Example +### Cancellation In the example, we implement a silly algorithm. We search for bytestrings whose hashes are similar to a given search string. For example, say we're looking for From 915e97b115cf5d339af2f6a1b5255f61de1b89c3 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Wed, 26 Jun 2019 13:16:02 -0700 Subject: [PATCH 28/31] Fix main thread starvation issues --- examples/python/cancellation/README.md | 74 +++---------------- examples/python/cancellation/client.py | 49 ++---------- examples/python/cancellation/server.py | 2 +- .../test/_cancellation_example_test.py | 2 +- 4 files changed, 20 insertions(+), 107 deletions(-) diff --git a/examples/python/cancellation/README.md b/examples/python/cancellation/README.md index ed85fbe53cb..26ef61c329f 100644 --- a/examples/python/cancellation/README.md +++ b/examples/python/cancellation/README.md @@ -40,30 +40,16 @@ stub = hash_name_pb2_grpc.HashFinderStub(channel) future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name)) def cancel_request(unused_signum, unused_frame): future.cancel() + sys.exit(0) signal.signal(signal.SIGINT, cancel_request) -``` - -It's also important that you not block indefinitely on the RPC. Otherwise, the -signal handler will never have a chance to run. -```python -while True: - try: - result = future.result(timeout=_TIMEOUT_SECONDS) - except grpc.FutureTimeoutError: - continue - except grpc.FutureCancelledError: - break - print("Got response: \n{}".format(result)) - break +result = future.result() +print(result) ``` -Here, we repeatedly block on a result for up to `_TIMEOUT_SECONDS`. Doing so -gives the signal handlers a chance to run. In the case that our timeout -was reached, we simply continue on in the loop. 
In the case that the RPC was -cancelled (by our user's ctrl+c), we break out of the loop cleanly. Finally, if -we received the result of the RPC, we print it out for the user and exit the -loop. +We also call `sys.exit(0)` to terminate the process. If we do not do this, then +`future.result()` with throw an `RpcError`. Alternatively, you may catch this +exception. ##### Cancelling a Server-Side Streaming RPC from the Client @@ -78,53 +64,15 @@ stub = hash_name_pb2_grpc.HashFinderStub(channel) result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name)) def cancel_request(unused_signum, unused_frame): result_generator.cancel() + sys.exit(0) signal.signal(signal.SIGINT, cancel_request) +for result in result_generator: + print(result) ``` -However, the streaming case is complicated by the fact that there is no way to -propagate a timeout to Python generators. As a result, simply iterating over the -results of the RPC can block indefinitely and the signal handler may never run. -Instead, we iterate over the generator on another thread and retrieve the -results on the main thread with a synchronized `Queue`. - -```python -result_queue = Queue() -def iterate_responses(result_generator, result_queue): - try: - for result in result_generator: - result_queue.put(result) - except grpc.RpcError as rpc_error: - if rpc_error.code() != grpc.StatusCode.CANCELLED: - result_queue.put(None) - raise rpc_error - result_queue.put(None) - print("RPC complete") -response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) -response_thread.daemon = True -response_thread.start() -``` - -While this thread iterating over the results may block indefinitely, we can -structure the code running on our main thread in such a way that signal handlers -are guaranteed to be run at least every `_TIMEOUT_SECONDS` seconds. - -```python -while result_generator.running(): - try: - result = result_queue.get(timeout=_TIMEOUT_SECONDS) - except QueueEmpty: - continue - if result is None: - break - print("Got result: {}".format(result)) -``` - -Similarly to the unary example above, we continue in a loop waiting for results, -taking care to block for intervals of `_TIMEOUT_SECONDS` at the longest. -Finally, we use `None` as a sentinel value to signal the end of the stream. +We also call `sys.exit(0)` here to terminate the process. Alternatively, you may +catch the `RpcError` raised by the for loop upon cancellation. -Using this scheme, our process responds nicely to `SIGINT`s while also -explicitly cancelling its RPCs. #### Cancellation on the Server Side diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py index 2661efc196f..491dffa170b 100644 --- a/examples/python/cancellation/client.py +++ b/examples/python/cancellation/client.py @@ -27,6 +27,8 @@ from six.moves.queue import Queue from six.moves.queue import Empty as QueueEmpty import grpc +import os +import sys from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2_grpc @@ -34,8 +36,6 @@ from examples.python.cancellation import hash_name_pb2_grpc _DESCRIPTION = "A client for finding hashes similar to names." 
_LOGGER = logging.getLogger(__name__) -_TIMEOUT_SECONDS = 0.05 - def run_unary_client(server_target, name, ideal_distance): with grpc.insecure_channel(server_target) as channel: @@ -47,21 +47,11 @@ def run_unary_client(server_target, name, ideal_distance): def cancel_request(unused_signum, unused_frame): future.cancel() + sys.exit(0) signal.signal(signal.SIGINT, cancel_request) - while True: - try: - result = future.result(timeout=_TIMEOUT_SECONDS) - except grpc.FutureTimeoutError: - continue - except grpc.FutureCancelledError: - break - except grpc.RpcError as rpc_error: - if rpc_error.code() == grpc.StatusCode.CANCELLED: - break - raise rpc_error - print(result) - break + result = future.result() + print(result) def run_streaming_client(server_target, name, ideal_distance, @@ -77,35 +67,10 @@ def run_streaming_client(server_target, name, ideal_distance, def cancel_request(unused_signum, unused_frame): result_generator.cancel() + sys.exit(0) signal.signal(signal.SIGINT, cancel_request) - result_queue = Queue() - - def iterate_responses(result_generator, result_queue): - try: - for result in result_generator: - result_queue.put(result) - except grpc.RpcError as rpc_error: - if rpc_error.code() != grpc.StatusCode.CANCELLED: - result_queue.put(None) - raise rpc_error - # Enqueue a sentinel to signal the end of the stream. - result_queue.put(None) - - # TODO(https://github.com/grpc/grpc/issues/19464): Do everything on the - # main thread. - response_thread = threading.Thread( - target=iterate_responses, args=(result_generator, result_queue)) - response_thread.daemon = True - response_thread.start() - - while result_generator.running(): - try: - result = result_queue.get(timeout=_TIMEOUT_SECONDS) - except QueueEmpty: - continue - if result is None: - break + for result in result_generator: print(result) diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py index 2c715565031..25ce494ee39 100644 --- a/examples/python/cancellation/server.py +++ b/examples/python/cancellation/server.py @@ -117,7 +117,7 @@ def main(): parser.add_argument( '--maximum-hashes', type=int, - default=10000, + default=1000000, nargs='?', help='The maximum number of hashes to search before cancelling.') args = parser.parse_args() diff --git a/examples/python/cancellation/test/_cancellation_example_test.py b/examples/python/cancellation/test/_cancellation_example_test.py index 45b0ccb400a..2301cc63c67 100644 --- a/examples/python/cancellation/test/_cancellation_example_test.py +++ b/examples/python/cancellation/test/_cancellation_example_test.py @@ -74,7 +74,7 @@ class CancellationExampleTest(unittest.TestCase): client_process1 = _start_client(test_port, 'aaaaaaaaaa', 0) client_process1.send_signal(signal.SIGINT) client_process1.wait() - client_process2 = _start_client(test_port, 'aaaaaaaaaa', 0) + client_process2 = _start_client(test_port, 'aa', 0) client_return_code = client_process2.wait() self.assertEqual(0, client_return_code) self.assertIsNone(server_process.poll()) From ba39c9255c0b3cc4335d0377da5f53b297804a40 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jul 2019 10:00:36 -0700 Subject: [PATCH 29/31] Adopt reviewer's comments --- examples/python/cancellation/search.py | 30 +++++++++----------------- examples/python/cancellation/server.py | 3 +++ 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/examples/python/cancellation/search.py b/examples/python/cancellation/search.py index 95e479deffa..8c41baf651c 100644 --- 
+++ b/examples/python/cancellation/search.py
@@ -19,6 +19,7 @@ from __future__ import print_function
 
 import base64
 import hashlib
+import itertools
 import logging
 import struct
 
@@ -32,7 +33,7 @@ def _get_hamming_distance(a, b):
     """Calculates hamming distance between strings of equal length."""
     distance = 0
     for char_a, char_b in zip(a, b):
-        if char_a.lower() != char_b.lower():
+        if char_a != char_b:
             distance += 1
     return distance
 
@@ -49,8 +50,11 @@ def _get_substring_hamming_distance(candidate, target):
         The minimum Hamming distance between candidate and target.
     """
     min_distance = None
+    if len(target) > len(candidate):
+        raise ValueError("Candidate must be at least as long as target.")
     for i in range(len(candidate) - len(target) + 1):
-        distance = _get_hamming_distance(candidate[i:i + len(target)], target)
+        distance = _get_hamming_distance(candidate[i:i + len(target)].lower(),
+                                         target.lower())
         if min_distance is None or distance < min_distance:
             min_distance = distance
     return min_distance
@@ -75,20 +79,8 @@ def _bytestrings_of_length(length):
     Yields:
         All bytestrings of length `length`.
     """
-    digits = [0] * length
-    while True:
+    for digits in itertools.product(range(_BYTE_MAX), repeat=length):
         yield b''.join(struct.pack('B', i) for i in digits)
-        digits[-1] += 1
-        i = length - 1
-        while digits[i] == _BYTE_MAX + 1:
-            digits[i] = 0
-            i -= 1
-            if i == -1:
-                # Terminate the generator since we've run out of strings of
-                # `length` bytes.
-                raise StopIteration()  # pylint: disable=stop-iteration-return
-            else:
-                digits[i] += 1
 
 
 def _all_bytestrings():
@@ -99,11 +91,9 @@ def _all_bytestrings():
     Yields:
         All bytestrings in ascending order of length.
     """
-    length = 1
-    while True:
-        for bytestring in _bytestrings_of_length(length):
-            yield bytestring
-        length += 1
+    for bytestring in itertools.chain.from_iterable(
+            _bytestrings_of_length(length) for length in itertools.count()):
+        yield bytestring
 
 
 def search(target,
diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py
index 25ce494ee39..22132d81fce 100644
--- a/examples/python/cancellation/server.py
+++ b/examples/python/cancellation/server.py
@@ -90,6 +90,9 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
 
 @contextlib.contextmanager
 def _running_server(port, maximum_hashes):
+    # We use only a single servicer thread here to demonstrate that, if
+    # managed carefully, cancelled RPCs need not continue occupying servicer
+    # threads.
     server = grpc.server(
         futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1)
     hash_name_pb2_grpc.add_HashFinderServicer_to_server(

From 79e78d16f703ad743fe1338f9f6abf0f9ff0e83a Mon Sep 17 00:00:00 2001
From: Richard Belleville
Date: Mon, 8 Jul 2019 11:09:44 -0700
Subject: [PATCH 30/31] Pylint

---
 examples/python/cancellation/client.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py
index 491dffa170b..f5a180c7e57 100644
--- a/examples/python/cancellation/client.py
+++ b/examples/python/cancellation/client.py
@@ -20,15 +20,9 @@ from __future__ import print_function
 import argparse
 import logging
 import signal
-import six
-import threading
-
-from six.moves.queue import Queue
-from six.moves.queue import Empty as QueueEmpty
+import sys
 
 import grpc
-import os
-import sys
 
 from examples.python.cancellation import hash_name_pb2
 from examples.python.cancellation import hash_name_pb2_grpc

From 805afe647d72b68b9163d743e35e21867d4cdb70 Mon Sep 17 00:00:00 2001
From: Richard Belleville
Date: Mon, 8 Jul 2019 11:42:21 -0700
Subject: [PATCH 31/31] Copyright typo

---
 examples/python/cancellation/client.py | 2 +-
 examples/python/cancellation/search.py | 2 +-
 examples/python/cancellation/server.py | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/examples/python/cancellation/client.py b/examples/python/cancellation/client.py
index f5a180c7e57..f80f9668849 100644
--- a/examples/python/cancellation/client.py
+++ b/examples/python/cancellation/client.py
@@ -1,4 +1,4 @@
-# Copyright the 2019 gRPC authors.
+# Copyright 2019 the gRPC authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/examples/python/cancellation/search.py b/examples/python/cancellation/search.py
index 8c41baf651c..9d2331af1bb 100644
--- a/examples/python/cancellation/search.py
+++ b/examples/python/cancellation/search.py
@@ -1,4 +1,4 @@
-# Copyright the 2019 gRPC authors.
+# Copyright 2019 the gRPC authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/examples/python/cancellation/server.py b/examples/python/cancellation/server.py
index 22132d81fce..9597e8941b4 100644
--- a/examples/python/cancellation/server.py
+++ b/examples/python/cancellation/server.py
@@ -1,4 +1,4 @@
-# Copyright the 2019 gRPC authors.
+# Copyright 2019 the gRPC authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
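
The patches above replace the client's queue-and-timeout machinery with a plain `for` loop plus a `SIGINT` handler that cancels the RPC and exits. Below is a minimal standalone sketch of that pattern; it assumes the generated `hash_name_pb2`/`hash_name_pb2_grpc` modules from this example and a server at `localhost:50051` (the target address and the hard-coded arguments in `__main__` are assumptions, not fixed by the patches).

```python
# Sketch of the simplified client-side cancellation flow. The target address
# and the generated modules are assumptions noted in the text above.
import signal
import sys

import grpc

from examples.python.cancellation import hash_name_pb2
from examples.python.cancellation import hash_name_pb2_grpc


def run_streaming_client(target, name, maximum_distance):
    with grpc.insecure_channel(target) as channel:
        stub = hash_name_pb2_grpc.HashFinderStub(channel)
        result_generator = stub.FindRange(
            hash_name_pb2.HashNameRequest(
                desired_name=name,
                maximum_hamming_distance=maximum_distance))

        def cancel_request(unused_signum, unused_frame):
            # Cancel the in-flight RPC, then exit; without the exit, the
            # iteration below would raise an RpcError with code CANCELLED.
            result_generator.cancel()
            sys.exit(0)

        signal.signal(signal.SIGINT, cancel_request)
        for result in result_generator:
            print(result)


if __name__ == '__main__':
    run_streaming_client('localhost:50051', 'doctor', 0)
```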
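PATCH 29 swaps the hand-rolled digit counters in `search.py` for `itertools`. The sketch below shows the same enumeration idea in isolation; `_BYTE_MAX = 255`, the inclusive `range(_BYTE_MAX + 1)`, and `itertools.count(1)` are assumptions made so the snippet covers every byte value starting at length one, not a claim about the example's exact constants.

```python
# Standalone sketch of exhaustive bytestring enumeration with itertools.
# _BYTE_MAX and the inclusive range are assumptions for this snippet.
import itertools
import struct

_BYTE_MAX = 255


def bytestrings_of_length(length):
    """Yields every bytestring of exactly `length` bytes, in ascending order."""
    for digits in itertools.product(range(_BYTE_MAX + 1), repeat=length):
        yield b''.join(struct.pack('B', i) for i in digits)


def all_bytestrings():
    """Yields every bytestring, shortest first."""
    return itertools.chain.from_iterable(
        bytestrings_of_length(length) for length in itertools.count(1))


if __name__ == '__main__':
    # Print the first few candidates to illustrate the enumeration order.
    for candidate in itertools.islice(all_bytestrings(), 5):
        print(candidate)
```

Because `itertools.count(1)` and `chain.from_iterable` are both lazy, only the lengths actually consumed are ever generated, which is what makes the exhaustive search safe to iterate incrementally.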
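The comment added to `server.py` in PATCH 29 notes that, handled carefully, a cancelled RPC does not have to keep occupying a servicer thread. One way to do that is to register an RPC-termination callback with `grpc.ServicerContext.add_callback` and check an event inside the work loop. The servicer below is an illustrative sketch under that assumption, not the example's actual implementation; its work loop is a stand-in for the real hash search.

```python
# Illustrative servicer that returns (freeing its thread) as soon as the
# client cancels. The work loop is a stand-in, not the example's real search.
import threading

from examples.python.cancellation import hash_name_pb2
from examples.python.cancellation import hash_name_pb2_grpc


class IllustrativeHashFinder(hash_name_pb2_grpc.HashFinderServicer):

    def Find(self, request, context):
        stop_event = threading.Event()

        def on_rpc_done():
            # Invoked when the RPC terminates for any reason, including
            # client-side cancellation.
            stop_event.set()

        context.add_callback(on_rpc_done)

        attempts = 0
        while not stop_event.is_set() and attempts < 10000:
            attempts += 1  # Stand-in for hashing one candidate name.

        if stop_event.is_set():
            # Cancelled: return immediately so the servicer thread is freed.
            return hash_name_pb2.HashNameResponse()
        return hash_name_pb2.HashNameResponse(
            hashed_name=request.desired_name, hamming_distance=0)
```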