@ -78,11 +78,9 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client ( requests , & blk )
@enq_th = start_ write_loop( requests )
@enq_th = Thread . new { write_loop ( requests ) }
@loop_th = start_read_loop
replies = each_queued_msg
return replies if blk . nil?
replies . each { | r | blk . call ( r ) }
each_queued_msg ( & blk )
end
# Begins orchestration of the Bidi stream for a server generating replies.
@ -98,9 +96,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server ( gen_each_reply )
replys = gen_each_reply . call ( each_queued_msg )
@enq_th = start_write_loop ( replys , is_client : false )
@loop_th = start_read_loop
@enq_th . join if @enq_th . alive?
write_loop ( replys , is_client : false )
end
private
@ -126,37 +123,32 @@ module GRPC
end
end
# during bidi-streaming, read the requests to send from a separate thread
# read so that read_loop does not block waiting for requests to read.
def start_write_loop ( requests , is_client : true )
Thread . new do # TODO: run on a thread pool
GRPC . logger . debug ( 'bidi-write-loop: starting' )
begin
write_tag = Object . new
count = 0
requests . each do | req |
GRPC . logger . debug ( " bidi-write-loop: #{ count } " )
count += 1
payload = @marshal . call ( req )
@call . run_batch ( @cq , write_tag , INFINITE_FUTURE ,
SEND_MESSAGE = > payload )
end
GRPC . logger . debug ( " bidi-write-loop: #{ count } writes done " )
if is_client
GRPC . logger . debug ( " bidi-write-loop: client sent #{ count } , waiting " )
@call . run_batch ( @cq , write_tag , INFINITE_FUTURE ,
SEND_CLOSE_FROM_CLIENT = > nil )
batch_result = @call . run_batch ( @cq , write_tag , INFINITE_FUTURE ,
RECV_STATUS_ON_CLIENT = > nil )
batch_result . check_status
end
rescue StandardError = > e
GRPC . logger . warn ( 'bidi-write-loop: failed' )
GRPC . logger . warn ( e )
raise e
end
GRPC . logger . debug ( 'bidi-write-loop: finished' )
def write_loop ( requests , is_client : true )
GRPC . logger . debug ( 'bidi-write-loop: starting' )
write_tag = Object . new
count = 0
requests . each do | req |
GRPC . logger . debug ( " bidi-write-loop: #{ count } " )
count += 1
payload = @marshal . call ( req )
@call . run_batch ( @cq , write_tag , INFINITE_FUTURE ,
SEND_MESSAGE = > payload )
end
GRPC . logger . debug ( " bidi-write-loop: #{ count } writes done " )
if is_client
GRPC . logger . debug ( " bidi-write-loop: client sent #{ count } , waiting " )
batch_result = @call . run_batch ( @cq , write_tag , INFINITE_FUTURE ,
SEND_CLOSE_FROM_CLIENT = > nil ,
RECV_STATUS_ON_CLIENT = > nil )
@call . status = batch_result . status
batch_result . check_status
GRPC . logger . debug ( " bidi-write-loop: done status #{ @call . status } " )
end
GRPC . logger . debug ( 'bidi-write-loop: finished' )
rescue StandardError = > e
GRPC . logger . warn ( 'bidi-write-loop: failed' )
GRPC . logger . warn ( e )
raise e
end
# starts the read loop