diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb
index 78cb8dd8364..2ba8d2c19ea 100755
--- a/src/ruby/bin/interop/interop_server.rb
+++ b/src/ruby/bin/interop/interop_server.rb
@@ -128,16 +128,19 @@ class TestTarget < Grpc::Testing::TestService::Service
     cls = StreamingOutputCallResponse
     Thread.new do
       begin
+        GRPC.logger.info('interop-server: started receiving')
         reqs.each do |req|
-          GRPC.logger.info("read #{req.inspect}")
           resp_size = req.response_parameters[0].size
+          GRPC.logger.info("read a req, response size is #{resp_size}")
           resp = cls.new(payload: Payload.new(type: req.response_type,
                                               body: nulls(resp_size)))
           q.push(resp)
         end
-        GRPC.logger.info('finished reads')
+        GRPC.logger.info('interop-server: finished receiving')
         q.push(self)
       rescue StandardError => e
+        GRPC.logger.info('interop-server: failed')
+        GRPC.logger.warn(e)
         q.push(e)  # share the exception with the enumerator
       end
     end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 5f7beb5ab1e..04abab8ac37 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -39,6 +39,7 @@ class Struct
     return nil if status.nil?
    fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
    if status.code != GRPC::Core::StatusCodes::OK
+      GRPC.logger.debug("Failing with status #{status}")
       # raise BadStatus, propagating the metadata if present.
      md = status.metadata
      with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 67143d40cf5..f1b9f6b00da 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -100,6 +100,7 @@ module GRPC
       replys = gen_each_reply.call(each_queued_msg)
       @enq_th = start_write_loop(replys, is_client: false)
       @loop_th = start_read_loop
+      @enq_th.join if @enq_th.alive?
     end
 
     private
@@ -115,7 +116,7 @@ module GRPC
       return enum_for(:each_queued_msg) unless block_given?
       count = 0
       loop do
-        GRPC.logger.debug("each_queued_msg: msg##{count}")
+        GRPC.logger.debug("each_queued_msg: waiting##{count}")
         count += 1
         req = @readq.pop
         GRPC.logger.debug("each_queued_msg: req = #{req}")
@@ -123,70 +124,73 @@ module GRPC
         break if req.equal?(END_OF_READS)
         yield req
       end
-      @enq_th.join if @enq_th.alive?
     end
 
     # during bidi-streaming, read the requests to send from a separate thread
     # read so that read_loop does not block waiting for requests to read.
     def start_write_loop(requests, is_client: true)
       Thread.new do  # TODO: run on a thread pool
-        write_tag = Object.new
+        GRPC.logger.debug('bidi-write-loop: starting')
         begin
+          write_tag = Object.new
           count = 0
           requests.each do |req|
-            GRPC.logger.debug("bidi-write_loop: #{count}")
+            GRPC.logger.debug("bidi-write-loop: #{count}")
             count += 1
             payload = @marshal.call(req)
             @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
                             SEND_MESSAGE => payload)
           end
+          GRPC.logger.debug("bidi-write-loop: #{count} writes done")
           if is_client
-            GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
+            GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
+            @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+                            SEND_CLOSE_FROM_CLIENT => nil)
             batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                                           SEND_CLOSE_FROM_CLIENT => nil,
                                            RECV_STATUS_ON_CLIENT => nil)
             batch_result.check_status
           end
         rescue StandardError => e
-          GRPC.logger.warn('bidi-write_loop: failed')
+          GRPC.logger.warn('bidi-write-loop: failed')
           GRPC.logger.warn(e)
           raise e
         end
+        GRPC.logger.debug('bidi-write-loop: finished')
       end
     end
 
     # starts the read loop
     def start_read_loop
       Thread.new do
+        GRPC.logger.debug('bidi-read-loop: starting')
         begin
           read_tag = Object.new
           count = 0
 
-          # queue the initial read before beginning the loop
           loop do
-            GRPC.logger.debug("bidi-read_loop: #{count}")
+            GRPC.logger.debug("bidi-read-loop: #{count}")
             count += 1
             # TODO: ensure metadata is read if available, currently it's not
             batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
                                            RECV_MESSAGE => nil)
 
             # handle the next message
             if batch_result.message.nil?
+              GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
               @readq.push(END_OF_READS)
               GRPC.logger.debug('bidi-read-loop: done reading!')
               break
             end
             # push the latest read onto the queue and continue reading
-            GRPC.logger.debug("received req: #{batch_result.message}")
             res = @unmarshal.call(batch_result.message)
             @readq.push(res)
           end
-
         rescue StandardError => e
-          GRPC.logger.warn('bidi: read_loop failed')
+          GRPC.logger.warn('bidi: read-loop failed')
           GRPC.logger.warn(e)
           @readq.push(e)  # let each_queued_msg terminate with this error
         end
+        GRPC.logger.debug('bidi-read-loop: finished')
       end
     end
   end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 2fd61c5f7e7..dd90d8d91dc 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -137,6 +137,7 @@ module GRPC
 
     def send_status(active_client, code, details, **kw)
       details = 'Not sure why' if details.nil?
+      GRPC.logger.debug("Sending status #{code}:#{details}")
       active_client.send_status(code, details, code == OK, **kw)
     rescue StandardError => e
       GRPC.logger.warn("Could not send status #{code}:#{details}")
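The bidi_call.rb change above reorders the client-side shutdown: SEND_CLOSE_FROM_CLIENT is now submitted in its own run_batch before a second batch waits on RECV_STATUS_ON_CLIENT. As a reading aid only, not part of this patch, here is a minimal Ruby sketch of that sequence; drain_and_close and its parameters call, cq, deadline, and payloads are illustrative stand-ins for the @call, @cq, INFINITE_FUTURE deadline, and marshalled requests used by BidiCall.

# Illustrative sketch only -- not part of the patch. It mirrors the new
# client-side write-loop ordering from bidi_call.rb: send every message,
# half-close in a batch of its own, then wait for the status separately.
require 'grpc'

Ops = GRPC::Core::CallOps  # SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_STATUS_ON_CLIENT

# `call` is assumed to be a GRPC::Core::Call on an established channel,
# `cq` the completion queue used with it, `deadline` a deadline such as the
# INFINITE_FUTURE constant above, and `payloads` already-marshalled requests.
def drain_and_close(call, cq, deadline, payloads)
  write_tag = Object.new
  payloads.each do |payload|
    # one batch per outbound message, as in start_write_loop
    call.run_batch(cq, write_tag, deadline, Ops::SEND_MESSAGE => payload)
  end
  # half-close first, in its own batch ...
  call.run_batch(cq, write_tag, deadline, Ops::SEND_CLOSE_FROM_CLIENT => nil)
  # ... then block for the server's trailing status in a separate batch
  batch_result = call.run_batch(cq, write_tag, deadline,
                                Ops::RECV_STATUS_ON_CLIENT => nil)
  batch_result.check_status  # raises unless the call ended with an OK status
end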