From ae937d34d6d517ffb29b8a22f947cdc3461aa1ca Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 3 Apr 2018 16:08:01 -0700 Subject: [PATCH] Never throw CallErrors for failed bidi reads or writes --- src/ruby/lib/grpc/generic/bidi_call.rb | 25 ++-- src/ruby/spec/generic/client_stub_spec.rb | 133 ++++++++++++++++++++++ 2 files changed, 151 insertions(+), 7 deletions(-) diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 086455db0b7..ffb232b8271 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -124,12 +124,18 @@ module GRPC def read_using_run_batch ops = { RECV_MESSAGE => nil } ops[RECV_INITIAL_METADATA] = nil unless @metadata_received - batch_result = @call.run_batch(ops) - unless @metadata_received - @call.metadata = batch_result.metadata - @metadata_received = true + begin + batch_result = @call.run_batch(ops) + unless @metadata_received + @call.metadata = batch_result.metadata + @metadata_received = true + end + batch_result + rescue GRPC::Core::CallError => e + GRPC.logger.warn('bidi call: read_using_run_batch failed') + GRPC.logger.warn(e) + nil end - batch_result end # set_output_stream_done is relevant on client-side @@ -155,7 +161,12 @@ module GRPC GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) + begin + @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) + rescue GRPC::Core::CallError => e + GRPC.logger.warn('bidi-write-loop: send close failed') + GRPC.logger.warn(e) + end GRPC.logger.debug('bidi-write-loop: done') end GRPC.logger.debug('bidi-write-loop: finished') @@ -187,7 +198,7 @@ module GRPC batch_result = read_using_run_batch # handle the next message - if batch_result.message.nil? + if batch_result.nil? || batch_result.message.nil? GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") if is_client diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index d858c4e3fe6..da50f8d0c9c 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -750,6 +750,90 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength expected_error_message) end end + + # Prompted by grpc/github #14853 + describe 'client-side error handling on bidi streams' do + class EnumeratorQueue + def initialize(queue) + @queue = queue + end + + def each + loop do + msg = @queue.pop + break if msg.nil? + yield msg + end + end + end + + def run_server_bidi_shutdown_after_one_read + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + expect(server_call.remote_read).to eq('first message') + @server.shutdown_and_notify(from_relative_time(0)) + @server.close + end + + it 'receives a grpc status code when writes to a bidi stream fail' do + # This test tries to trigger the case when a 'SEND_MESSAGE' op + # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails. + # In this case, iteration through the response stream should result + # in a grpc status code, and the writer thread should not raise an + # exception. + server_thread = Thread.new do + run_server_bidi_shutdown_after_one_read + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + request_queue.push('first message') + # Now wait for the server to shut down. + server_thread.join + # Sanity check. This test is not interesting if + # Thread.abort_on_exception is not set. + expect(Thread.abort_on_exception).to be(true) + # An attempt to send a second message should fail now that the + # server is down. + request_queue.push('second message') + request_queue.push(nil) + expect { responses.next }.to raise_error(GRPC::BadStatus) + end + + def run_server_bidi_shutdown_after_one_write + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('message') + @server.shutdown_and_notify(from_relative_time(0)) + @server.close + end + + it 'receives a grpc status code when reading from a failed bidi call' do + server_thread = Thread.new do + run_server_bidi_shutdown_after_one_write + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + expect(responses.next).to eq('message') + # Wait for the server to shut down + server_thread.join + expect { responses.next }.to raise_error(GRPC::BadStatus) + # Push a sentinel to allow the writer thread to finish + request_queue.push(nil) + end + end end describe 'without a call operation' do @@ -810,6 +894,55 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength responses.each { |r| p r } end end + + def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('server call received') + wait_for_shutdown_ok_callback.call + # since the client is cancelling the call, + # we should be able to shut down cleanly + @server.shutdown_and_notify(nil) + @server.close + end + + it 'receives a grpc status code when reading from a cancelled bidi call' do + # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or + # 'RECV_MESSAGE' op failure. + # An attempt to read a message might fail; in that case, iteration + # through the response stream should still result in a grpc status. + server_can_shutdown = false + server_can_shutdown_mu = Mutex.new + server_can_shutdown_cv = ConditionVariable.new + wait_for_shutdown_ok_callback = proc do + server_can_shutdown_mu.synchronize do + server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown + end + end + server_thread = Thread.new do + run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + expect(responses.next).to eq('server call received') + @op.cancel + expect { responses.next }.to raise_error(GRPC::Cancelled) + # Now let the server proceed to shut down. + server_can_shutdown_mu.synchronize do + server_can_shutdown = true + server_can_shutdown_cv.broadcast + end + server_thread.join + # Push a sentinel to allow the writer thread to finish + request_queue.push(nil) + end end end