@ -30,6 +30,12 @@
require 'forwardable'
require 'grpc/grpc'
def assert_event_type ( ev , want )
raise OutOfTime if ev . nil?
got = ev . type
raise 'Unexpected rpc event: got %s, want %s' % [ got , want ] unless got == want
end
module Google::RPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
@ -71,7 +77,6 @@ module Google::RPC
@marshal = marshal
@readq = Queue . new
@unmarshal = unmarshal
@writeq = Queue . new
end
# Begins orchestration of the Bidi stream for a client sending requests.
@ -82,11 +87,13 @@ module Google::RPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client ( requests , & blk )
enq_th = enqueue_for_sending ( requests )
loop_th = start_read_write_ loop
enq_th = start_write_loop ( requests )
loop_th = start_read_loop
replies = each_queued_msg
return replies if blk . nil?
replies . each { | r | blk . call ( r ) }
enq_th . join
loop_th . join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@ -102,8 +109,8 @@ module Google::RPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server ( gen_each_reply )
replys = gen_each_reply . call ( each_queued_msg )
enq_th = enqueue_for_sending ( replys )
loop_th = start_read_write_ loop ( is_client : false )
enq_th = start_write_loop ( replys , is_client : false )
loop_th = start_read_loop ( )
loop_th . join
enq_th . join
end
@ -115,7 +122,7 @@ module Google::RPC
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_write_ loop
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for ( :each_queued_msg ) if ! block_given?
@ -131,187 +138,67 @@ module Google::RPC
end
# during bidi-streaming, read the requests to send from a separate thread
# read so that read_write_ loop does not block waiting for requests to read.
def enqueue_for_sending ( requests )
# read so that read_loop does not block waiting for requests to read.
def start_write_loop ( requests , is_client : true )
Thread . new do # TODO(temiola) run on a thread pool
write_tag = Object . new
begin
requests . each { | req | @writeq . push ( req ) }
@writeq . push ( END_OF_WRITES )
rescue StandardError = > e
logger . warn ( 'enqueue_for_sending failed' )
logger . warn ( e )
@writeq . push ( e )
end
count = 0
requests . each do | req |
count += 1
payload = @marshal . call ( req )
@call . start_write ( Core :: ByteBuffer . new ( payload ) , write_tag )
ev = @cq . pluck ( write_tag , INFINITE_FUTURE )
assert_event_type ( ev , WRITE_ACCEPTED )
end
if is_client
@call . writes_done ( write_tag )
ev = @cq . pluck ( write_tag , INFINITE_FUTURE )
assert_event_type ( ev , FINISH_ACCEPTED )
logger . debug ( " bidi-client: sent #{ count } reqs, waiting to finish " )
ev = @cq . pluck ( @finished_tag , INFINITE_FUTURE )
assert_event_type ( ev , FINISHED )
logger . debug ( 'bidi-client: finished received' )
end
# starts the read_write loop
def start_read_write_loop ( is_client : true )
t = Thread . new do
begin
read_write_loop ( is_client : is_client )
rescue StandardError = > e
logger . warn ( 'start_read_ write_loop failed' )
logger . warn ( 'bidi: write_loop failed' )
logger . warn ( e )
@readq . push ( e ) # let each_queued_msg terminate with the error
end
end
t . priority = 3 # hint that read_write_loop threads should be favoured
t
end
# drain_writeq removes any outstanding message on the writeq
def drain_writeq
while @writeq . size != 0 do
discarded = @writeq . pop
logger . warn ( " discarding: queued write: #{ discarded } " )
end
end
# sends the next queued write
#
# The return value is an array with three values
# - the first indicates if a writes was started
# - the second that all writes are done
# - the third indicates that are still writes to perform but they are lates
#
# If value pulled from writeq is a StandardError, the producer hit an error
# that should be raised.
#
# @param is_client [Boolean] when true, writes_done will be called when the
# last entry is read from the writeq
#
# @return [in_write, done_writing]
def next_queued_write ( is_client : true )
in_write , done_writing = false , false
# send the next item on the queue if there is any
return [ in_write , done_writing ] if @writeq . size == 0
# TODO(temiola): provide a queue class that returns nil after a timeout
req = @writeq . pop
if req . equal? ( END_OF_WRITES )
logger . debug ( 'done writing after last req' )
if is_client
logger . debug ( 'sent writes_done after last req' )
@call . writes_done ( self )
end
done_writing = true
return [ in_write , done_writing ]
elsif req . is_a? ( StandardError ) # used to signal an error in the producer
logger . debug ( 'done writing due to a failure' )
if is_client
logger . debug ( 'sent writes_done after a failure' )
@call . writes_done ( self )
end
logger . warn ( req )
done_writing = true
return [ in_write , done_writing ]
end
# send the payload
payload = @marshal . call ( req )
@call . start_write ( Core :: ByteBuffer . new ( payload ) , self )
logger . debug ( " rwloop: sent payload #{ req . inspect } " )
in_write = true
return [ in_write , done_writing ]
end
# read_write_loop takes items off the write_queue and sends them, reads
# msgs and adds them to the read queue.
def read_write_loop ( is_client : true )
done_reading , done_writing = false , false
finished , pre_finished = false , false
in_write , writes_late = false , false
# starts the read loop
def start_read_loop ( )
t = Thread . new do
begin
read_tag = Object . new
count = 0
# queue the initial read before beginning the loop
@call . start_read ( self )
loop do
# whether or not there are outstanding writes is independent of the
# next event from the completion queue. The producer may queue the
# first msg at any time, e.g, after the loop is started running. So,
# it's essential for the loop to check for possible writes here, in
# order to correctly begin writing.
if ! in_write and ! done_writing
in_write , done_writing = next_queued_write ( is_client : is_client )
end
logger . debug ( " rwloop is_client? #{ is_client } " )
logger . debug ( " rwloop count: #{ count } " )
logger . debug ( " waiting for read #{ count } " )
count += 1
# Loop control:
#
# - Break when no further events need to read. On clients, this means
# waiting for a FINISHED, servers just need to wait for all reads and
# writes to be done.
#
# - Also, don't read an event unless there's one expected. This can
# happen, e.g, when all the reads are done, there are no writes
# available, but writing is not complete.
logger . debug ( " done_reading? #{ done_reading } " )
logger . debug ( " done_writing? #{ done_writing } " )
logger . debug ( " finish accepted? #{ pre_finished } " )
logger . debug ( " finished? #{ finished } " )
logger . debug ( " in write? #{ in_write } " )
if is_client
break if done_writing and done_reading and pre_finished and finished
logger . debug ( 'waiting for another event' )
if in_write or ! done_reading or ! pre_finished
logger . debug ( 'waiting for another event' )
ev = @cq . pluck ( self , INFINITE_FUTURE )
elsif ! finished
logger . debug ( 'waiting for another event' )
ev = @cq . pluck ( @finished_tag , INFINITE_FUTURE )
else
next # no events to wait on, but not done writing
end
else
break if done_writing and done_reading
if in_write or ! done_reading
logger . debug ( 'waiting for another event' )
ev = @cq . pluck ( self , INFINITE_FUTURE )
else
next # no events to wait on, but not done writing
end
end
@call . start_read ( read_tag )
ev = @cq . pluck ( read_tag , INFINITE_FUTURE )
assert_event_type ( ev , READ )
# handle the next event.
if ev . nil?
drain_writeq
raise OutOfTime
elsif ev . type == WRITE_ACCEPTED
logger . debug ( 'write accepted!' )
in_write = false
next
elsif ev . type == FINISH_ACCEPTED
logger . debug ( 'finish accepted!' )
pre_finished = true
next
elsif ev . type == READ
logger . debug ( " received req: #{ ev . result . inspect } " )
if ev . result . nil?
logger . debug ( 'done reading!' )
done_reading = true
@readq . push ( END_OF_READS )
else
logger . debug ( 'done reading!' )
break
end
# push the latest read onto the queue and continue reading
logger . debug ( " received req.to_s: #{ ev . result . to_s } " )
res = @unmarshal . call ( ev . result . to_s )
logger . debug ( " req (unmarshalled): #{ res . inspect } " )
@readq . push ( res )
if ! done_reading
@call . start_read ( self )
end
end
elsif ev . type == FINISHED
logger . debug ( " finished! with status: #{ ev . result . inspect } " )
finished = true
ev . call . status = ev . result
if ev . result . code != OK
raise BadStatus . new ( ev . result . code , ev . result . details )
end
rescue StandardError = > e
logger . warn ( 'bidi: read_loop failed' )
logger . warn ( e )
@readq . push ( e ) # let each_queued_msg terminate with this error
end
end
end