Merge pull request #8511 from apolcyn/make_fewer_calls_to_run_batch_on_unary_calls_rebased

combine more core batch ops
reviewable/pr8239/r6^2
apolcyn 8 years ago committed by GitHub
commit d92b795b53
  1. 146
      src/ruby/lib/grpc/generic/active_call.rb
  2. 12
      src/ruby/lib/grpc/generic/client_stub.rb
  3. 43
      src/ruby/lib/grpc/generic/rpc_desc.rb
  4. 22
      src/ruby/spec/generic/active_call_spec.rb
  5. 81
      src/ruby/spec/generic/client_stub_spec.rb
  6. 24
      src/ruby/spec/generic/rpc_desc_spec.rb
  7. 1
      src/ruby/spec/generic/rpc_server_spec.rb
  8. 38
      src/ruby/spec/pb/health/checker_spec.rb

@ -43,7 +43,8 @@ class Struct
GRPC.logger.debug("Failing with status #{status}") GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present. # raise BadStatus, propagating the metadata if present.
md = status.metadata md = status.metadata
fail GRPC::BadStatus.new(status.code, status.details, md) fail GRPC::BadStatus.new(status.code, status.details, md),
"status code: #{status.code}, details: #{status.details}"
end end
status status
end end
@ -156,41 +157,25 @@ module GRPC
Operation.new(self) Operation.new(self)
end end
# writes_done indicates that all writes are completed.
#
# It blocks until the remote endpoint acknowledges with at status unless
# assert_finished is set to false. Any calls to #remote_send after this
# call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
ops = {
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
batch_result = @call.run_batch(ops)
return unless assert_finished
unless batch_result.status.nil?
@call.trailing_metadata = batch_result.status.metadata
end
@call.status = batch_result.status
op_is_done
batch_result.check_status
end
# finished waits until a client call is completed. # finished waits until a client call is completed.
# #
# It blocks until the remote endpoint acknowledges by sending a status. # It blocks until the remote endpoint acknowledges by sending a status.
def finished def finished
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
unless batch_result.status.nil? attach_status_results_and_complete_call(batch_result)
@call.trailing_metadata = batch_result.status.metadata
end end
@call.status = batch_result.status
op_is_done def attach_status_results_and_complete_call(recv_status_batch_result)
batch_result.check_status unless recv_status_batch_result.status.nil?
@call.trailing_metadata = recv_status_batch_result.status.metadata
end
@call.status = recv_status_batch_result.status
@call.close @call.close
op_is_done
# The RECV_STATUS in run_batch always succeeds
# Check the status for a bad status or failed run batch
recv_status_batch_result.check_status
end end
# remote_send sends a request to the remote endpoint. # remote_send sends a request to the remote endpoint.
@ -226,6 +211,23 @@ module GRPC
nil nil
end end
def server_unary_response(req, trailing_metadata: {},
code: Core::StatusCodes::OK, details: 'OK')
ops = {}
@send_initial_md_mutex.synchronize do
ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
@metadata_sent = true
end
payload = @marshal.call(req)
ops[SEND_MESSAGE] = payload
ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
code, details, trailing_metadata)
ops[RECV_CLOSE_ON_SERVER] = nil
@call.run_batch(ops)
end
# remote_read reads a response from the remote endpoint. # remote_read reads a response from the remote endpoint.
# #
# It blocks until the remote endpoint replies with a message or status. # It blocks until the remote endpoint replies with a message or status.
@ -240,9 +242,13 @@ module GRPC
@call.metadata = batch_result.metadata @call.metadata = batch_result.metadata
@metadata_received = true @metadata_received = true
end end
unless batch_result.nil? || batch_result.message.nil? get_message_from_batch_result(batch_result)
res = @unmarshal.call(batch_result.message) end
return res
def get_message_from_batch_result(recv_message_batch_result)
unless recv_message_batch_result.nil? ||
recv_message_batch_result.message.nil?
return @unmarshal.call(recv_message_batch_result.message)
end end
GRPC.logger.debug('found nil; the final response has been sent') GRPC.logger.debug('found nil; the final response has been sent')
nil nil
@ -298,7 +304,6 @@ module GRPC
return enum_for(:each_remote_read_then_finish) unless block_given? return enum_for(:each_remote_read_then_finish) unless block_given?
loop do loop do
resp = remote_read resp = remote_read
break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet if resp.nil? # the last response was received, but not finished yet
finished finished
break break
@ -315,15 +320,25 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Object] the response received from the server # @return [Object] the response received from the server
def request_response(req, metadata: {}) def request_response(req, metadata: {})
merge_metadata_to_send(metadata) && send_initial_metadata ops = {
remote_send(req) SEND_MESSAGE => @marshal.call(req),
writes_done(false) SEND_CLOSE_FROM_CLIENT => nil,
response = remote_read RECV_INITIAL_METADATA => nil,
finished unless response.is_a? Struct::Status RECV_MESSAGE => nil,
response RECV_STATUS_ON_CLIENT => nil
rescue GRPC::Core::CallError => e }
finished # checks for Cancelled @send_initial_md_mutex.synchronize do
raise e # Metadata might have already been sent if this is an operation view
unless @metadata_sent
ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
end
@metadata_sent = true
end
batch_result = @call.run_batch(ops)
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
end end
# client_streamer sends a stream of requests to a GRPC server, and # client_streamer sends a stream of requests to a GRPC server, and
@ -339,12 +354,20 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Object] the response received from the server # @return [Object] the response received from the server
def client_streamer(requests, metadata: {}) def client_streamer(requests, metadata: {})
merge_metadata_to_send(metadata) && send_initial_metadata # Metadata might have already been sent if this is an operation view
requests.each { |r| remote_send(r) } merge_metadata_and_send_if_not_already_sent(metadata)
writes_done(false)
response = remote_read requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
finished unless response.is_a? Struct::Status batch_result = @call.run_batch(
response SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil,
RECV_MESSAGE => nil,
RECV_STATUS_ON_CLIENT => nil
)
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
rescue GRPC::Core::CallError => e rescue GRPC::Core::CallError => e
finished # checks for Cancelled finished # checks for Cancelled
raise e raise e
@ -365,9 +388,18 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator # @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {}) def server_streamer(req, metadata: {})
merge_metadata_to_send(metadata) && send_initial_metadata ops = {
remote_send(req) SEND_MESSAGE => @marshal.call(req),
writes_done(false) SEND_CLOSE_FROM_CLIENT => nil
}
@send_initial_md_mutex.synchronize do
# Metadata might have already been sent if this is an operation view
unless @metadata_sent
ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
end
@metadata_sent = true
end
@call.run_batch(ops)
replies = enum_for(:each_remote_read_then_finish) replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given? return replies unless block_given?
replies.each { |r| yield r } replies.each { |r| yield r }
@ -404,7 +436,8 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator # @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk) def bidi_streamer(requests, metadata: {}, &blk)
merge_metadata_to_send(metadata) && send_initial_metadata # Metadata might have already been sent if this is an operation view
merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call, bd = BidiCall.new(@call,
@marshal, @marshal,
@unmarshal, @unmarshal,
@ -457,6 +490,15 @@ module GRPC
end end
end end
def merge_metadata_and_send_if_not_already_sent(new_metadata = {})
@send_initial_md_mutex.synchronize do
return if @metadata_sent
@metadata_to_send.merge!(new_metadata)
@call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send)
@metadata_sent = true
end
end
private private
# Starts the call if not already started # Starts the call if not already started

