From d62d345b836f20ca0bc1c4fbe57072fee0f8375d Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Tue, 2 Jun 2015 11:28:03 -0700 Subject: [PATCH] Corrects the cancel_after_first_response test - stops attempting to verify by catching the CancelledError - instead the test examines the operation state after cancellation, which is equally valid and more stable. Also - simplifies bidi on the server --- src/ruby/bin/interop/interop_client.rb | 4 +- src/ruby/lib/grpc/generic/active_call.rb | 11 ++-- src/ruby/lib/grpc/generic/bidi_call.rb | 64 +++++++++++------------- 3 files changed, 38 insertions(+), 41 deletions(-) diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index 8df03ffb3cd..16fb1b199dd 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -274,6 +274,7 @@ class NamedTests op = @stub.streaming_input_call(reqs, return_op: true) op.cancel assert_raises(GRPC::Cancelled) { op.execute } + assert(op.cancelled, 'call operation should be CANCELLED') p 'OK: cancel_after_begin' end @@ -282,7 +283,8 @@ class NamedTests ppp = PingPongPlayer.new(msg_sizes) op = @stub.full_duplex_call(ppp.each_item, return_op: true) ppp.canceller_op = op # causes ppp to cancel after the 1st message - assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } + op.execute.each { |r| ppp.queue.push(r) } + assert(op.cancelled, 'call operation should be CANCELLED') p 'OK: cancel_after_first_response' end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 04abab8ac37..3814ef34b41 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -55,7 +55,6 @@ module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call class ActiveCall - include Core::StatusCodes include Core::TimeConsts include Core::CallOps extend Forwardable @@ -129,6 +128,11 @@ module GRPC @output_metadata ||= {} end + # cancelled indicates if the call was cancelled + def cancelled + !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED + end + # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -162,6 +166,7 @@ module GRPC ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished + @call.status = batch_result.status batch_result.check_status end @@ -178,6 +183,7 @@ module GRPC @call.metadata.merge!(batch_result.status.metadata) end end + @call.status = batch_result.status batch_result.check_status end @@ -410,9 +416,6 @@ module GRPC start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index f1b9f6b00da..489dd5162a0 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -78,11 +78,9 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - @enq_th = start_write_loop(requests) + @enq_th = Thread.new { write_loop(requests) } @loop_th = start_read_loop - replies = each_queued_msg - return replies if blk.nil? - replies.each { |r| blk.call(r) } + each_queued_msg(&blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -98,9 +96,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - @enq_th = start_write_loop(replys, is_client: false) @loop_th = start_read_loop - @enq_th.join if @enq_th.alive? + write_loop(replys, is_client: false) end private @@ -126,37 +123,32 @@ module GRPC end end - # during bidi-streaming, read the requests to send from a separate thread - # read so that read_loop does not block waiting for requests to read. - def start_write_loop(requests, is_client: true) - Thread.new do # TODO: run on a thread pool - GRPC.logger.debug('bidi-write-loop: starting') - begin - write_tag = Object.new - count = 0 - requests.each do |req| - GRPC.logger.debug("bidi-write-loop: #{count}") - count += 1 - payload = @marshal.call(req) - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_MESSAGE => payload) - end - GRPC.logger.debug("bidi-write-loop: #{count} writes done") - if is_client - GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil) - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - RECV_STATUS_ON_CLIENT => nil) - batch_result.check_status - end - rescue StandardError => e - GRPC.logger.warn('bidi-write-loop: failed') - GRPC.logger.warn(e) - raise e - end - GRPC.logger.debug('bidi-write-loop: finished') + def write_loop(requests, is_client: true) + GRPC.logger.debug('bidi-write-loop: starting') + write_tag = Object.new + count = 0 + requests.each do |req| + GRPC.logger.debug("bidi-write-loop: #{count}") + count += 1 + payload = @marshal.call(req) + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_MESSAGE => payload) + end + GRPC.logger.debug("bidi-write-loop: #{count} writes done") + if is_client + GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") end + GRPC.logger.debug('bidi-write-loop: finished') + rescue StandardError => e + GRPC.logger.warn('bidi-write-loop: failed') + GRPC.logger.warn(e) + raise e end # starts the read loop