### Cancellation

In the example, we implement a silly algorithm. We search for bytestrings whose
hashes are similar to a given search string. For example, say we're looking for
the string "doctor". Our algorithm may return `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or
`RC9/7mlM3ldy4TdoctOc6WzYbO4=`. This is a brute-force algorithm, so the server
performing the search must be conscious of the resources it allows each client,
and each client must be conscientious about the resources it demands of the server.

In particular, we ensure that client processes cancel the stream explicitly
before terminating, and we ensure that server processes cancel RPCs that have
run for more than a certain number of iterations.

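
The search described above can be sketched in a few lines. This is only an illustration, not the example's actual implementation: the function name `find_similar_hash`, the use of SHA-1 with Base64 encoding, and the reading of "similar" as a case-insensitive substring match are all assumptions made here.

```python
import base64
import hashlib


def find_similar_hash(desired_name, max_attempts=1_000_000):
    """Hypothetical brute-force search.

    Returns (candidate_bytes, base64_hash) for the first candidate whose
    Base64-encoded SHA-1 hash contains desired_name (ignoring case), or
    None if no candidate matches within max_attempts.
    """
    target = desired_name.lower()
    for i in range(max_attempts):
        # Enumerate candidate bytestrings in a fixed order.
        candidate = i.to_bytes(8, "big")
        digest = base64.b64encode(hashlib.sha1(candidate).digest()).decode("ascii")
        if target in digest.lower():
            return candidate, digest
    return None


# A short search string keeps the brute-force search fast.
match = find_similar_hash("doc")
print(match)
```

Every extra character in the search string makes a match far less likely per hash, which is why an unbounded search can consume a great deal of server time.
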
#### Cancellation on the Client Side

A client may cancel an RPC for several reasons. Perhaps the data it requested
is no longer relevant. Perhaps you, as the client, want to be a good citizen
of the server and conserve its compute resources.

##### Cancelling a Server-Side Unary RPC from the Client

The default RPC methods on a stub will simply return the result of an RPC.

```python
>>> stub = hash_name_pb2_grpc.HashFinderStub(channel)
>>> stub.Find(hash_name_pb2.HashNameRequest(desired_name=name))
<hash_name_pb2.HashNameResponse object at 0x7fe2eb8ce2d0>
```

However, you may use the `future()` method to receive an instance of `grpc.Future`.
This interface allows you to wait on a response with a timeout, add a callback
to be executed when the RPC completes, or cancel the RPC before it has
completed.

In the example, we use this interface to cancel our in-progress RPC when the
user interrupts the process with Ctrl-C.

```python
import signal
import sys

# `channel` and `name` are defined elsewhere in the example.
stub = hash_name_pb2_grpc.HashFinderStub(channel)
future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name))

def cancel_request(unused_signum, unused_frame):
    # Cancel the in-flight RPC, then exit before future.result() can raise.
    future.cancel()
    sys.exit(0)

signal.signal(signal.SIGINT, cancel_request)

result = future.result()
print(result)
```

We also call `sys.exit(0)` to terminate the process. If we do not do this, then
`future.result()` will raise an `RpcError`. Alternatively, you may catch this
exception.

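
The `grpc.Future` interface closely resembles the standard library's `concurrent.futures.Future`, so the three operations mentioned above (waiting with a timeout, adding a callback, and cancelling) can be tried out without a running server. The sketch below uses a standard-library future purely as a stand-in for the object returned by `stub.Find.future(...)`. The exception classes gRPC raises differ from the standard-library ones caught here, so treat this as an illustration of the control flow rather than of the gRPC API itself.

```python
import concurrent.futures

# Stand-in for the future returned by stub.Find.future(...). It is never
# completed, which mimics an RPC that is still in progress.
future = concurrent.futures.Future()

events = []

# Callback executed once the future is done, including when it is cancelled.
future.add_done_callback(
    lambda f: events.append("cancelled" if f.cancelled() else "finished")
)

# Wait on the response with a timeout. Nothing completes the future,
# so the wait times out.
try:
    future.result(timeout=0.05)
except concurrent.futures.TimeoutError:
    events.append("timed out")

# Cancel the pending operation, as the signal handler does for the RPC.
cancelled = future.cancel()

# Instead of exiting the process, catch the exception raised by result().
try:
    future.result()
    outcome = "completed"
except concurrent.futures.CancelledError:
    outcome = "cancelled"

print(events, cancelled, outcome)
```
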
##### Cancelling a Server-Side Streaming RPC from the Client

Cancelling a server-side streaming RPC is even simpler from the perspective of
the gRPC API. The object returned by the default stub method is already an
instance of `grpc.Future`, so the methods outlined above still apply. It is also
an iterator, so we may iterate over it to receive the results of our RPC.

```python
stub = hash_name_pb2_grpc.HashFinderStub(channel)
result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name))

def cancel_request(unused_signum, unused_frame):
    # Cancel the stream, then exit before the for loop can raise.
    result_generator.cancel()
    sys.exit(0)

signal.signal(signal.SIGINT, cancel_request)
for result in result_generator:
    print(result)
```

We also call `sys.exit(0)` here to terminate the process. Alternatively, you may
catch the `RpcError` raised by the for loop upon cancellation.

#### Cancellation on the Server Side

A server is responsible for cancellation in two ways. It must respond in some way
when a client initiates a cancellation; otherwise, long-running computations
could continue indefinitely.

It may also decide to cancel the RPC for its own reasons. In our example, the
server can be configured to cancel an RPC after a certain number of hashes have
been computed in order to conserve compute resources.

##### Responding to Cancellations from a Servicer Thread

It's important to remember that a gRPC Python server is backed by a thread pool
with a fixed size. When an RPC is cancelled, the library does *not* terminate
your servicer thread. It is your responsibility as the application author to
ensure that your servicer thread terminates soon after the RPC has been
cancelled.

In this example, we use the `ServicerContext.add_callback` method to set a
`threading.Event` object when the RPC is terminated. We pass this `Event` object
down through our hashing algorithm and make sure to check that the RPC is still
ongoing before each iteration.

```python
stop_event = threading.Event()

def on_rpc_done():
    # Regain servicer thread.
    stop_event.set()

context.add_callback(on_rpc_done)
secret = _find_secret(stop_event)
```

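
To make the "check before each iteration" idea concrete, here is a minimal sketch of what a search loop that honours the `Event` might look like. The body of `_find_secret`, its extra `desired_name` parameter, and the SHA-1/Base64 matching rule are assumptions made for illustration; the real example's hashing code may differ. The event is set directly here, standing in for the `on_rpc_done` callback.

```python
import base64
import hashlib
import itertools
import threading


def _find_secret(stop_event, desired_name):
    """Hypothetical search loop that gives up once stop_event is set."""
    target = desired_name.lower()
    for i in itertools.count():
        # Check for cancellation before doing any more work.
        if stop_event.is_set():
            return None
        digest = base64.b64encode(
            hashlib.sha1(i.to_bytes(8, "big")).digest()
        ).decode("ascii")
        if target in digest.lower():
            return digest


stop_event = threading.Event()
results = []

# "!" never appears in Base64 output, so this search only ends when cancelled.
worker = threading.Thread(
    target=lambda: results.append(_find_secret(stop_event, "!"))
)
worker.start()

# Stands in for on_rpc_done() being invoked when the RPC terminates.
stop_event.set()
worker.join(timeout=5)
print(worker.is_alive(), results)
```

Because the loop polls the event once per hash, the thread is released at most one iteration after the RPC ends.
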
##### Initiating a Cancellation on the Server Side

Initiating a cancellation from the server side is simpler. Just call
`ServicerContext.cancel()`.

In our example, we ensure that no single client is monopolizing the server by
cancelling after a configurable number of hashes have been checked.

```python
try:
    for candidate in secret_generator:
        yield candidate
except ResourceLimitExceededError:
    print("Cancelling RPC due to exhausted resources.")
    context.cancel()
```

In this type of situation, you may also consider returning a more specific error
using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package.

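
The `ResourceLimitExceededError` caught in the servicer code above has to be raised somewhere inside the search. One way to do that, sketched here under assumptions, is a generator that counts the hashes it has computed and raises once a budget is spent. The generator name `find_secrets`, its `maximum_hashes` parameter, and the matching rule are hypothetical; only the exception name comes from the example.

```python
import base64
import hashlib
import itertools


class ResourceLimitExceededError(Exception):
    """Raised when a search has used up its hash budget."""


def find_secrets(desired_name, maximum_hashes):
    """Hypothetical generator yielding matching hashes until the budget runs out."""
    target = desired_name.lower()
    for hashes_computed in itertools.count():
        if hashes_computed >= maximum_hashes:
            raise ResourceLimitExceededError()
        digest = base64.b64encode(
            hashlib.sha1(hashes_computed.to_bytes(8, "big")).digest()
        ).decode("ascii")
        if target in digest.lower():
            yield digest


found = []
limit_hit = False
try:
    for candidate in find_secrets("a", maximum_hashes=100):
        found.append(candidate)
except ResourceLimitExceededError:
    # A servicer would call context.cancel() at this point.
    limit_hit = True

print(len(found), limit_hit)
```
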