@ -168,6 +168,7 @@ module GRPC
# return the operation view of the active_call; define #execute as a # return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response. # new method for this instance that invokes #request_response.
c.merge_metadata_to_send(metadata)
op = c.operation op = c.operation
op.define_singleton_method(:execute) do op.define_singleton_method(:execute) do
c.request_response(req, metadata: metadata) c.request_response(req, metadata: metadata)
@ -231,9 +232,10 @@ module GRPC
# return the operation view of the active_call; define #execute as a # return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer. # new method for this instance that invokes #client_streamer.
c.merge_metadata_to_send(metadata)
op = c.operation op = c.operation
op.define_singleton_method(:execute) do op.define_singleton_method(:execute) do
c.client_streamer(requests, metadata: metadata) c.client_streamer(requests)
end end
op op
end end
@ -309,9 +311,10 @@ module GRPC
# return the operation view of the active_call; define #execute # return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer # as a new method for this instance that invokes #server_streamer
c.merge_metadata_to_send(metadata)
op = c.operation op = c.operation
op.define_singleton_method(:execute) do op.define_singleton_method(:execute) do
c.server_streamer(req, metadata: metadata, &blk) c.server_streamer(req, &blk)
end end
op op
end end
@ -417,15 +420,15 @@ module GRPC
deadline: deadline, deadline: deadline,
parent: parent, parent: parent,
credentials: credentials) credentials: credentials)
return c.bidi_streamer(requests, metadata: metadata, return c.bidi_streamer(requests, metadata: metadata,
&blk) unless return_op &blk) unless return_op
# return the operation view of the active_call; define #execute # return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer # as a new method for this instance that invokes #bidi_streamer
c.merge_metadata_to_send(metadata)
op = c.operation op = c.operation
op.define_singleton_method(:execute) do op.define_singleton_method(:execute) do
c.bidi_streamer(requests, metadata: metadata, &blk) c.bidi_streamer(requests, &blk)
end end
op op
end end
@ -445,7 +448,6 @@ module GRPC
deadline: nil, deadline: nil,
parent: nil, parent: nil,
credentials: nil) credentials: nil)
deadline = from_relative_time(@timeout) if deadline.nil? deadline = from_relative_time(@timeout) if deadline.nil?
# Provide each new client call with its own completion queue # Provide each new client call with its own completion queue
call = @ch.create_call(parent, # parent call call = @ch.create_call(parent, # parent call

@ -62,25 +62,44 @@ module GRPC
proc { |o| unmarshal_class.method(unmarshal_method).call(o) } proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end end
def handle_request_response(active_call, mth)
req = active_call.remote_read
resp = mth.call(req, active_call.single_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
end
def handle_client_streamer(active_call, mth)
resp = mth.call(active_call.multi_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
end
def handle_server_streamer(active_call, mth)
req = active_call.remote_read
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata)
end
def handle_bidi_streamer(active_call, mth)
active_call.run_server_bidi(mth)
send_status(active_call, OK, 'OK', active_call.output_metadata)
end
def run_server_method(active_call, mth) def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline # While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a # might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError. # well-behaved handler could throw a StatusError.
if request_response? if request_response?
req = active_call.remote_read handle_request_response(active_call, mth)
resp = mth.call(req, active_call.single_req_view)
active_call.remote_send(resp)
elsif client_streamer? elsif client_streamer?
resp = mth.call(active_call.multi_req_view) handle_client_streamer(active_call, mth)
active_call.remote_send(resp)
elsif server_streamer? elsif server_streamer?
req = active_call.remote_read handle_server_streamer(active_call, mth)
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
else # is a bidi_stream else # is a bidi_stream
active_call.run_server_bidi(mth) handle_bidi_streamer(active_call, mth)
end end
send_status(active_call, OK, 'OK', active_call.output_metadata)
rescue BadStatus => e rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error # this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata. # code and detail message and some additional app-specific metadata.
@ -91,7 +110,7 @@ module GRPC
# Log it, but don't notify the other endpoint.. # Log it, but don't notify the other endpoint..
GRPC.logger.warn("failed call: #{active_call}\n#{e}") GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime rescue Core::OutOfTime
# This is raised when active_call#method.call exceeeds the deadline # This is raised when active_call#method.call exceeds the deadline
# event. Send a status of deadline exceeded # event. Send a status of deadline exceeded
GRPC.logger.warn("late call: #{active_call}") GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late') send_status(active_call, DEADLINE_EXCEEDED, 'late')
@ -100,7 +119,7 @@ module GRPC
# Send back a UNKNOWN status to the client # Send back a UNKNOWN status to the client
GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
GRPC.logger.warn(e) GRPC.logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given') send_status(active_call, UNKNOWN, 'unkown error handling call on server')
end end
def assert_arity_matches(mth) def assert_arity_matches(mth)

@ -402,7 +402,7 @@ describe GRPC::ActiveCall do
@pass_through, deadline) @pass_through, deadline)
msg = 'message is a string' msg = 'message is a string'
client_call.remote_send(msg) client_call.remote_send(msg)
client_call.writes_done(false) call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg) server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response') server_call.remote_send('server_response')
server_call.send_status(OK, 'OK') server_call.send_status(OK, 'OK')
@ -460,7 +460,7 @@ describe GRPC::ActiveCall do
msg = 'message is a string' msg = 'message is a string'
reply = 'server_response' reply = 'server_response'
client_call.remote_send(msg) client_call.remote_send(msg)
client_call.writes_done(false) call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg) server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read e = client_call.each_remote_read
n = 3 # arbitrary value > 1 n = 3 # arbitrary value > 1
@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
end end
end end
describe '#writes_done' do describe '#closing the call from the client' do
it 'finishes ok if the server sends a status response' do it 'finishes ok if the server sends a status response' do
call = make_test_call call = make_test_call
ActiveCall.client_invoke(call) ActiveCall.client_invoke(call)
@ -481,7 +481,9 @@ describe GRPC::ActiveCall do
@pass_through, deadline) @pass_through, deadline)
msg = 'message is a string' msg = 'message is a string'
client_call.remote_send(msg) client_call.remote_send(msg)
expect { client_call.writes_done(false) }.to_not raise_error expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
server_call = expect_server_to_receive(msg) server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response') server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response') expect(client_call.remote_read).to eq('server_response')
@ -500,11 +502,13 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response') server_call.remote_send('server_response')
server_call.send_status(OK, 'status code is OK') server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response') expect(client_call.remote_read).to eq('server_response')
expect { client_call.writes_done(false) }.to_not raise_error expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
expect { client_call.finished }.to_not raise_error expect { client_call.finished }.to_not raise_error
end end
it 'finishes ok if writes_done is true' do it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
call = make_test_call call = make_test_call
ActiveCall.client_invoke(call) ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through, client_call = ActiveCall.new(call, @pass_through,
@ -515,7 +519,11 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response') server_call.remote_send('server_response')
server_call.send_status(OK, 'status code is OK') server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response') expect(client_call.remote_read).to eq('server_response')
expect { client_call.writes_done(true) }.to_not raise_error expect do
call.run_batch(
CallOps::SEND_CLOSE_FROM_CLIENT => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
end.to_not raise_error
end end
end end

@ -180,22 +180,35 @@ describe 'ClientStub' do
end end
describe 'via a call operation' do describe 'via a call operation' do
def get_response(stub) def get_response(stub, run_start_call_first: false)
op = stub.request_response(@method, @sent_msg, noop, noop, op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true, return_op: true,
metadata: { k1: 'v1', k2: 'v2' }, metadata: { k1: 'v1', k2: 'v2' },
deadline: from_relative_time(2)) deadline: from_relative_time(2))
expect(op).to be_a(GRPC::ActiveCall::Operation) expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute op.start_call if run_start_call_first
result = op.execute
op.wait # make sure wait doesn't hang
result
end end
it_behaves_like 'request response' it_behaves_like 'request response'
it 'sends metadata to the server ok when running start_call first' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
th.join
end
end end
end end
describe '#client_streamer' do describe '#client_streamer' do
shared_examples 'client streaming' do
before(:each) do before(:each) do
Thread.abort_on_exception = true
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
@ -204,6 +217,7 @@ describe 'ClientStub' do
@resp = 'a_reply' @resp = 'a_reply'
end end
shared_examples 'client streaming' do
it 'should send requests to/receive a reply from a server' do it 'should send requests to/receive a reply from a server' do
th = run_client_streamer(@sent_msgs, @resp, @pass) th = run_client_streamer(@sent_msgs, @resp, @pass)
expect(get_response(@stub)).to eq(@resp) expect(get_response(@stub)).to eq(@resp)
@ -242,24 +256,33 @@ describe 'ClientStub' do
end end
describe 'via a call operation' do describe 'via a call operation' do
def get_response(stub) def get_response(stub, run_start_call_first: false)
op = stub.client_streamer(@method, @sent_msgs, noop, noop, op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, metadata: @metadata) return_op: true, metadata: @metadata)
expect(op).to be_a(GRPC::ActiveCall::Operation) expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute op.start_call if run_start_call_first
result = op.execute
op.wait # make sure wait doesn't hang
result
end end
it_behaves_like 'client streaming' it_behaves_like 'client streaming'
it 'sends metadata to the server ok when running start_call first' do
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
th.join
end
end end
end end
describe '#server_streamer' do describe '#server_streamer' do
shared_examples 'server streaming' do
before(:each) do before(:each) do
@sent_msg = 'a_msg' @sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
end end
shared_examples 'server streaming' do
it 'should send a request to/receive replies from a server' do it 'should send a request to/receive replies from a server' do
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
@ -303,22 +326,36 @@ describe 'ClientStub' do
end end
describe 'via a call operation' do describe 'via a call operation' do
def get_responses(stub) after(:each) do
op = stub.server_streamer(@method, @sent_msg, noop, noop, @op.wait # make sure wait doesn't hang
end
def get_responses(stub, run_start_call_first: false)
@op = stub.server_streamer(@method, @sent_msg, noop, noop,
return_op: true, return_op: true,
metadata: { k1: 'v1', k2: 'v2' }) metadata: { k1: 'v1', k2: 'v2' })
expect(op).to be_a(GRPC::ActiveCall::Operation) expect(@op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute @op.start_call if run_start_call_first
e = @op.execute
expect(e).to be_a(Enumerator) expect(e).to be_a(Enumerator)
e e
end end
it_behaves_like 'server streaming' it_behaves_like 'server streaming'
it 'should send metadata to the server ok when start_call is run first' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: true)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
end end
end end
describe '#bidi_streamer' do describe '#bidi_streamer' do
shared_examples 'bidi streaming' do
before(:each) do before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@ -326,6 +363,7 @@ describe 'ClientStub' do
@host = "localhost:#{server_port}" @host = "localhost:#{server_port}"
end end
shared_examples 'bidi streaming' do
it 'supports sending all the requests first', bidi: true do it 'supports sending all the requests first', bidi: true do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass) @pass)
@ -363,16 +401,29 @@ describe 'ClientStub' do
end end
describe 'via a call operation' do describe 'via a call operation' do
def get_responses(stub) after(:each) do
op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, @op.wait # make sure wait doesn't hang
end
def get_responses(stub, run_start_call_first: false)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
return_op: true) return_op: true)
expect(op).to be_a(GRPC::ActiveCall::Operation) expect(@op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute @op.start_call if run_start_call_first
e = @op.execute
expect(e).to be_a(Enumerator) expect(e).to be_a(Enumerator)
e e
end end
it_behaves_like 'bidi streaming' it_behaves_like 'bidi streaming'
it 'can run start_call before executing the call' do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: true)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
end end
end end

