Adds an explicit Cancellation exception

- uses Forwardable to provide access the @call within an ActiveCall
- removes redundant methods from ActiveCall
pull/1293/head
Tim Emiola 10 years ago
parent 72d70fc0af
commit 1b39916bba
  1. 4
      src/ruby/lib/grpc/errors.rb
  2. 94
      src/ruby/lib/grpc/generic/active_call.rb
  3. 4
      src/ruby/spec/generic/rpc_server_spec.rb

@ -54,4 +54,8 @@ module GRPC
Status.new(code, details)
end
end
# Cancelled is an exception class that indicates that an rpc was cancelled.
class Cancelled < StandardError
end
end

@ -30,6 +30,22 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
class Struct
# BatchResult is the struct returned by calls to call#start_batch.
class BatchResult
# check_status returns the status, raising an error if the status
# is non-nil and not OK.
def check_status
return nil if status.nil?
fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
if status.code != GRPC::Core::StatusCodes::OK
fail GRPC::BadStatus.new(status.code, status.details)
end
status
end
end
end
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
@ -38,7 +54,9 @@ module GRPC
include Core::StatusCodes
include Core::TimeConsts
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
def_delegators :@call, :cancel, :metadata
# client_invoke begins a client invocation.
#
@ -101,50 +119,6 @@ module GRPC
@metadata_tag = metadata_tag
end
# Obtains the status of the call.
#
# this value is nil until the call completes
# @return this call's status
def status
@call.status
end
# Obtains the metadata of the call.
#
# At the start of the call this will be nil. During the call this gets
# some values as soon as the other end of the connection acknowledges the
# request.
#
# @return this calls's metadata
def metadata
@call.metadata
end
# Cancels the call.
#
# Cancels the call. The call does not return any result, but once this it
# has been called, the call should eventually terminate. Due to potential
# races between the execution of the cancel and the in-flight request, the
# result of the call after calling #cancel is indeterminate:
#
# - the call may terminate with a BadStatus exception, with code=CANCELLED
# - the call may terminate with OK Status, and return a response
# - the call may terminate with a different BadStatus exception if that
# was happening
def cancel
@call.cancel
end
# indicates if the call is shutdown
def shutdown
@shutdown ||= false
end
# indicates if the call is cancelled.
def cancelled
@cancelled ||= false
end
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
@ -176,9 +150,9 @@ module GRPC
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
@call.run_batch(@cq, self, INFINITE_FUTURE, ops)
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
@call.status
batch_result.check_status
end
# finished waits until a client call is completed.
@ -192,17 +166,12 @@ module GRPC
elsif !batch_result.metadata.nil?
@call.metadata.merge!(batch_result.metadata)
end
if batch_result.status.code != Core::StatusCodes::OK
fail BadStatus.new(batch_result.status.code,
batch_result.status.details)
end
batch_result
batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
# It blocks until the remote endpoint acknowledges by sending a
# WRITE_ACCEPTED. req can be marshalled already.
# It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
@ -332,6 +301,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end
# client_streamer sends a stream of requests to a GRPC server, and
@ -355,6 +327,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@ -381,6 +356,9 @@ module GRPC
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@ -416,6 +394,9 @@ module GRPC
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@ -436,9 +417,10 @@ module GRPC
private
# Starts the call if not already started
def start_call(**kw)
tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@finished_tag, @read_metadata_tag = tags
return if @started
@metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@started = true
end
@ -466,6 +448,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status)
:metadata, :status, :start_call)
end
end

@ -396,11 +396,11 @@ describe GRPC::RpcServer do
req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
cancel_thread = Thread.new do
Thread.new do # cancel the call
sleep 0.1
op.cancel
end
expect{op.execute}.to raise_error GRPC::Cancelled
expect { op.execute }.to raise_error GRPC::Cancelled
@srv.stop
t.join
end

Loading…
Cancel
Save