diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 74cc9db57e0..fc9bb851aa9 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -30,6 +30,12 @@
 require 'forwardable'
 require 'grpc/grpc'
 
+def assert_event_type(ev, want)
+  raise OutOfTime if ev.nil?
+  got = ev.type
+  raise 'Unexpected rpc event: got %s, want %s' % [got, want] unless got == want
+end
+
 module Google::RPC
 
   # The BiDiCall class orchestrates exection of a BiDi stream on a client or
@@ -71,7 +77,6 @@ module Google::RPC
       @marshal = marshal
       @readq = Queue.new
       @unmarshal = unmarshal
-      @writeq = Queue.new
     end
 
     # Begins orchestration of the Bidi stream for a client sending requests.
@@ -82,11 +87,13 @@ module Google::RPC
     # @param requests the Enumerable of requests to send
     # @return an Enumerator of requests to yield
    def run_on_client(requests, &blk)
-      enq_th = enqueue_for_sending(requests)
-      loop_th = start_read_write_loop
+      enq_th = start_write_loop(requests)
+      loop_th = start_read_loop
       replies = each_queued_msg
       return replies if blk.nil?
       replies.each { |r| blk.call(r) }
+      enq_th.join
+      loop_th.join
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
@@ -102,8 +109,8 @@ module Google::RPC
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
     def run_on_server(gen_each_reply)
       replys = gen_each_reply.call(each_queued_msg)
-      enq_th = enqueue_for_sending(replys)
-      loop_th = start_read_write_loop(is_client:false)
+      enq_th = start_write_loop(replys, is_client:false)
+      loop_th = start_read_loop()
       loop_th.join
       enq_th.join
     end
@@ -115,7 +122,7 @@ module Google::RPC
 
     # each_queued_msg yields each message on this instances readq
     #
-    # - messages are added to the readq by #read_write_loop
+    # - messages are added to the readq by #read_loop
     # - iteration ends when the instance itself is added
     def each_queued_msg
       return enum_for(:each_queued_msg) if !block_given?
@@ -131,187 +138,67 @@ module Google::RPC
     end
 
     # during bidi-streaming, read the requests to send from a separate thread
-    # read so that read_write_loop does not block waiting for requests to read.
-    def enqueue_for_sending(requests)
+    # read so that read_loop does not block waiting for requests to read.
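+    #
+    # Each request is marshalled and written from this thread: the loop blocks
+    # on the completion queue for WRITE_ACCEPTED before sending the next
+    # request. On clients it then half-closes the call with writes_done and
+    # waits for the FINISHED event before exiting.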
+    def start_write_loop(requests, is_client: true)
       Thread.new do  # TODO(temiola) run on a thread pool
+        write_tag = Object.new
         begin
-          requests.each { |req| @writeq.push(req)}
-          @writeq.push(END_OF_WRITES)
+          count = 0
+          requests.each do |req|
+            count += 1
+            payload = @marshal.call(req)
+            @call.start_write(Core::ByteBuffer.new(payload), write_tag)
+            ev = @cq.pluck(write_tag, INFINITE_FUTURE)
+            assert_event_type(ev, WRITE_ACCEPTED)
+          end
+          if is_client
+            @call.writes_done(write_tag)
+            ev = @cq.pluck(write_tag, INFINITE_FUTURE)
+            assert_event_type(ev, FINISH_ACCEPTED)
+            logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
+            ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
+            assert_event_type(ev, FINISHED)
+            logger.debug('bidi-client: finished received')
+          end
         rescue StandardError => e
-          logger.warn('enqueue_for_sending failed')
+          logger.warn('bidi: write_loop failed')
           logger.warn(e)
-          @writeq.push(e)
         end
       end
     end
 
-    # starts the read_write loop
-    def start_read_write_loop(is_client: true)
+    # starts the read loop
+    def start_read_loop()
       t = Thread.new do
         begin
-          read_write_loop(is_client: is_client)
-        rescue StandardError => e
-          logger.warn('start_read_write_loop failed')
-          logger.warn(e)
-          @readq.push(e)  # let each_queued_msg terminate with the error
-        end
-      end
-      t.priority = 3  # hint that read_write_loop threads should be favoured
-      t
-    end
-
-    # drain_writeq removes any outstanding message on the writeq
-    def drain_writeq
-      while @writeq.size != 0 do
-        discarded = @writeq.pop
-        logger.warn("discarding: queued write: #{discarded}")
-      end
-    end
-
-    # sends the next queued write
-    #
-    # The return value is an array with three values
-    # - the first indicates if a writes was started
-    # - the second that all writes are done
-    # - the third indicates that are still writes to perform but they are lates
-    #
-    # If value pulled from writeq is a StandardError, the producer hit an error
-    # that should be raised.
-    #
-    # @param is_client [Boolean] when true, writes_done will be called when the
-    #   last entry is read from the writeq
-    #
-    # @return [in_write, done_writing]
-    def next_queued_write(is_client: true)
-      in_write, done_writing = false, false
-
-      # send the next item on the queue if there is any
-      return [in_write, done_writing] if @writeq.size == 0
-
-      # TODO(temiola): provide a queue class that returns nil after a timeout
-      req = @writeq.pop
-      if req.equal?(END_OF_WRITES)
-        logger.debug('done writing after last req')
-        if is_client
-          logger.debug('sent writes_done after last req')
-          @call.writes_done(self)
-        end
-        done_writing = true
-        return [in_write, done_writing]
-      elsif req.is_a?(StandardError)  # used to signal an error in the producer
-        logger.debug('done writing due to a failure')
-        if is_client
-          logger.debug('sent writes_done after a failure')
-          @call.writes_done(self)
-        end
-        logger.warn(req)
-        done_writing = true
-        return [in_write, done_writing]
-      end
-
-      # send the payload
-      payload = @marshal.call(req)
-      @call.start_write(Core::ByteBuffer.new(payload), self)
-      logger.debug("rwloop: sent payload #{req.inspect}")
-      in_write = true
-      return [in_write, done_writing]
-    end
-
-    # read_write_loop takes items off the write_queue and sends them, reads
-    # msgs and adds them to the read queue.
-    def read_write_loop(is_client: true)
-      done_reading, done_writing = false, false
-      finished, pre_finished = false, false
-      in_write, writes_late = false, false
-      count = 0
-
-      # queue the initial read before beginning the loop
-      @call.start_read(self)
-
-      loop do
-        # whether or not there are outstanding writes is independent of the
-        # next event from the completion queue. The producer may queue the
-        # first msg at any time, e.g, after the loop is started running. So,
-        # it's essential for the loop to check for possible writes here, in
-        # order to correctly begin writing.
-        if !in_write and !done_writing
-          in_write, done_writing = next_queued_write(is_client: is_client)
-        end
-        logger.debug("rwloop is_client? #{is_client}")
-        logger.debug("rwloop count: #{count}")
-        count += 1
-
-        # Loop control:
-        #
-        # - Break when no further events need to read. On clients, this means
-        #   waiting for a FINISHED, servers just need to wait for all reads and
-        #   writes to be done.
-        #
-        # - Also, don't read an event unless there's one expected. This can
-        #   happen, e.g, when all the reads are done, there are no writes
-        #   available, but writing is not complete.
-        logger.debug("done_reading? #{done_reading}")
-        logger.debug("done_writing? #{done_writing}")
-        logger.debug("finish accepted? #{pre_finished}")
-        logger.debug("finished? #{finished}")
-        logger.debug("in write? #{in_write}")
-        if is_client
-          break if done_writing and done_reading and pre_finished and finished
-          logger.debug('waiting for another event')
-          if in_write or !done_reading or !pre_finished
-            logger.debug('waiting for another event')
-            ev = @cq.pluck(self, INFINITE_FUTURE)
-          elsif !finished
-            logger.debug('waiting for another event')
-            ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
-          else
-            next  # no events to wait on, but not done writing
-          end
-        else
-          break if done_writing and done_reading
-          if in_write or !done_reading
-            logger.debug('waiting for another event')
-            ev = @cq.pluck(self, INFINITE_FUTURE)
-          else
-            next  # no events to wait on, but not done writing
-          end
-        end
+          read_tag = Object.new
+          count = 0
+
+          # queue the initial read before beginning the loop
+          loop do
+            logger.debug("waiting for read #{count}")
+            count += 1
+            @call.start_read(read_tag)
+            ev = @cq.pluck(read_tag, INFINITE_FUTURE)
+            assert_event_type(ev, READ)
+
+            # handle the next event.
+            if ev.result.nil?
+              @readq.push(END_OF_READS)
+              logger.debug('done reading!')
+              break
+            end
-        # handle the next event.
-        if ev.nil?
-          drain_writeq
-          raise OutOfTime
-        elsif ev.type == WRITE_ACCEPTED
-          logger.debug('write accepted!')
-          in_write = false
-          next
-        elsif ev.type == FINISH_ACCEPTED
-          logger.debug('finish accepted!')
-          pre_finished = true
-          next
-        elsif ev.type == READ
-          logger.debug("received req: #{ev.result.inspect}")
-          if ev.result.nil?
-            logger.debug('done reading!')
-            done_reading = true
-            @readq.push(END_OF_READS)
-          else  # push the latest read onto the queue and continue reading
             logger.debug("received req.to_s: #{ev.result.to_s}")
             res = @unmarshal.call(ev.result.to_s)
-            logger.debug("req (unmarshalled): #{res.inspect}")
             @readq.push(res)
-            if !done_reading
-              @call.start_read(self)
-            end
-          end
-        elsif ev.type == FINISHED
-          logger.debug("finished! with status:#{ev.result.inspect}")
-          finished = true
-          ev.call.status = ev.result
-          if ev.result.code != OK
-            raise BadStatus.new(ev.result.code, ev.result.details)
           end
+
+        rescue StandardError => e
+          logger.warn('bidi: read_loop failed')
+          logger.warn(e)
+          @readq.push(e)  # let each_queued_msg terminate with this error
         end
       end
     end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 10767c47dec..a915708f920 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -99,7 +99,7 @@ module Google::RPC
       # event. Send a status of deadline exceeded
       logger.warn("late call: #{active_call}")
       send_status(active_call, DEADLINE_EXCEEDED, 'late')
-    rescue EventError => e
+    rescue Core::EventError => e
       # This is raised by GRPC internals but should rarely, if ever happen.
       # Log it, but don't notify the other endpoint..
       logger.warn("failed call: #{active_call}\n#{e}")
@@ -146,6 +146,7 @@ module Google::RPC
     def send_status(active_client, code, details)
       begin
+        details = 'Not sure why' if details.nil?
         active_client.send_status(code, details)
       rescue StandardError => e
         logger.warn('Could not send status %d:%s' % [code, details])