From c6627caf3a05f76e8449eff5135955986b689c2b Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 13 Apr 2017 10:47:16 -0700 Subject: [PATCH] cancel calls with an error message when bidi write loop fails with user error --- src/ruby/ext/grpc/rb_call.c | 34 ++++++++ src/ruby/lib/grpc/generic/bidi_call.rb | 9 +- src/ruby/spec/call_spec.rb | 33 +++++++ src/ruby/spec/client_server_spec.rb | 56 ++++++++++++ src/ruby/spec/generic/client_stub_spec.rb | 101 ++++++++++++++++++++-- src/ruby/spec/generic/rpc_server_spec.rb | 35 ++++++++ 6 files changed, 261 insertions(+), 7 deletions(-) diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index b99954883f3..74f189e1e0d 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -179,6 +179,38 @@ static VALUE grpc_rb_call_cancel(VALUE self) { return Qnil; } +/* TODO: expose this as part of the surface API if needed. + * This is meant for internal usage by the "write thread" of grpc-ruby + * client-side bidi calls. It provides a way for the background write-thread + * to propogate failures to the main read-thread and give the user an error + * message. */ +static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code, + VALUE details) { + grpc_rb_call *call = NULL; + grpc_call_error err; + if (RTYPEDDATA_DATA(self) == NULL) { + // This call has been closed + return Qnil; + } + + if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) { + rb_raise(rb_eTypeError, + "Bad parameter type error for cancel with status. Want Fixnum, " + "String."); + return Qnil; + } + + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); + err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code), + StringValueCStr(details), NULL); + if (err != GRPC_CALL_OK) { + rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)", + grpc_call_error_detail_of(err), err); + } + + return Qnil; +} + /* Releases the c-level resources associated with a call Once a call has been closed, no further requests can be processed. @@ -949,6 +981,8 @@ void Init_grpc_call() { /* Add ruby analogues of the Call methods. */ rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1); rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0); + rb_define_method(grpc_rb_cCall, "cancel_with_status", + grpc_rb_call_cancel_with_status, 2); rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0); rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0); rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0); diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 9e125cd986b..c2239d0178e 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -153,7 +153,12 @@ module GRPC rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - raise e + if is_client + @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN, + "GRPC bidi call error: #{e.inspect}") + else + raise e + end ensure set_output_stream_done.call if is_client end @@ -180,8 +185,8 @@ module GRPC batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) @call.status = batch_result.status @call.trailing_metadata = @call.status.metadata if @call.status - batch_result.check_status GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") + batch_result.check_status end GRPC.logger.debug('bidi-read-loop: done reading!') diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index 473ff4a8bd1..1cc05002429 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -137,6 +137,39 @@ describe GRPC::Core::Call do end end + describe '#cancel' do + it 'completes ok' do + call = make_test_call + expect { call.cancel }.not_to raise_error + end + + it 'completes ok when the call is closed' do + call = make_test_call + call.close + expect { call.cancel }.not_to raise_error + end + end + + describe '#cancel_with_status' do + it 'completes ok' do + call = make_test_call + expect do + call.cancel_with_status(0, 'test status') + end.not_to raise_error + expect do + call.cancel_with_status(0, nil) + end.to raise_error(TypeError) + end + + it 'completes ok when the call is closed' do + call = make_test_call + call.close + expect do + call.cancel_with_status(0, 'test status') + end.not_to raise_error + end + end + def make_test_call @ch.create_call(nil, nil, 'dummy_method', nil, deadline) end diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index b48b4179ced..1a9b47e2c3b 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -226,6 +226,62 @@ shared_examples 'basic GRPC message delivery is OK' do svr_batch = server_call.run_batch(server_ops) expect(svr_batch.send_close).to be true end + + def client_cancel_test(cancel_proc, expected_code, + expected_details) + call = new_client_call + server_call = nil + + server_thread = Thread.new do + server_call = server_allows_client_to_proceed + end + + client_ops = { + CallOps::SEND_INITIAL_METADATA => {}, + CallOps::RECV_INITIAL_METADATA => nil + } + batch_result = call.run_batch(client_ops) + expect(batch_result.send_metadata).to be true + expect(batch_result.metadata).to eq({}) + + cancel_proc.call(call) + + server_thread.join + server_ops = { + CallOps::RECV_CLOSE_ON_SERVER => nil + } + svr_batch = server_call.run_batch(server_ops) + expect(svr_batch.send_close).to be true + + client_ops = { + CallOps::RECV_STATUS_ON_CLIENT => {} + } + batch_result = call.run_batch(client_ops) + + expect(batch_result.status.code).to be expected_code + expect(batch_result.status.details).to eq expected_details + end + + it 'clients can cancel a call on the server' do + expected_code = StatusCodes::CANCELLED + expected_details = 'Cancelled' + cancel_proc = proc { |call| call.cancel } + client_cancel_test(cancel_proc, expected_code, expected_details) + end + + it 'cancel_with_status unknown status' do + code = StatusCodes::UNKNOWN + details = 'test unknown reason' + cancel_proc = proc { |call| call.cancel_with_status(code, details) } + client_cancel_test(cancel_proc, code, details) + end + + it 'cancel_with_status unknown status' do + code = StatusCodes::FAILED_PRECONDITION + details = 'test failed precondition reason' + cancel_proc = proc { |call| call.cancel_with_status(code, details) } + client_cancel_test(cancel_proc, code, details) + end end shared_examples 'GRPC metadata delivery works OK' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index e1e7a535fb6..9539e56c0ff 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -472,7 +472,7 @@ describe 'ClientStub' do host = "localhost:#{server_port}" stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect do - get_responses(stub) + get_responses(stub).collect { |r| r } end.to raise_error(ArgumentError, /Header values must be of type string or array/) end @@ -641,11 +641,101 @@ describe 'ClientStub' do expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + # Prompted by grpc/github #10526 + describe 'surfacing of errors when sending requests' do + def run_server_bidi_send_one_then_read_indefinitely + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('server response') + loop do + m = server_call.remote_read + break if m.nil? + end + # can't fail since initial metadata already sent + server_call.send_status(@pass, 'OK', true) + end + + def verify_error_from_write_thread(stub, requests_to_push, + request_queue, expected_description) + # TODO: an improvement might be to raise the original exception from + # bidi call write loops instead of only cancelling the call + failing_marshal_proc = proc do |req| + fail req if req.is_a?(StandardError) + req + end + begin + e = get_responses(stub, marshal_proc: failing_marshal_proc) + first_response = e.next + expect(first_response).to eq('server response') + requests_to_push.each { |req| request_queue.push(req) } + e.collect { |r| r } + rescue GRPC::Unknown => e + exception = e + end + expect(exception.message.include?(expected_description)).to be(true) + end + + # Provides an Enumerable view of a Queue + class BidiErrorTestingEnumerateForeverQueue + def initialize(queue) + @queue = queue + end + + def each + loop do + msg = @queue.pop + yield msg + end + end + end + + def run_error_in_client_request_stream_test(requests_to_push, + expected_error_message) + # start a server that waits on a read indefinitely - it should + # see a cancellation and be able to break out + th = Thread.new { run_server_bidi_send_one_then_read_indefinitely } + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + + request_queue = Queue.new + @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue) + + verify_error_from_write_thread(stub, + requests_to_push, + request_queue, + expected_error_message) + # the write loop errror should cancel the call and end the + # server's request stream + th.join + end + + it 'non-GRPC errors from the write loop surface when raised ' \ + 'at the start of a request stream' do + expected_error_message = 'expect error on first request' + requests_to_push = [StandardError.new(expected_error_message)] + run_error_in_client_request_stream_test(requests_to_push, + expected_error_message) + end + + it 'non-GRPC errors from the write loop surface when raised ' \ + 'during the middle of a request stream' do + expected_error_message = 'expect error on last request' + requests_to_push = %w( one two ) + requests_to_push << StandardError.new(expected_error_message) + run_error_in_client_request_stream_test(requests_to_push, + expected_error_message) + end + end end describe 'without a call operation' do - def get_responses(stub, deadline: nil) - e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + def get_responses(stub, deadline: nil, marshal_proc: noop) + e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop, metadata: @metadata, deadline: deadline) expect(e).to be_a(Enumerator) e @@ -658,8 +748,9 @@ describe 'ClientStub' do after(:each) do @op.wait # make sure wait doesn't hang end - def get_responses(stub, run_start_call_first: false, deadline: nil) - @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + def get_responses(stub, run_start_call_first: false, deadline: nil, + marshal_proc: noop) + @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop, return_op: true, metadata: @metadata, deadline: deadline) expect(@op).to be_a(GRPC::ActiveCall::Operation) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e0646f45997..27642d6975d 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -152,6 +152,18 @@ end CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class +# A service with a bidi streaming method. +class BidiService + include GRPC::GenericService + rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg) + + def server_sends_bad_input(_, _) + 'bad response. (not an enumerable, client sees an error)' + end +end + +BidiStub = BidiService.rpc_stub_class + describe GRPC::RpcServer do RpcServer = GRPC::RpcServer StatusCodes = GRPC::Core::StatusCodes @@ -450,6 +462,29 @@ describe GRPC::RpcServer do t.join expect(one_failed_as_unavailable).to be(true) end + + it 'should send a status UNKNOWN with a relevant message when the' \ + 'servers response stream is not an enumerable' do + @srv.handle(BidiService) + t = Thread.new { @srv.run } + @srv.wait_till_running + stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts) + responses = stub.server_sends_bad_input([]) + exception = nil + begin + responses.each { |r| r } + rescue GRPC::Unknown => e + exception = e + end + # Erroneous responses sent from the server handler should cause an + # exception on the client with relevant info. + expected_details = 'NoMethodError: undefined method `each\' for '\ + '"bad response. (not an enumerable, client sees an error)"' + + expect(exception.inspect.include?(expected_details)).to be true + @srv.stop + t.join + end end context 'with connect metadata' do