|
|
|
@ -111,6 +111,32 @@ end |
|
|
|
|
|
|
|
|
|
SlowStub = SlowService.rpc_stub_class |
|
|
|
|
|
|
|
|
|
# A test service that allows a synchronized RPC cancellation |
|
|
|
|
class SynchronizedCancellationService |
|
|
|
|
include GRPC::GenericService |
|
|
|
|
rpc :an_rpc, EchoMsg, EchoMsg |
|
|
|
|
attr_reader :received_md, :delay |
|
|
|
|
|
|
|
|
|
# notify_request_received and wait_until_rpc_cancelled are |
|
|
|
|
# callbacks to synchronously allow the client to proceed with |
|
|
|
|
# cancellation (after the unary request has been received), |
|
|
|
|
# and to synchronously wait until the client has cancelled the |
|
|
|
|
# current RPC. |
|
|
|
|
def initialize(notify_request_received, wait_until_rpc_cancelled) |
|
|
|
|
@notify_request_received = notify_request_received |
|
|
|
|
@wait_until_rpc_cancelled = wait_until_rpc_cancelled |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def an_rpc(req, _call) |
|
|
|
|
GRPC.logger.info('starting a synchronusly cancelled rpc') |
|
|
|
|
@notify_request_received.call(req) |
|
|
|
|
@wait_until_rpc_cancelled.call |
|
|
|
|
req # send back the req as the response |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class |
|
|
|
|
|
|
|
|
|
# a test service that hangs onto call objects |
|
|
|
|
# and uses them after the server-side call has been |
|
|
|
|
# finished |
|
|
|
@ -384,20 +410,64 @@ describe GRPC::RpcServer do |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
it 'should handle cancellation correctly', server: true do |
|
|
|
|
service = SlowService.new |
|
|
|
|
request_received = false |
|
|
|
|
request_received_mu = Mutex.new |
|
|
|
|
request_received_cv = ConditionVariable.new |
|
|
|
|
notify_request_received = proc do |req| |
|
|
|
|
request_received_mu.synchronize do |
|
|
|
|
fail 'req is nil' if req.nil? |
|
|
|
|
expect(req.is_a?(EchoMsg)).to be true |
|
|
|
|
fail 'test bug - already set' if request_received |
|
|
|
|
request_received = true |
|
|
|
|
request_received_cv.signal |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
rpc_cancelled = false |
|
|
|
|
rpc_cancelled_mu = Mutex.new |
|
|
|
|
rpc_cancelled_cv = ConditionVariable.new |
|
|
|
|
wait_until_rpc_cancelled = proc do |
|
|
|
|
rpc_cancelled_mu.synchronize do |
|
|
|
|
loop do |
|
|
|
|
break if rpc_cancelled |
|
|
|
|
rpc_cancelled_cv.wait(rpc_cancelled_mu) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
service = SynchronizedCancellationService.new(notify_request_received, |
|
|
|
|
wait_until_rpc_cancelled) |
|
|
|
|
@srv.handle(service) |
|
|
|
|
t = Thread.new { @srv.run } |
|
|
|
|
srv_thd = Thread.new { @srv.run } |
|
|
|
|
@srv.wait_till_running |
|
|
|
|
req = EchoMsg.new |
|
|
|
|
stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) |
|
|
|
|
op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true) |
|
|
|
|
Thread.new do # cancel the call |
|
|
|
|
sleep 0.1 |
|
|
|
|
op.cancel |
|
|
|
|
stub = SynchronizedCancellationStub.new(@host, |
|
|
|
|
:this_channel_is_insecure, |
|
|
|
|
**client_opts) |
|
|
|
|
op = stub.an_rpc(req, return_op: true) |
|
|
|
|
|
|
|
|
|
client_thd = Thread.new do |
|
|
|
|
expect { op.execute }.to raise_error GRPC::Cancelled |
|
|
|
|
end |
|
|
|
|
expect { op.execute }.to raise_error GRPC::Cancelled |
|
|
|
|
|
|
|
|
|
request_received_mu.synchronize do |
|
|
|
|
loop do |
|
|
|
|
break if request_received |
|
|
|
|
request_received_cv.wait(request_received_mu) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
op.cancel |
|
|
|
|
|
|
|
|
|
rpc_cancelled_mu.synchronize do |
|
|
|
|
fail 'test bug - already set' if rpc_cancelled |
|
|
|
|
rpc_cancelled = true |
|
|
|
|
rpc_cancelled_cv.signal |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
client_thd.join |
|
|
|
|
@srv.stop |
|
|
|
|
t.join |
|
|
|
|
srv_thd.join |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
it 'should handle multiple parallel requests', server: true do |
|
|
|
|