Merge pull request #1871 from tbetbetbe/grpc_ruby_corrects_the_cancel_after_first_response_test

Corrects the cancel_after_first_response test
pull/1879/head
Jan Tattermusch 10 years ago
commit 67a3c164f5
  1. 4
      src/ruby/bin/interop/interop_client.rb
  2. 11
      src/ruby/lib/grpc/generic/active_call.rb
  3. 64
      src/ruby/lib/grpc/generic/bidi_call.rb

@ -274,6 +274,7 @@ class NamedTests
op = @stub.streaming_input_call(reqs, return_op: true) op = @stub.streaming_input_call(reqs, return_op: true)
op.cancel op.cancel
assert_raises(GRPC::Cancelled) { op.execute } assert_raises(GRPC::Cancelled) { op.execute }
assert(op.cancelled, 'call operation should be CANCELLED')
p 'OK: cancel_after_begin' p 'OK: cancel_after_begin'
end end
@ -282,7 +283,8 @@ class NamedTests
ppp = PingPongPlayer.new(msg_sizes) ppp = PingPongPlayer.new(msg_sizes)
op = @stub.full_duplex_call(ppp.each_item, return_op: true) op = @stub.full_duplex_call(ppp.each_item, return_op: true)
ppp.canceller_op = op # causes ppp to cancel after the 1st message ppp.canceller_op = op # causes ppp to cancel after the 1st message
assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } op.execute.each { |r| ppp.queue.push(r) }
assert(op.cancelled, 'call operation should be CANCELLED')
p 'OK: cancel_after_first_response' p 'OK: cancel_after_first_response'
end end

@ -55,7 +55,6 @@ module GRPC
# The ActiveCall class provides simple methods for sending marshallable # The ActiveCall class provides simple methods for sending marshallable
# data to a call # data to a call
class ActiveCall class ActiveCall
include Core::StatusCodes
include Core::TimeConsts include Core::TimeConsts
include Core::CallOps include Core::CallOps
extend Forwardable extend Forwardable
@ -129,6 +128,11 @@ module GRPC
@output_metadata ||= {} @output_metadata ||= {}
end end
# cancelled indicates if the call was cancelled
def cancelled
!@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
end
# multi_req_view provides a restricted view of this ActiveCall for use # multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler. # in a server client-streaming handler.
def multi_req_view def multi_req_view
@ -162,6 +166,7 @@ module GRPC
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished return unless assert_finished
@call.status = batch_result.status
batch_result.check_status batch_result.check_status
end end
@ -178,6 +183,7 @@ module GRPC
@call.metadata.merge!(batch_result.status.metadata) @call.metadata.merge!(batch_result.status.metadata)
end end
end end
@call.status = batch_result.status
batch_result.check_status batch_result.check_status
end end
@ -410,9 +416,6 @@ module GRPC
start_call(**kw) unless @started start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk) bd.run_on_client(requests, &blk)
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end end
# run_server_bidi orchestrates a BiDi stream processing on a server. # run_server_bidi orchestrates a BiDi stream processing on a server.

@ -78,11 +78,9 @@ module GRPC
# @param requests the Enumerable of requests to send # @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield # @return an Enumerator of requests to yield
def run_on_client(requests, &blk) def run_on_client(requests, &blk)
@enq_th = start_write_loop(requests) @enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop @loop_th = start_read_loop
replies = each_queued_msg each_queued_msg(&blk)
return replies if blk.nil?
replies.each { |r| blk.call(r) }
end end
# Begins orchestration of the Bidi stream for a server generating replies. # Begins orchestration of the Bidi stream for a server generating replies.
@ -98,9 +96,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies. # @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply) def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg) replys = gen_each_reply.call(each_queued_msg)
@enq_th = start_write_loop(replys, is_client: false)
@loop_th = start_read_loop @loop_th = start_read_loop
@enq_th.join if @enq_th.alive? write_loop(replys, is_client: false)
end end
private private
@ -126,37 +123,32 @@ module GRPC
end end
end end
# during bidi-streaming, read the requests to send from a separate thread def write_loop(requests, is_client: true)
# read so that read_loop does not block waiting for requests to read. GRPC.logger.debug('bidi-write-loop: starting')
def start_write_loop(requests, is_client: true) write_tag = Object.new
Thread.new do # TODO: run on a thread pool count = 0
GRPC.logger.debug('bidi-write-loop: starting') requests.each do |req|
begin GRPC.logger.debug("bidi-write-loop: #{count}")
write_tag = Object.new count += 1
count = 0 payload = @marshal.call(req)
requests.each do |req| @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
GRPC.logger.debug("bidi-write-loop: #{count}") SEND_MESSAGE => payload)
count += 1 end
payload = @marshal.call(req) GRPC.logger.debug("bidi-write-loop: #{count} writes done")
@call.run_batch(@cq, write_tag, INFINITE_FUTURE, if is_client
SEND_MESSAGE => payload) GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
end batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
GRPC.logger.debug("bidi-write-loop: #{count} writes done") SEND_CLOSE_FROM_CLIENT => nil,
if is_client RECV_STATUS_ON_CLIENT => nil)
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.status = batch_result.status
@call.run_batch(@cq, write_tag, INFINITE_FUTURE, batch_result.check_status
SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
raise e
end
GRPC.logger.debug('bidi-write-loop: finished')
end end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
raise e
end end
# starts the read loop # starts the read loop

Loading…
Cancel
Save