Merge pull request #1800 from tbetbetbe/grpc-ruby-fix-bidi-server-fails

Add's logging, removes an unnecessary thread join
pull/1823/head
Jan Tattermusch 10 years ago
commit bb517cabb7
  1. 7
      src/ruby/bin/interop/interop_server.rb
  2. 1
      src/ruby/lib/grpc/generic/active_call.rb
  3. 28
      src/ruby/lib/grpc/generic/bidi_call.rb
  4. 1
      src/ruby/lib/grpc/generic/rpc_desc.rb

@ -128,16 +128,19 @@ class TestTarget < Grpc::Testing::TestService::Service
cls = StreamingOutputCallResponse
Thread.new do
begin
GRPC.logger.info('interop-server: started receiving')
reqs.each do |req|
GRPC.logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
GRPC.logger.info("read a req, response size is #{resp_size}")
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
GRPC.logger.info('finished reads')
GRPC.logger.info('interop-server: finished receiving')
q.push(self)
rescue StandardError => e
GRPC.logger.info('interop-server: failed')
GRPC.logger.warn(e)
q.push(e) # share the exception with the enumerator
end
end

@ -39,6 +39,7 @@ class Struct
return nil if status.nil?
fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
if status.code != GRPC::Core::StatusCodes::OK
GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
md = status.metadata
with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]

@ -100,6 +100,7 @@ module GRPC
replys = gen_each_reply.call(each_queued_msg)
@enq_th = start_write_loop(replys, is_client: false)
@loop_th = start_read_loop
@enq_th.join if @enq_th.alive?
end
private
@ -115,7 +116,7 @@ module GRPC
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
GRPC.logger.debug("each_queued_msg: msg##{count}")
GRPC.logger.debug("each_queued_msg: waiting##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
@ -123,70 +124,73 @@ module GRPC
break if req.equal?(END_OF_READS)
yield req
end
@enq_th.join if @enq_th.alive?
end
# during bidi-streaming, read the requests to send from a separate thread
# read so that read_loop does not block waiting for requests to read.
def start_write_loop(requests, is_client: true)
Thread.new do # TODO: run on a thread pool
write_tag = Object.new
GRPC.logger.debug('bidi-write-loop: starting')
begin
write_tag = Object.new
count = 0
requests.each do |req|
GRPC.logger.debug("bidi-write_loop: #{count}")
GRPC.logger.debug("bidi-write-loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil)
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
GRPC.logger.warn('bidi-write_loop: failed')
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
raise e
end
GRPC.logger.debug('bidi-write-loop: finished')
end
end
# starts the read loop
def start_read_loop
Thread.new do
GRPC.logger.debug('bidi-read-loop: starting')
begin
read_tag = Object.new
count = 0
# queue the initial read before beginning the loop
loop do
GRPC.logger.debug("bidi-read_loop: #{count}")
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil)
# handle the next message
if batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e
GRPC.logger.warn('bidi: read_loop failed')
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
GRPC.logger.debug('bidi-read-loop: finished')
end
end
end

@ -137,6 +137,7 @@ module GRPC
def send_status(active_client, code, details, **kw)
details = 'Not sure why' if details.nil?
GRPC.logger.debug("Sending status #{code}:#{details}")
active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
GRPC.logger.warn("Could not send status #{code}:#{details}")

Loading…
Cancel
Save