Updates ActiveCall to use the new call API

pull/1188/head
Tim Emiola 10 years ago
parent 05e934fe16
commit 5684b4073c
  1. 183
      src/ruby/lib/grpc/generic/active_call.rb
  2. 7
      src/ruby/lib/grpc/generic/rpc_desc.rb
  3. 144
      src/ruby/spec/generic/active_call_spec.rb
  4. 57
      src/ruby/spec/generic/rpc_desc_spec.rb

@ -30,20 +30,14 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
def assert_event_type(ev, want)
fail OutOfTime if ev.nil?
got = ev.type
fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
end
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
include Core::CallOps
attr_reader(:deadline)
# client_invoke begins a client invocation.
@ -61,15 +55,14 @@ module GRPC
# @param q [CompletionQueue] the completion queue
# @param deadline [Fixnum,TimeSpec] the deadline
def self.client_invoke(call, q, _deadline, **kw)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
fail(TypeError, '!Core::CompletionQueue')
end
call.add_metadata(kw) if kw.length > 0
client_metadata_read = Object.new
finished_tag = Object.new
call.invoke(q, client_metadata_read, finished_tag)
[finished_tag, client_metadata_read]
metadata_tag = Object.new
call.run_batch(q, metadata_tag, INFINITE_FUTURE,
SEND_INITIAL_METADATA => kw)
metadata_tag
end
# Creates an ActiveCall.
@ -91,25 +84,21 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param finished_tag [Object] the object used as the call's finish tag,
# if the call has begun
# @param read_metadata_tag [Object] the object used as the call's finish
# tag, if the call has begun
# @param metadata_tag [Object] the object use obtain metadata for clients
# @param started [true|false] indicates if the call has begun
def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
read_metadata_tag: nil, started: true)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
def initialize(call, q, marshal, unmarshal, deadline, started: true,
metadata_tag: nil)
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
fail(TypeError, '!Core::CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
@finished_tag = finished_tag
@read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
@metadata_tag = metadata_tag
end
# Obtains the status of the call.
@ -176,51 +165,38 @@ module GRPC
# writes_done indicates that all writes are completed.
#
# It blocks until the remote endpoint acknowledges by sending a FINISHED
# event, unless assert_finished is set to false. Any calls to
# #remote_send after this call will fail.
# It blocks until the remote endpoint acknowledges with at status unless
# assert_finished is set to false. Any calls to #remote_send after this
# call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
logger.debug("Writes done: waiting for finish? #{assert_finished}")
ensure
ev.close
end
ops = {
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
@call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
fail 'unexpected nil event' if ev.nil?
ev.close
@call.status
end
# finished waits until the call is completed.
# finished waits until a client call is completed.
#
# It blocks until the remote endpoint acknowledges by sending a FINISHED
# event.
# It blocks until the remote endpoint acknowledges by sending a status.
def finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
begin
fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
if @call.metadata.nil?
@call.metadata = ev.result.metadata
else
@call.metadata.merge!(ev.result.metadata)
end
if ev.result.code != Core::StatusCodes::OK
fail BadStatus.new(ev.result.code, ev.result.details)
end
res = ev.result
ensure
ev.close
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
RECV_STATUS_ON_CLIENT => nil)
if @call.metadata.nil?
@call.metadata = batch_result.metadata
elsif !batch_result.metadata.nil?
@call.metadata.merge!(batch_result.metadata)
end
res
if batch_result.status.code != Core::StatusCodes::OK
fail BadStatus.new(batch_result.status.code,
batch_result.status.details)
end
batch_result
end
# remote_send sends a request to the remote endpoint.
@ -232,72 +208,50 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
assert_queue_is_ready
logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
if marshalled
payload = req
else
payload = @marshal.call(req)
end
@call.start_write(Core::ByteBuffer.new(payload), self)
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, WRITE_ACCEPTED)
ensure
ev.close
end
@call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
end
# send_status sends a status to the remote endpoint
# send_status sends a status to the remote endpoint.
#
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def send_status(code = OK, details = '', assert_finished = false)
assert_queue_is_ready
@call.start_write_status(code, details, self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, FINISH_ACCEPTED)
ensure
ev.close
end
logger.debug("Status sent: #{code}:'#{details}'")
return finished if assert_finished
ops = {
SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details)
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(@cq, self, INFINITE_FUTURE, ops)
nil
end
# remote_read reads a response from the remote endpoint.
#
# It blocks until the remote endpoint sends a READ or FINISHED event. On
# a READ, it returns the response after unmarshalling it. On
# FINISHED, it returns nil if the status is OK, otherwise raising
# BadStatus
# It blocks until the remote endpoint replies with a message or status.
# On receiving a message, it returns the response after unmarshalling it.
# On receiving a status, it returns nil if the status is OK, otherwise
# raising BadStatus
def remote_read
if @call.metadata.nil? && !@read_metadata_tag.nil?
ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
assert_event_type(ev, CLIENT_METADATA_READ)
@call.metadata = ev.result
@read_metadata_tag = nil
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
unless @metadata_tag.nil?
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
@call.start_read(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
begin
assert_event_type(ev, READ)
logger.debug("received req: #{ev.result.inspect}")
unless ev.result.nil?
logger.debug("received req.to_s: #{ev.result}")
res = @unmarshal.call(ev.result.to_s)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
ensure
ev.close
logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
logger.debug("received req.to_s: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
logger.debug('found nil; the final response has been sent')
nil
@ -324,7 +278,6 @@ module GRPC
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
@ -461,8 +414,7 @@ module GRPC
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
end
@ -478,8 +430,7 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
@finished_tag)
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_server(gen_each_reply)
end
@ -516,21 +467,5 @@ module GRPC
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status)
# confirms that no events are enqueued, and that the queue is not
# shutdown.
def assert_queue_is_ready
ev = nil
begin
ev = @cq.pluck(self, ZERO)
fail "unexpected event #{ev.inspect}" unless ev.nil?
rescue OutOfTime
logging.debug('timed out waiting for next event')
# expected, nothing should be on the queue and the deadline was ZERO,
# except things using another tag
ensure
ev.close unless ev.nil?
end
end
end
end