@ -48,7 +48,7 @@ describe GRPC::RpcDesc do
@bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new), @bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new),
Stream.new(Object.new), 'encode', 'decode') Stream.new(Object.new), 'encode', 'decode')
@bs_code = INTERNAL @bs_code = INTERNAL
@no_reason = 'no reason given' @no_reason = 'unkown error handling call on server'
@ok_response = Object.new @ok_response = Object.new
end end
@ -83,6 +83,7 @@ describe GRPC::RpcDesc do
before(:each) do before(:each) do
@call = double('active_call') @call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call) allow(@call).to receive(:single_req_view).and_return(@call)
allow(@call).to receive(:output_metadata).and_return(@call)
end end
it_behaves_like 'it handles errors' it_behaves_like 'it handles errors'
@ -90,10 +91,10 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do it 'sends a response and closes the stream if there no errors' do
req = Object.new req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).once.with(@ok_response) expect(@call).to receive(:output_metadata).once.and_return(fake_md)
expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:server_unary_response).once
expect(@call).to receive(:send_status).once.with(OK, 'OK', true, .with(@ok_response, trailing_metadata: fake_md)
metadata: fake_md)
this_desc.run_server_method(@call, method(:fake_reqresp)) this_desc.run_server_method(@call, method(:fake_reqresp))
end end
end end
@ -117,7 +118,9 @@ describe GRPC::RpcDesc do
end end
it 'absorbs CallError with no further action' do it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(CallError) expect(@call).to receive(:server_unary_response).once.and_raise(
CallError)
allow(@call).to receive(:output_metadata).and_return({})
blk = proc do blk = proc do
@client_streamer.run_server_method(@call, method(:fake_clstream)) @client_streamer.run_server_method(@call, method(:fake_clstream))
end end
@ -125,10 +128,11 @@ describe GRPC::RpcDesc do
end end
it 'sends a response and closes the stream if there no errors' do it 'sends a response and closes the stream if there no errors' do
expect(@call).to receive(:remote_send).once.with(@ok_response) expect(@call).to receive(:output_metadata).and_return(
expect(@call).to receive(:output_metadata).and_return(fake_md) fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true, expect(@call).to receive(:server_unary_response).once
metadata: fake_md) .with(@ok_response, trailing_metadata: fake_md)
@client_streamer.run_server_method(@call, method(:fake_clstream)) @client_streamer.run_server_method(@call, method(:fake_clstream))
end end
end end

