Adds the ruby timeout interop test.

Corrects some other issues

- fixes status return when calls fail by always returning the status

- resolves the bidi_call client's failure to raise an exception on a bad
  status by moving the wait-for-status step into the read thread

  * this also improves the cancel_after_first_response test

Also

- adds a unit test that verifies that a bidi call will time out.
pull/2937/head
Tim Emiola 9 years ago
parent 49b7650eaf
commit cdb7cccceb
  1. 13
      src/ruby/bin/interop/interop_client.rb
  2. 8
      src/ruby/ext/grpc/rb_call.c
  3. 25
      src/ruby/lib/grpc/generic/bidi_call.rb
  4. 20
      src/ruby/spec/generic/client_stub_spec.rb

@ -265,6 +265,17 @@ class NamedTests
p 'OK: ping_pong' p 'OK: ping_pong'
end end
# Interop test: runs a bidi (full-duplex) call with a deadline far too short
# to complete, and verifies the client surfaces DEADLINE_EXCEEDED as a
# GRPC::BadStatus rather than hanging or succeeding.
def timeout_on_sleeping_server
  ppp = PingPongPlayer.new([[27_182, 31_415]])
  responses = @stub.full_duplex_call(ppp.each_item, timeout: 0.001)
  responses.each { |r| ppp.queue.push(r) }
  fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)'
rescue GRPC::BadStatus => e
  # the deadline must be the reason the call failed
  assert_equal(e.code, GRPC::Core::StatusCodes::DEADLINE_EXCEEDED)
  p "OK: #{__callee__}"
end
def cancel_after_begin def cancel_after_begin
msg_sizes = [27_182, 8, 1828, 45_904] msg_sizes = [27_182, 8, 1828, 45_904]
reqs = msg_sizes.map do |x| reqs = msg_sizes.map do |x|
@ -283,7 +294,7 @@ class NamedTests
ppp = PingPongPlayer.new(msg_sizes) ppp = PingPongPlayer.new(msg_sizes)
op = @stub.full_duplex_call(ppp.each_item, return_op: true) op = @stub.full_duplex_call(ppp.each_item, return_op: true)
ppp.canceller_op = op # causes ppp to cancel after the 1st message ppp.canceller_op = op # causes ppp to cancel after the 1st message
op.execute.each { |r| ppp.queue.push(r) } assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
op.wait op.wait
assert(op.cancelled, 'call operation was not CANCELLED') assert(op.cancelled, 'call operation was not CANCELLED')
p 'OK: cancel_after_first_response' p 'OK: cancel_after_first_response'

@ -629,13 +629,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
return Qnil; return Qnil;
} }
if (!ev.success) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError, "start_batch completion failed");
return Qnil;
}
/* Build and return the BatchResult struct result */ /* Build and return the BatchResult struct result,
if there is an error, it's reflected in the status */
result = grpc_run_batch_stack_build_result(&st); result = grpc_run_batch_stack_build_result(&st);
grpc_run_batch_stack_cleanup(&st); grpc_run_batch_stack_cleanup(&st);
return result; return result;

@ -97,7 +97,7 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies. # @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply) def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg) replys = gen_each_reply.call(each_queued_msg)
@loop_th = start_read_loop @loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false) write_loop(replys, is_client: false)
end end
@ -125,7 +125,7 @@ module GRPC
count += 1 count += 1
req = @readq.pop req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}") GRPC.logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError fail req if req.is_a? StandardError
break if req.equal?(END_OF_READS) break if req.equal?(END_OF_READS)
yield req yield req
end end
@ -145,12 +145,9 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: #{count} writes done") GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil, SEND_CLOSE_FROM_CLIENT => nil)
RECV_STATUS_ON_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done')
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
notify_done notify_done
end end
GRPC.logger.debug('bidi-write-loop: finished') GRPC.logger.debug('bidi-write-loop: finished')
@ -162,7 +159,7 @@ module GRPC
end end
# starts the read loop # starts the read loop
def start_read_loop def start_read_loop(is_client: true)
Thread.new do Thread.new do
GRPC.logger.debug('bidi-read-loop: starting') GRPC.logger.debug('bidi-read-loop: starting')
begin begin
@ -175,9 +172,19 @@ module GRPC
# TODO: ensure metadata is read if available, currently it's not # TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil) RECV_MESSAGE => nil)
# handle the next message # handle the next message
if batch_result.message.nil? if batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
if is_client
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end
@readq.push(END_OF_READS) @readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!') GRPC.logger.debug('bidi-read-loop: done reading!')
break break

@ -408,6 +408,26 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming' it_behaves_like 'bidi streaming'
end end
# Verifies that a bidi streaming call raises GRPC::BadStatus with
# DEADLINE_EXCEEDED when given a timeout too small for the call to finish.
describe 'without enough time to run' do
  before(:each) do
    @sent_msgs = (1..3).map { |n| 'msg_' + n.to_s }
    @replys = (1..3).map { |n| 'reply_' + n.to_s }
    @host = "localhost:#{create_test_server}"
  end

  it 'should fail with DeadlineExceeded', bidi: true do
    @server.start
    stub = GRPC::ClientStub.new(@host, @cq)
    blk = proc do
      # enumerating the responses is what forces the call to run to
      # completion (and hit the deadline)
      stub.bidi_streamer(@method, @sent_msgs, noop, noop,
                         timeout: 0.001).collect { |r| r }
    end
    expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/
  end
end
end end
def run_server_streamer(expected_input, replys, status, **kw) def run_server_streamer(expected_input, replys, status, **kw)

Loading…
Cancel
Save