@ -81,7 +81,6 @@ module GRPC
active_call.run_server_bidi(mth)
end
send_status(active_call, OK, 'OK')
active_call.finished
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
@ -96,10 +95,6 @@ module GRPC
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue Core::EventError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
@ -142,7 +137,7 @@ module GRPC
def send_status(active_client, code, details)
details = 'Not sure why' if details.nil?
active_client.send_status(code, details)
active_client.send_status(code, details, code == OK)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)

@ -34,12 +34,11 @@ include GRPC::Core::StatusCodes
describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CompletionType = GRPC::Core::CompletionType
CallOps = GRPC::Core::CallOps
before(:each) do
@pass_through = proc { |x| x }
@server_tag = Object.new
@server_done_tag = Object.new
@tag = Object.new
@client_queue = GRPC::Core::CompletionQueue.new
@ -48,7 +47,7 @@ describe GRPC::ActiveCall do
@server = GRPC::Core::Server.new(@server_queue, nil)
server_port = @server.add_http2_port(host)
@server.start
@ch = GRPC::Core::Channel.new("localhost:#{server_port}", nil)
@ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil)
end
after(:each) do
@ -58,12 +57,10 @@ describe GRPC::ActiveCall do
describe 'restricted view methods' do
before(:each) do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
end
describe '#multi_req_view' do
@ -90,48 +87,45 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
msg = 'message is a string'
@client_call.remote_send(msg)
# check that server rpc new was received
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
expect(ev.type).to be(CompletionType::SERVER_RPC_NEW)
expect(ev.call).to be_a(Call)
expect(ev.tag).to be(@server_tag)
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
expect(recvd_rpc).to_not eq nil
recvd_call = recvd_rpc.call
# Accept the call, and verify that the server reads the response ok.
ev.call.server_accept(@client_queue, @server_tag)
ev.call.server_end_initial_metadata
server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
server_ops = {
CallOps::SEND_INITIAL_METADATA => {}
}
recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq(msg)
end
it 'marshals the payload using the marshal func' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
ActiveCall.client_invoke(call, @client_queue, deadline)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
# confirm that the message was marshalled
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
ev.call.server_accept(@client_queue, @server_tag)
ev.call.server_end_initial_metadata
server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
recvd_call = recvd_rpc.call
server_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
@ -142,23 +136,22 @@ describe GRPC::ActiveCall do
call = make_test_call
ActiveCall.client_invoke(call, @client_queue, deadline,
k1: 'v1', k2: 'v2')
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
expect(ev).to_not be_nil
expect(ev.result.metadata['k1']).to eq('v1')
expect(ev.result.metadata['k2']).to eq('v2')
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
recvd_call = recvd_rpc.call
expect(recvd_call).to_not be_nil
expect(recvd_rpc.metadata).to_not be_nil
expect(recvd_rpc.metadata['k1']).to eq('v1')
expect(recvd_rpc.metadata['k2']).to eq('v2')
end
end
describe '#remote_read' do
it 'reads the response sent by a server' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@ -168,12 +161,10 @@ describe GRPC::ActiveCall do
it 'saves no metadata when the server adds no metadata' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@ -185,12 +176,10 @@ describe GRPC::ActiveCall do
it 'saves metadata add by the server' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
@ -203,12 +192,10 @@ describe GRPC::ActiveCall do
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
client_call.writes_done(false)
@ -222,13 +209,11 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
# confirm the client receives the unmarshalled message
msg = 'message is a string'
@ -249,13 +234,11 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that can read n responses' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
msg = 'message is 4a string'
metadata_tag: md_tag)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@ -269,12 +252,10 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
read_metadata_tag: meta_tag,
finished_tag: done_tag)
metadata_tag: md_tag)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
@ -294,12 +275,10 @@ describe GRPC::ActiveCall do
describe '#writes_done' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
read_metadata_tag: meta_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
expect { client_call.writes_done(false) }.to_not raise_error
@ -312,12 +291,10 @@ describe GRPC::ActiveCall do
it 'finishes ok if the server sends an early status response' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
read_metadata_tag: meta_tag,
finished_tag: done_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@ -330,12 +307,10 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do
call = make_test_call
done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
read_metadata_tag: meta_tag,
finished_tag: done_tag)
metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@ -353,21 +328,20 @@ describe GRPC::ActiveCall do
end
def expect_server_to_be_invoked(**kw)
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
ev.call.add_metadata(kw)
ev.call.server_accept(@client_queue, @server_done_tag)
ev.call.server_end_initial_metadata
ActiveCall.new(ev.call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: @server_done_tag)
recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
expect(recvd_rpc).to_not eq nil
recvd_call = recvd_rpc.call
recvd_call.run_batch(@server_queue, @server_tag, deadline,
CallOps::SEND_INITIAL_METADATA => kw)
ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
end
def make_test_call
@ch.create_call(@client_queue, 'dummy_method', 'dummy_host', deadline)
@ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline)
end
def deadline
Time.now + 1 # in 1 second; arbitrary
Time.now + 2 # in 2 seconds; arbitrary
end
end

