Corrects handling of status during bidi call orchestration.

- Ensures that invalid status are raised as exceptions
- Ensures that exceptions during the write thread are bubbled up to the starting
  thread

Also - improves the debug messages during the BiDi calls
pull/1293/head
Tim Emiola 10 years ago
parent 1b39916bba
commit a10a8432f3
  1. 32
      src/ruby/lib/grpc/generic/bidi_call.rb

@ -78,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send # @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield # @return an Enumerator of requests to yield
def run_on_client(requests, &blk) def run_on_client(requests, &blk)
enq_th = start_write_loop(requests) @enq_th = start_write_loop(requests)
loop_th = start_read_loop @loop_th = start_read_loop
replies = each_queued_msg replies = each_queued_msg
return replies if blk.nil? return replies if blk.nil?
replies.each { |r| blk.call(r) } replies.each { |r| blk.call(r) }
enq_th.join
loop_th.join
end end
# Begins orchestration of the Bidi stream for a server generating replies. # Begins orchestration of the Bidi stream for a server generating replies.
@ -100,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies. # @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply) def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg) replys = gen_each_reply.call(each_queued_msg)
enq_th = start_write_loop(replys, is_client: false) @enq_th = start_write_loop(replys, is_client: false)
loop_th = start_read_loop @loop_th = start_read_loop
loop_th.join
enq_th.join
end end
private private
@ -122,10 +118,13 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}") logger.debug("each_queued_msg: msg##{count}")
count += 1 count += 1
req = @readq.pop req = @readq.pop
logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS) break if req.equal?(END_OF_READS)
yield req yield req
end end
@loop_th.join
@enq_th.join
end end
# during bidi-streaming, read the requests to send from a separate thread # during bidi-streaming, read the requests to send from a separate thread
@ -136,20 +135,23 @@ module GRPC
begin begin
count = 0 count = 0
requests.each do |req| requests.each do |req|
logger.debug("bidi-write_loop: #{count}")
count += 1 count += 1
payload = @marshal.call(req) payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE, @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload) SEND_MESSAGE => payload)
end end
if is_client if is_client
logger.debug("bidi-client: sent #{count} reqs, waiting to finish") logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
@call.run_batch(@cq, write_tag, INFINITE_FUTURE, batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil, SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil) RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end end
rescue StandardError => e rescue StandardError => e
logger.warn('bidi: write_loop failed') logger.warn('bidi-write_loop: failed')
logger.warn(e) logger.warn(e)
raise e
end end
end end
end end
@ -163,7 +165,7 @@ module GRPC
# queue the initial read before beginning the loop # queue the initial read before beginning the loop
loop do loop do
logger.debug("waiting for read #{count}") logger.debug("bidi-read_loop: #{count}")
count += 1 count += 1
# TODO: ensure metadata is read if available, currently it's not # TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@ -171,7 +173,7 @@ module GRPC
# handle the next message # handle the next message
if batch_result.message.nil? if batch_result.message.nil?
@readq.push(END_OF_READS) @readq.push(END_OF_READS)
logger.debug('done reading!') logger.debug('bidi-read-loop: done reading!')
break break
end end

Loading…
Cancel
Save