@ -462,6 +462,7 @@ describe GRPC::RpcServer do
'connect_k1' => 'connect_v1' 'connect_k1' => 'connect_v1'
} }
wanted_md.each do |key, value| wanted_md.each do |key, value|
puts "key: #{key}"
expect(op.metadata[key]).to eq(value) expect(op.metadata[key]).to eq(value)
end end
@srv.stop @srv.stop

@ -97,15 +97,17 @@ describe Grpc::Health::Checker do
context 'initialization' do context 'initialization' do
it 'can be constructed with no args' do it 'can be constructed with no args' do
expect(subject).to_not be(nil) checker = Grpc::Health::Checker.new
expect(checker).to_not be(nil)
end end
end end
context 'method `add_status` and `check`' do context 'method `add_status` and `check`' do
success_tests.each do |t| success_tests.each do |t|
it "should succeed when #{t[:desc]}" do it "should succeed when #{t[:desc]}" do
subject.add_status(t[:service], ServingStatus::NOT_SERVING) checker = Grpc::Health::Checker.new
got = subject.check(HCReq.new(service: t[:service]), nil) checker.add_status(t[:service], ServingStatus::NOT_SERVING)
got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING) want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want) expect(got).to eq(want)
end end
@ -115,8 +117,9 @@ describe Grpc::Health::Checker do
context 'method `check`' do context 'method `check`' do
success_tests.each do |t| success_tests.each do |t|
it "should fail with NOT_FOUND when #{t[:desc]}" do it "should fail with NOT_FOUND when #{t[:desc]}" do
checker = Grpc::Health::Checker.new
blk = proc do blk = proc do
subject.check(HCReq.new(service: t[:service]), nil) checker.check(HCReq.new(service: t[:service]), nil)
end end
expected_msg = /#{StatusCodes::NOT_FOUND}/ expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@ -127,14 +130,15 @@ describe Grpc::Health::Checker do
context 'method `clear_status`' do context 'method `clear_status`' do
success_tests.each do |t| success_tests.each do |t|
it "should fail after clearing status when #{t[:desc]}" do it "should fail after clearing status when #{t[:desc]}" do
subject.add_status(t[:service], ServingStatus::NOT_SERVING) checker = Grpc::Health::Checker.new
got = subject.check(HCReq.new(service: t[:service]), nil) checker.add_status(t[:service], ServingStatus::NOT_SERVING)
got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING) want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want) expect(got).to eq(want)
subject.clear_status(t[:service]) checker.clear_status(t[:service])
blk = proc do blk = proc do
subject.check(HCReq.new(service: t[:service]), nil) checker.check(HCReq.new(service: t[:service]), nil)
end end
expected_msg = /#{StatusCodes::NOT_FOUND}/ expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@ -144,18 +148,19 @@ describe Grpc::Health::Checker do
context 'method `clear_all`' do context 'method `clear_all`' do
it 'should return NOT_FOUND after being invoked' do it 'should return NOT_FOUND after being invoked' do
checker = Grpc::Health::Checker.new
success_tests.each do |t| success_tests.each do |t|
subject.add_status(t[:service], ServingStatus::NOT_SERVING) checker.add_status(t[:service], ServingStatus::NOT_SERVING)
got = subject.check(HCReq.new(service: t[:service]), nil) got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING) want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want) expect(got).to eq(want)
end end
subject.clear_all checker.clear_all
success_tests.each do |t| success_tests.each do |t|
blk = proc do blk = proc do
subject.check(HCReq.new(service: t[:service]), nil) checker.check(HCReq.new(service: t[:service]), nil)
end end
expected_msg = /#{StatusCodes::NOT_FOUND}/ expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@ -184,8 +189,10 @@ describe Grpc::Health::Checker do
end end
it 'should receive the correct status', server: true do it 'should receive the correct status', server: true do
@srv.handle(subject) Thread.abort_on_exception = true
subject.add_status('', ServingStatus::NOT_SERVING) checker = Grpc::Health::Checker.new
@srv.handle(checker)
checker.add_status('', ServingStatus::NOT_SERVING)
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
@ -198,7 +205,8 @@ describe Grpc::Health::Checker do
end end
it 'should fail on unknown services', server: true do it 'should fail on unknown services', server: true do
@srv.handle(subject) checker = Grpc::Health::Checker.new
@srv.handle(checker)
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
blk = proc do blk = proc do

Loading…
Cancel
Save