From 39088914ef71e878ed3184c3b70eda8d332bb368 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Mon, 31 Jul 2017 14:45:54 -0700 Subject: [PATCH] fix cancellation test flake --- src/ruby/spec/generic/rpc_server_spec.rb | 88 +++++++++++++++++++++--- 1 file changed, 79 insertions(+), 9 deletions(-) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e0646f45997..e4fe594e220 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,32 @@ end SlowStub = SlowService.rpc_stub_class +# A test service that allows a synchronized RPC cancellation +class SynchronizedCancellationService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :received_md, :delay + + # notify_request_received and wait_until_rpc_cancelled are + # callbacks to synchronously allow the client to proceed with + # cancellation (after the unary request has been received), + # and to synchronously wait until the client has cancelled the + # current RPC. + def initialize(notify_request_received, wait_until_rpc_cancelled) + @notify_request_received = notify_request_received + @wait_until_rpc_cancelled = wait_until_rpc_cancelled + end + + def an_rpc(req, _call) + GRPC.logger.info('starting a synchronusly cancelled rpc') + @notify_request_received.call(req) + @wait_until_rpc_cancelled.call + req # send back the req as the response + end +end + +SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class + # a test service that hangs onto call objects # and uses them after the server-side call has been # finished @@ -384,20 +410,64 @@ describe GRPC::RpcServer do end it 'should handle cancellation correctly', server: true do - service = SlowService.new + request_received = false + request_received_mu = Mutex.new + request_received_cv = ConditionVariable.new + notify_request_received = proc do |req| + request_received_mu.synchronize do + fail 'req is nil' if req.nil? + expect(req.is_a?(EchoMsg)).to be true + fail 'test bug - already set' if request_received + request_received = true + request_received_cv.signal + end + end + + rpc_cancelled = false + rpc_cancelled_mu = Mutex.new + rpc_cancelled_cv = ConditionVariable.new + wait_until_rpc_cancelled = proc do + rpc_cancelled_mu.synchronize do + loop do + break if rpc_cancelled + rpc_cancelled_cv.wait(rpc_cancelled_mu) + end + end + end + + service = SynchronizedCancellationService.new(notify_request_received, + wait_until_rpc_cancelled) @srv.handle(service) - t = Thread.new { @srv.run } + srv_thd = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) - op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true) - Thread.new do # cancel the call - sleep 0.1 - op.cancel + stub = SynchronizedCancellationStub.new(@host, + :this_channel_is_insecure, + **client_opts) + op = stub.an_rpc(req, return_op: true) + + client_thd = Thread.new do + expect { op.execute }.to raise_error GRPC::Cancelled end - expect { op.execute }.to raise_error GRPC::Cancelled + + request_received_mu.synchronize do + loop do + break if request_received + request_received_cv.wait(request_received_mu) + end + end + + op.cancel + + rpc_cancelled_mu.synchronize do + fail 'test bug - already set' if rpc_cancelled + rpc_cancelled = true + rpc_cancelled_cv.signal + end + + client_thd.join @srv.stop - t.join + srv_thd.join end it 'should handle multiple parallel requests', server: true do