Merge pull request #14946 from apolcyn/backport_14922

Backport #14922 to 1.11.x
pull/14957/head v1.11.0-pre1
apolcyn 7 years ago committed by GitHub
commit 9eb1303210
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      src/ruby/lib/grpc/generic/bidi_call.rb
  2. 133
      src/ruby/spec/generic/client_stub_spec.rb

@ -124,12 +124,18 @@ module GRPC
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
begin
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
end
batch_result
rescue GRPC::Core::CallError => e
GRPC.logger.warn('bidi call: read_using_run_batch failed')
GRPC.logger.warn(e)
nil
end
batch_result
end
# set_output_stream_done is relevant on client-side
@ -155,7 +161,12 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
begin
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
rescue GRPC::Core::CallError => e
GRPC.logger.warn('bidi-write-loop: send close failed')
GRPC.logger.warn(e)
end
GRPC.logger.debug('bidi-write-loop: done')
end
GRPC.logger.debug('bidi-write-loop: finished')
@ -187,7 +198,7 @@ module GRPC
batch_result = read_using_run_batch
# handle the next message
if batch_result.message.nil?
if batch_result.nil? || batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
if is_client

@ -750,6 +750,90 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
expected_error_message)
end
end
# Prompted by grpc/github #14853
describe 'client-side error handling on bidi streams' do
class EnumeratorQueue
def initialize(queue)
@queue = queue
end
def each
loop do
msg = @queue.pop
break if msg.nil?
yield msg
end
end
end
def run_server_bidi_shutdown_after_one_read
@server.start
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = GRPC::ActiveCall.new(
recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true, started: false)
expect(server_call.remote_read).to eq('first message')
@server.shutdown_and_notify(from_relative_time(0))
@server.close
end
it 'receives a grpc status code when writes to a bidi stream fail' do
# This test tries to trigger the case when a 'SEND_MESSAGE' op
# and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
# In this case, iteration through the response stream should result
# in a grpc status code, and the writer thread should not raise an
# exception.
server_thread = Thread.new do
run_server_bidi_shutdown_after_one_read
end
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
request_queue = Queue.new
@sent_msgs = EnumeratorQueue.new(request_queue)
responses = get_responses(stub)
request_queue.push('first message')
# Now wait for the server to shut down.
server_thread.join
# Sanity check. This test is not interesting if
# Thread.abort_on_exception is not set.
expect(Thread.abort_on_exception).to be(true)
# An attempt to send a second message should fail now that the
# server is down.
request_queue.push('second message')
request_queue.push(nil)
expect { responses.next }.to raise_error(GRPC::BadStatus)
end
def run_server_bidi_shutdown_after_one_write
@server.start
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = GRPC::ActiveCall.new(
recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true, started: false)
server_call.send_initial_metadata
server_call.remote_send('message')
@server.shutdown_and_notify(from_relative_time(0))
@server.close
end
it 'receives a grpc status code when reading from a failed bidi call' do
server_thread = Thread.new do
run_server_bidi_shutdown_after_one_write
end
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
request_queue = Queue.new
@sent_msgs = EnumeratorQueue.new(request_queue)
responses = get_responses(stub)
expect(responses.next).to eq('message')
# Wait for the server to shut down
server_thread.join
expect { responses.next }.to raise_error(GRPC::BadStatus)
# Push a sentinel to allow the writer thread to finish
request_queue.push(nil)
end
end
end
describe 'without a call operation' do
@ -810,6 +894,55 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
responses.each { |r| p r }
end
end
def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
@server.start
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = GRPC::ActiveCall.new(
recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true, started: false)
server_call.send_initial_metadata
server_call.remote_send('server call received')
wait_for_shutdown_ok_callback.call
# since the client is cancelling the call,
# we should be able to shut down cleanly
@server.shutdown_and_notify(nil)
@server.close
end
it 'receives a grpc status code when reading from a cancelled bidi call' do
# This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
# 'RECV_MESSAGE' op failure.
# An attempt to read a message might fail; in that case, iteration
# through the response stream should still result in a grpc status.
server_can_shutdown = false
server_can_shutdown_mu = Mutex.new
server_can_shutdown_cv = ConditionVariable.new
wait_for_shutdown_ok_callback = proc do
server_can_shutdown_mu.synchronize do
server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
end
end
server_thread = Thread.new do
run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
end
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
request_queue = Queue.new
@sent_msgs = EnumeratorQueue.new(request_queue)
responses = get_responses(stub)
expect(responses.next).to eq('server call received')
@op.cancel
expect { responses.next }.to raise_error(GRPC::Cancelled)
# Now let the server proceed to shut down.
server_can_shutdown_mu.synchronize do
server_can_shutdown = true
server_can_shutdown_cv.broadcast
end
server_thread.join
# Push a sentinel to allow the writer thread to finish
request_queue.push(nil)
end
end
end

Loading…
Cancel
Save