@ -37,7 +37,6 @@ describe GRPC::RpcDesc do
INTERNAL = GRPC::Core::StatusCodes::INTERNAL
UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN
CallError = GRPC::Core::CallError
EventError = GRPC::Core::EventError
before(:each) do
@request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode',
@ -63,24 +62,17 @@ describe GRPC::RpcDesc do
it 'sends the specified status if BadStatus is raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
@request_response.run_server_method(@call, method(:bad_status))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
false)
@request_response.run_server_method(@call, method(:other_error))
end
it 'absorbs EventError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(EventError)
blk = proc do
@request_response.run_server_method(@call, method(:fake_reqresp))
end
expect(&blk).to_not raise_error
end
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(CallError)
blk = proc do
@ -93,8 +85,7 @@ describe GRPC::RpcDesc do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
expect(@call).to receive(:finished).once
expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
@request_response.run_server_method(@call, method(:fake_reqresp))
end
end
@ -107,23 +98,16 @@ describe GRPC::RpcDesc do
end
it 'sends the specified status if BadStatus is raised' do
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
@client_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
false)
@client_streamer.run_server_method(@call, method(:other_error_alt))
end
it 'absorbs EventError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(EventError)
blk = proc do
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
expect(&blk).to_not raise_error
end
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(CallError)
blk = proc do
@ -134,8 +118,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
expect(@call).to receive(:finished).once
expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
@ -149,24 +132,17 @@ describe GRPC::RpcDesc do
it 'sends the specified status if BadStatus is raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
@server_streamer.run_server_method(@call, method(:bad_status))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:remote_read).once.and_return(Object.new)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
false)
@server_streamer.run_server_method(@call, method(:other_error))
end
it 'absorbs EventError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(EventError)
blk = proc do
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
expect(&blk).to_not raise_error
end
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_read).once.and_raise(CallError)
blk = proc do
@ -179,8 +155,7 @@ describe GRPC::RpcDesc do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
expect(@call).to receive(:finished).once
expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
end
@ -197,20 +172,20 @@ describe GRPC::RpcDesc do
it 'sends the specified status if BadStatus is raised' do
e = GRPC::BadStatus.new(@bs_code, 'NOK')
expect(@call).to receive(:run_server_bidi).and_raise(e)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
@bidi_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:run_server_bidi).and_raise(StandardError)
expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason)
expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
false)
@bidi_streamer.run_server_method(@call, method(:other_error_alt))
end
it 'closes the stream if there no errors' do
expect(@call).to receive(:run_server_bidi)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
expect(@call).to receive(:finished).once
expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
@bidi_streamer.run_server_method(@call, method(:fake_bidistream))
end
end

Loading…
Cancel
Save