Merge pull request #10636 from apolcyn/alleviate_10526

cancel calls on ruby client bidi write loop exceptions and surface the errror
reviewable/pr12024/r2^2
apolcyn 7 years ago committed by GitHub
commit 240f35d6fb
  1. 34
      src/ruby/ext/grpc/rb_call.c
  2. 9
      src/ruby/lib/grpc/generic/bidi_call.rb
  3. 33
      src/ruby/spec/call_spec.rb
  4. 56
      src/ruby/spec/client_server_spec.rb
  5. 101
      src/ruby/spec/generic/client_stub_spec.rb
  6. 35
      src/ruby/spec/generic/rpc_server_spec.rb

@ -179,6 +179,38 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
return Qnil;
}
/* TODO: expose this as part of the surface API if needed.
* This is meant for internal usage by the "write thread" of grpc-ruby
* client-side bidi calls. It provides a way for the background write-thread
* to propogate failures to the main read-thread and give the user an error
* message. */
static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
VALUE details) {
grpc_rb_call *call = NULL;
grpc_call_error err;
if (RTYPEDDATA_DATA(self) == NULL) {
// This call has been closed
return Qnil;
}
if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
rb_raise(rb_eTypeError,
"Bad parameter type error for cancel with status. Want Fixnum, "
"String.");
return Qnil;
}
TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
StringValueCStr(details), NULL);
if (err != GRPC_CALL_OK) {
rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
}
return Qnil;
}
/* Releases the c-level resources associated with a call
Once a call has been closed, no further requests can be
processed.
@ -949,6 +981,8 @@ void Init_grpc_call() {
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
rb_define_method(grpc_rb_cCall, "cancel_with_status",
grpc_rb_call_cancel_with_status, 2);
rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);

@ -153,7 +153,12 @@ module GRPC
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
raise e
if is_client
@call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN,
"GRPC bidi call error: #{e.inspect}")
else
raise e
end
ensure
set_output_stream_done.call if is_client
end
@ -180,8 +185,8 @@ module GRPC
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
@call.trailing_metadata = @call.status.metadata if @call.status
batch_result.check_status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
batch_result.check_status
end
GRPC.logger.debug('bidi-read-loop: done reading!')

@ -137,6 +137,39 @@ describe GRPC::Core::Call do
end
end
describe '#cancel' do
it 'completes ok' do
call = make_test_call
expect { call.cancel }.not_to raise_error
end
it 'completes ok when the call is closed' do
call = make_test_call
call.close
expect { call.cancel }.not_to raise_error
end
end
describe '#cancel_with_status' do
it 'completes ok' do
call = make_test_call
expect do
call.cancel_with_status(0, 'test status')
end.not_to raise_error
expect do
call.cancel_with_status(0, nil)
end.to raise_error(TypeError)
end
it 'completes ok when the call is closed' do
call = make_test_call
call.close
expect do
call.cancel_with_status(0, 'test status')
end.not_to raise_error
end
end
def make_test_call
@ch.create_call(nil, nil, 'dummy_method', nil, deadline)
end

@ -226,6 +226,62 @@ shared_examples 'basic GRPC message delivery is OK' do
svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.send_close).to be true
end
def client_cancel_test(cancel_proc, expected_code,
expected_details)
call = new_client_call
server_call = nil
server_thread = Thread.new do
server_call = server_allows_client_to_proceed
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => {},
CallOps::RECV_INITIAL_METADATA => nil
}
batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
expect(batch_result.metadata).to eq({})
cancel_proc.call(call)
server_thread.join
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil
}
svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.send_close).to be true
client_ops = {
CallOps::RECV_STATUS_ON_CLIENT => {}
}
batch_result = call.run_batch(client_ops)
expect(batch_result.status.code).to be expected_code
expect(batch_result.status.details).to eq expected_details
end
it 'clients can cancel a call on the server' do
expected_code = StatusCodes::CANCELLED
expected_details = 'Cancelled'
cancel_proc = proc { |call| call.cancel }
client_cancel_test(cancel_proc, expected_code, expected_details)
end
it 'cancel_with_status unknown status' do
code = StatusCodes::UNKNOWN
details = 'test unknown reason'
cancel_proc = proc { |call| call.cancel_with_status(code, details) }
client_cancel_test(cancel_proc, code, details)
end
it 'cancel_with_status unknown status' do
code = StatusCodes::FAILED_PRECONDITION
details = 'test failed precondition reason'
cancel_proc = proc { |call| call.cancel_with_status(code, details) }
client_cancel_test(cancel_proc, code, details)
end
end
shared_examples 'GRPC metadata delivery works OK' do

@ -472,7 +472,7 @@ describe 'ClientStub' do
host = "localhost:#{server_port}"
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect do
get_responses(stub)
get_responses(stub).collect { |r| r }
end.to raise_error(ArgumentError,
/Header values must be of type string or array/)
end
@ -641,11 +641,101 @@ describe 'ClientStub' do
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
# Prompted by grpc/github #10526
describe 'surfacing of errors when sending requests' do
def run_server_bidi_send_one_then_read_indefinitely
@server.start
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = GRPC::ActiveCall.new(
recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true, started: false)
server_call.send_initial_metadata
server_call.remote_send('server response')
loop do
m = server_call.remote_read
break if m.nil?
end
# can't fail since initial metadata already sent
server_call.send_status(@pass, 'OK', true)
end
def verify_error_from_write_thread(stub, requests_to_push,
request_queue, expected_description)
# TODO: an improvement might be to raise the original exception from
# bidi call write loops instead of only cancelling the call
failing_marshal_proc = proc do |req|
fail req if req.is_a?(StandardError)
req
end
begin
e = get_responses(stub, marshal_proc: failing_marshal_proc)
first_response = e.next
expect(first_response).to eq('server response')
requests_to_push.each { |req| request_queue.push(req) }
e.collect { |r| r }
rescue GRPC::Unknown => e
exception = e
end
expect(exception.message.include?(expected_description)).to be(true)
end
# Provides an Enumerable view of a Queue
class BidiErrorTestingEnumerateForeverQueue
def initialize(queue)
@queue = queue
end
def each
loop do
msg = @queue.pop
yield msg
end
end
end
def run_error_in_client_request_stream_test(requests_to_push,
expected_error_message)
# start a server that waits on a read indefinitely - it should
# see a cancellation and be able to break out
th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
request_queue = Queue.new
@sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)
verify_error_from_write_thread(stub,
requests_to_push,
request_queue,
expected_error_message)
# the write loop errror should cancel the call and end the
# server's request stream
th.join
end
it 'non-GRPC errors from the write loop surface when raised ' \
'at the start of a request stream' do
expected_error_message = 'expect error on first request'
requests_to_push = [StandardError.new(expected_error_message)]
run_error_in_client_request_stream_test(requests_to_push,
expected_error_message)
end
it 'non-GRPC errors from the write loop surface when raised ' \
'during the middle of a request stream' do
expected_error_message = 'expect error on last request'
requests_to_push = %w( one two )
requests_to_push << StandardError.new(expected_error_message)
run_error_in_client_request_stream_test(requests_to_push,
expected_error_message)
end
end
end
describe 'without a call operation' do
def get_responses(stub, deadline: nil)
e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
def get_responses(stub, deadline: nil, marshal_proc: noop)
e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
metadata: @metadata, deadline: deadline)
expect(e).to be_a(Enumerator)
e
@ -658,8 +748,9 @@ describe 'ClientStub' do
after(:each) do
@op.wait # make sure wait doesn't hang
end
def get_responses(stub, run_start_call_first: false, deadline: nil)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
def get_responses(stub, run_start_call_first: false, deadline: nil,
marshal_proc: noop)
@op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
return_op: true,
metadata: @metadata, deadline: deadline)
expect(@op).to be_a(GRPC::ActiveCall::Operation)

@ -178,6 +178,18 @@ end
CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
# A service with a bidi streaming method.
class BidiService
include GRPC::GenericService
rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)
def server_sends_bad_input(_, _)
'bad response. (not an enumerable, client sees an error)'
end
end
BidiStub = BidiService.rpc_stub_class
describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes
@ -520,6 +532,29 @@ describe GRPC::RpcServer do
t.join
expect(one_failed_as_unavailable).to be(true)
end
it 'should send a status UNKNOWN with a relevant message when the' \
'servers response stream is not an enumerable' do
@srv.handle(BidiService)
t = Thread.new { @srv.run }
@srv.wait_till_running
stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)
responses = stub.server_sends_bad_input([])
exception = nil
begin
responses.each { |r| r }
rescue GRPC::Unknown => e
exception = e
end
# Erroneous responses sent from the server handler should cause an
# exception on the client with relevant info.
expected_details = 'NoMethodError: undefined method `each\' for '\
'"bad response. (not an enumerable, client sees an error)"'
expect(exception.inspect.include?(expected_details)).to be true
@srv.stop
t.join
end
end
context 'with connect metadata' do

Loading…
Cancel
Save