Clean up logging

pull/19465/head
Richard Belleville 6 years ago
parent 4c852bf25f
commit b12299701d
  1. 15
      examples/python/cancellation/README.md
  2. 10
      examples/python/cancellation/client.py
  3. 19
      examples/python/cancellation/server.py

@ -166,3 +166,18 @@ secret = _find_secret(stop_event)
Initiating a cancellation from the server side is simpler. Just call
`ServicerContext.cancel()`.
In our example, we ensure that no single client is monopolizing the server by
cancelling after a configurable number of hashes have been checked.
```python
try:
for candidate in secret_generator:
yield candidate
except ResourceLimitExceededError:
print("Cancelling RPC due to exhausted resources.")
context.cancel()
```
In this type of situation, you may also consider returning a more specific error
using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package.

@ -47,11 +47,9 @@ _TIMEOUT_SECONDS = 0.05
def run_unary_client(server_target, name, ideal_distance): def run_unary_client(server_target, name, ideal_distance):
with grpc.insecure_channel(server_target) as channel: with grpc.insecure_channel(server_target) as channel:
stub = hash_name_pb2_grpc.HashFinderStub(channel) stub = hash_name_pb2_grpc.HashFinderStub(channel)
print("Sending request")
future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name, future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name,
ideal_hamming_distance=ideal_distance)) ideal_hamming_distance=ideal_distance))
def cancel_request(unused_signum, unused_frame): def cancel_request(unused_signum, unused_frame):
print("Cancelling request.")
future.cancel() future.cancel()
signal.signal(signal.SIGINT, cancel_request) signal.signal(signal.SIGINT, cancel_request)
while True: while True:
@ -61,19 +59,17 @@ def run_unary_client(server_target, name, ideal_distance):
continue continue
except grpc.FutureCancelledError: except grpc.FutureCancelledError:
break break
print("Got response: \n{}".format(result)) print(result)
break break
def run_streaming_client(server_target, name, ideal_distance, interesting_distance): def run_streaming_client(server_target, name, ideal_distance, interesting_distance):
with grpc.insecure_channel(server_target) as channel: with grpc.insecure_channel(server_target) as channel:
stub = hash_name_pb2_grpc.HashFinderStub(channel) stub = hash_name_pb2_grpc.HashFinderStub(channel)
print("Initiating RPC")
result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name, result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name,
ideal_hamming_distance=ideal_distance, ideal_hamming_distance=ideal_distance,
interesting_hamming_distance=interesting_distance)) interesting_hamming_distance=interesting_distance))
def cancel_request(unused_signum, unused_frame): def cancel_request(unused_signum, unused_frame):
print("Cancelling request.")
result_generator.cancel() result_generator.cancel()
signal.signal(signal.SIGINT, cancel_request) signal.signal(signal.SIGINT, cancel_request)
result_queue = Queue() result_queue = Queue()
@ -81,7 +77,6 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan
def iterate_responses(result_generator, result_queue): def iterate_responses(result_generator, result_queue):
try: try:
for result in result_generator: for result in result_generator:
print("Result: {}".format(result))
result_queue.put(result) result_queue.put(result)
except grpc.RpcError as rpc_error: except grpc.RpcError as rpc_error:
if rpc_error.code() != grpc.StatusCode.CANCELLED: if rpc_error.code() != grpc.StatusCode.CANCELLED:
@ -89,7 +84,6 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan
raise rpc_error raise rpc_error
# Enqueue a sentinel to signal the end of the stream. # Enqueue a sentinel to signal the end of the stream.
result_queue.put(None) result_queue.put(None)
print("RPC complete")
response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue)) response_thread = threading.Thread(target=iterate_responses, args=(result_generator, result_queue))
response_thread.daemon = True response_thread.daemon = True
response_thread.start() response_thread.start()
@ -101,7 +95,7 @@ def run_streaming_client(server_target, name, ideal_distance, interesting_distan
continue continue
if result is None: if result is None:
break break
print("Got result: {}".format(result)) print(result)
def main(): def main():
parser = argparse.ArgumentParser(description=_DESCRIPTION) parser = argparse.ArgumentParser(description=_DESCRIPTION)

@ -32,9 +32,6 @@ import grpc
from examples.python.cancellation import hash_name_pb2 from examples.python.cancellation import hash_name_pb2
from examples.python.cancellation import hash_name_pb2_grpc from examples.python.cancellation import hash_name_pb2_grpc
# TODO(rbellevi): Actually use the logger.
# TODO(rbellevi): Enforce per-user quotas with cancellation
_BYTE_MAX = 255 _BYTE_MAX = 255
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -134,7 +131,6 @@ def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesti
length = 1 length = 1
total_hashes = 0 total_hashes = 0
while True: while True:
print("Checking strings of length {}.".format(length))
last_hashes_computed = 0 last_hashes_computed = 0
for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance): for candidate, hashes_computed in _find_secret_of_length(target, maximum_distance, length, stop_event, maximum_hashes - total_hashes, interesting_hamming_distance=interesting_hamming_distance):
last_hashes_computed = hashes_computed last_hashes_computed = hashes_computed
@ -146,7 +142,6 @@ def _find_secret(target, maximum_distance, stop_event, maximum_hashes, interesti
# Terminate the generator if the RPC has been cancelled. # Terminate the generator if the RPC has been cancelled.
raise StopIteration() raise StopIteration()
total_hashes += last_hashes_computed total_hashes += last_hashes_computed
print("Incrementing length")
length += 1 length += 1
@ -159,15 +154,15 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
def Find(self, request, context): def Find(self, request, context):
stop_event = threading.Event() stop_event = threading.Event()
def on_rpc_done(): def on_rpc_done():
print("Attempting to regain servicer thread.") _LOGGER.debug("Attempting to regain servicer thread.")
stop_event.set() stop_event.set()
context.add_callback(on_rpc_done) context.add_callback(on_rpc_done)
try: try:
candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes)) candidates = list(_find_secret(request.desired_name, request.ideal_hamming_distance, stop_event, self._maximum_hashes))
except ResourceLimitExceededError: except ResourceLimitExceededError:
print("Cancelling RPC due to exhausted resources.") _LOGGER.info("Cancelling RPC due to exhausted resources.")
context.cancel() context.cancel()
print("Servicer thread returning.") _LOGGER.debug("Servicer thread returning.")
if not candidates: if not candidates:
return hash_name_pb2.HashNameResponse() return hash_name_pb2.HashNameResponse()
return candidates[-1] return candidates[-1]
@ -176,7 +171,7 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
def FindRange(self, request, context): def FindRange(self, request, context):
stop_event = threading.Event() stop_event = threading.Event()
def on_rpc_done(): def on_rpc_done():
print("Attempting to regain servicer thread.") _LOGGER.debug("Attempting to regain servicer thread.")
stop_event.set() stop_event.set()
context.add_callback(on_rpc_done) context.add_callback(on_rpc_done)
secret_generator = _find_secret(request.desired_name, secret_generator = _find_secret(request.desired_name,
@ -188,9 +183,9 @@ class HashFinder(hash_name_pb2_grpc.HashFinderServicer):
for candidate in secret_generator: for candidate in secret_generator:
yield candidate yield candidate
except ResourceLimitExceededError: except ResourceLimitExceededError:
print("Cancelling RPC due to exhausted resources.") _LOGGER.info("Cancelling RPC due to exhausted resources.")
context.cancel context.cancel()
print("Regained servicer thread.") _LOGGER.debug("Regained servicer thread.")
def _run_server(port, maximum_hashes): def _run_server(port, maximum_hashes):

Loading…
Cancel
Save