Moved sending of initial metadata from server into server handler

methods
pull/7293/head
Alex Polcyn 9 years ago committed by Alexander Polcyn
parent 7c519b9243
commit d9892bdd80
  1. 47
      src/ruby/lib/grpc/generic/active_call.rb
  2. 16
      src/ruby/lib/grpc/generic/bidi_call.rb
  3. 9
      src/ruby/lib/grpc/generic/rpc_desc.rb
  4. 23
      src/ruby/lib/grpc/generic/rpc_server.rb
  5. 329
      src/ruby/spec/generic/active_call_spec.rb
  6. 14
      src/ruby/spec/generic/rpc_desc_spec.rb

@ -58,7 +58,7 @@ module GRPC
include Core::TimeConsts
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
attr_reader :deadline, :metadata_sent, :metadata_to_send
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
:peer, :peer_cert, :trailing_metadata
@ -101,7 +101,7 @@ module GRPC
# @param metadata_received [true|false] indicates if metadata has already
# been received. Should always be true for server calls
def initialize(call, marshal, unmarshal, deadline, started: true,
metadata_received: false)
metadata_received: false, metadata_to_send: nil)
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
@call = call
@deadline = deadline
@ -110,6 +110,14 @@ module GRPC
@metadata_received = metadata_received
@metadata_sent = started
@op_notifier = nil
fail(ArgumentError, 'Already sent md') if started && metadata_to_send
@metadata_to_send = metadata_to_send || {} unless started
end
def send_initial_metadata
fail 'Already sent metadata' if @metadata_sent
start_call(@metadata_to_send)
end
# output_metadata are provides access to hash that can be used to
@ -187,7 +195,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
# TODO(murgatroid99): ensure metadata was sent
start_call(@metadata_to_send) unless @metadata_sent
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
payload = marshalled ? req : @marshal.call(req)
@call.run_batch(SEND_MESSAGE => payload)
@ -203,6 +211,7 @@ module GRPC
# list, mulitple metadata for its key are sent
def send_status(code = OK, details = '', assert_finished = false,
metadata: {})
start_call unless @metadata_sent
ops = {
SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
}
@ -392,9 +401,12 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
start_call(metadata)
bd = BidiCall.new(@call, @marshal, @unmarshal,
start_call(metadata) unless @metadata_sent
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
metadata_received: @metadata_received)
bd.run_on_client(requests, @op_notifier, &blk)
end
@ -410,8 +422,12 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
bd = BidiCall.new(@call, @marshal, @unmarshal,
metadata_received: @metadata_received)
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
metadata_received: @metadata_received,
req_view: MultiReqView.new(self))
bd.run_on_server(gen_each_reply)
end
@ -428,6 +444,11 @@ module GRPC
@op_notifier.notify(self)
end
def merge_metadata_to_send(new_metadata = {})
fail('cant change metadata after already sent') if @metadata_sent
@metadata_to_send.merge!(new_metadata)
end
private
# Starts the call if not already started
@ -454,12 +475,20 @@ module GRPC
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
SingleReqView = view_class(:cancelled?, :deadline, :metadata,
:output_metadata, :peer, :peer_cert)
:output_metadata, :peer, :peer_cert,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,
:metadata_sent)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg,
:each_remote_read, :metadata, :output_metadata)
:each_remote_read, :metadata, :output_metadata,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,
:metadata_sent)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.

@ -56,7 +56,8 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param metadata_received [true|false] indicates if metadata has already
# been received. Should always be true for server calls
def initialize(call, marshal, unmarshal, metadata_received: false)
def initialize(call, marshal, unmarshal, metadata_received: false,
req_view: nil)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
@call = call
@marshal = marshal
@ -68,6 +69,7 @@ module GRPC
@writes_complete = false
@complete = false
@done_mutex = Mutex.new
@req_view = req_view
end
# Begins orchestration of the Bidi stream for a client sending requests.
@ -97,7 +99,15 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
replys = gen_each_reply.call(each_queued_msg)
elsif gen_each_reply.arity == 2
replys = gen_each_reply.call(each_queued_msg, @req_view)
else
fail 'Illegal arity of reply generator'
end
@loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false)
end
@ -162,6 +172,8 @@ module GRPC
payload = @marshal.call(req)
# Fails if status already received
begin
@req_view.send_initial_metadata unless
@req_view.nil? || @req_view.metadata_sent
@call.run_batch(SEND_MESSAGE => payload)
rescue GRPC::Core::CallError => e
# This is almost definitely caused by a status arriving while still

@ -104,7 +104,14 @@ module GRPC
end
def assert_arity_matches(mth)
if request_response? || server_streamer?
# A bidi handler function can optionally be passed a second
# call object parameter for access to metadata, cancelling, etc.
if bidi_streamer?
if mth.arity != 2 && mth.arity != 1
fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \
"#{mth.name}(req)")
end
elsif request_response? || server_streamer?
if mth.arity != 2
fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
end

@ -339,8 +339,11 @@ module GRPC
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
# Create a new active call that knows that metadata hasn't been
# sent yet
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true)
metadata_received: true, started: false)
c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
@ -351,8 +354,11 @@ module GRPC
return an_rpc if rpc_descs.key?(mth)
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
# Create a new active call that knows that
# metadata hasn't been sent yet
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true)
metadata_received: true, started: false)
c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
@ -400,17 +406,20 @@ module GRPC
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md)
return nil unless available?(an_rpc)
return nil unless implemented?(an_rpc)
# Create the ActiveCall
# Create the ActiveCall. Indicate that metadata hasnt been sent yet.
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc,
rpc_desc.unmarshal_proc(:input), an_rpc.deadline,
metadata_received: true)
c = ActiveCall.new(an_rpc.call,
rpc_desc.marshal_proc,
rpc_desc.unmarshal_proc(:input),
an_rpc.deadline,
metadata_received: true,
started: false,
metadata_to_send: connect_md)
mth = an_rpc.method.to_sym
[c, mth]
end

@ -60,8 +60,10 @@ describe GRPC::ActiveCall do
end
describe '#multi_req_view' do
it 'exposes a fixed subset of the ActiveCall methods' do
want = %w(cancelled?, deadline, each_remote_read, metadata, shutdown)
it 'exposes a fixed subset of the ActiveCall.methods' do
want = %w(cancelled?, deadline, each_remote_read, metadata, \
shutdown, peer, peer_cert, send_initial_metadata, \
initial_metadata_sent)
v = @client_call.multi_req_view
want.each do |w|
expect(v.methods.include?(w))
@ -70,8 +72,10 @@ describe GRPC::ActiveCall do
end
describe '#single_req_view' do
it 'exposes a fixed subset of the ActiveCall methods' do
want = %w(cancelled?, deadline, metadata, shutdown)
it 'exposes a fixed subset of the ActiveCall.methods' do
want = %w(cancelled?, deadline, metadata, shutdown, \
send_initial_metadata, metadata_to_send, \
merge_metadata_to_send, initial_metadata_sent)
v = @client_call.single_req_view
want.each do |w|
expect(v.methods.include?(w))
@ -149,6 +153,158 @@ describe GRPC::ActiveCall do
end
end
describe 'sending initial metadata', send_initial_metadata: true do
it 'sends metadata before sending a message if it hasnt been sent yet' do
call = make_test_call
@client_call = ActiveCall.new(
call,
@pass_through,
@pass_through,
deadline,
started: false)
metadata = { key: 'dummy_val', other: 'other_val' }
expect(@client_call.metadata_sent).to eq(false)
@client_call.merge_metadata_to_send(metadata)
message = 'dummy message'
expect(call).to(
receive(:run_batch)
.with(
hash_including(
CallOps::SEND_INITIAL_METADATA => metadata)).once)
expect(call).to(
receive(:run_batch).with(hash_including(
CallOps::SEND_MESSAGE => message)).once)
@client_call.remote_send(message)
expect(@client_call.metadata_sent).to eq(true)
end
it 'doesnt send metadata if it thinks its already been sent' do
call = make_test_call
@client_call = ActiveCall.new(call,
@pass_through,
@pass_through,
deadline)
expect(@client_call.metadata_sent).to eql(true)
expect(call).to(
receive(:run_batch).with(hash_including(
CallOps::SEND_INITIAL_METADATA)).never)
@client_call.remote_send('test message')
end
it 'sends metadata if it is explicitly sent and ok to do so' do
call = make_test_call
@client_call = ActiveCall.new(call,
@pass_through,
@pass_through,
deadline,
started: false)
expect(@client_call.metadata_sent).to eql(false)
metadata = { test_key: 'val' }
@client_call.merge_metadata_to_send(metadata)
expect(@client_call.metadata_to_send).to eq(metadata)
expect(call).to(
receive(:run_batch).with(hash_including(
CallOps::SEND_INITIAL_METADATA =>
metadata)).once)
@client_call.send_initial_metadata
end
it 'explicit sending fails if metadata has already been sent' do
call = make_test_call
@client_call = ActiveCall.new(call,
@pass_through,
@pass_through,
deadline)
expect(@client_call.metadata_sent).to eql(true)
blk = proc do
@client_call.send_initial_metadata
end
expect { blk.call }.to raise_error
end
end
describe '#merge_metadata_to_send', merge_metadata_to_send: true do
it 'adds to existing metadata when there is existing metadata to send' do
call = make_test_call
starting_metadata = { k1: 'key1_val', k2: 'key2_val' }
@client_call = ActiveCall.new(
call,
@pass_through, @pass_through,
deadline,
started: false,
metadata_to_send: starting_metadata)
expect(@client_call.metadata_to_send).to eq(starting_metadata)
@client_call.merge_metadata_to_send(
k3: 'key3_val',
k4: 'key4_val')
expected_md_to_send = {
k1: 'key1_val',
k2: 'key2_val',
k3: 'key3_val',
k4: 'key4_val' }
expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
@client_call.merge_metadata_to_send(k5: 'key5_val')
expected_md_to_send.merge!(k5: 'key5_val')
expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
end
it 'overrides existing metadata if adding metadata with an existing key' do
call = make_test_call
starting_metadata = { k1: 'key1_val', k2: 'key2_val' }
@client_call = ActiveCall.new(
call,
@pass_through,
@pass_through,
deadline,
started: false,
metadata_to_send: starting_metadata)
expect(@client_call.metadata_to_send).to eq(starting_metadata)
@client_call.merge_metadata_to_send(k1: 'key1_new_val')
expect(@client_call.metadata_to_send).to eq(k1: 'key1_new_val',
k2: 'key2_val')
end
it 'fails when initial metadata has already been sent' do
call = make_test_call
@client_call = ActiveCall.new(
call,
@pass_through,
@pass_through,
deadline,
started: true)
expect(@client_call.metadata_sent).to eq(true)
blk = proc do
@client_call.merge_metadata_to_send(k1: 'key1_val')
end
expect { blk.call }.to raise_error
end
end
describe '#client_invoke' do
it 'sends metadata to the server when present' do
call = make_test_call
@ -163,7 +319,26 @@ describe GRPC::ActiveCall do
end
end
describe '#remote_read' do
describe '#send_status', send_status: true do
it 'works when no metadata or messages have been sent yet' do
call = make_test_call
ActiveCall.client_invoke(call)
recvd_rpc = @server.request_call
server_call = ActiveCall.new(
recvd_rpc.call,
@pass_through,
@pass_through,
deadline,
started: false)
expect(server_call.metadata_sent).to eq(false)
blk = proc { server_call.send_status(OK) }
expect { blk.call }.to_not raise_error
end
end
describe '#remote_read', remote_read: true do
it 'reads the response sent by a server' do
call = make_test_call
ActiveCall.client_invoke(call)
@ -205,6 +380,31 @@ describe GRPC::ActiveCall do
expect(client_call.metadata).to eq(expected)
end
it 'get a status from server when nothing else sent from server' do
client_call = make_test_call
ActiveCall.client_invoke(client_call)
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = ActiveCall.new(
recvd_call,
@pass_through,
@pass_through,
deadline,
started: false)
server_call.send_status(OK, 'OK')
# Check that we can receive initial metadata and a status
client_call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil)
batch_result = client_call.run_batch(
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(batch_result.status.code).to eq(OK)
end
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
ActiveCall.client_invoke(call)
@ -329,6 +529,125 @@ describe GRPC::ActiveCall do
end
end
# Test sending of the initial metadata in #run_server_bidi
# from the server handler both implicitly and explicitly,
# when the server handler function has one argument and two arguments
describe '#run_server_bidi sanity tests', run_server_bidi: true do
it 'sends the initial metadata implicitly if not already sent' do
requests = ['first message', 'second message']
server_to_client_metadata = { 'test_key' => 'test_val' }
server_status = OK
client_call = make_test_call
client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = ActiveCall.new(recvd_call,
@pass_through,
@pass_through,
deadline,
metadata_received: true,
started: false,
metadata_to_send: server_to_client_metadata)
# Server handler that doesn't have access to a "call"
# It echoes the requests
fake_gen_each_reply_with_no_call_param = proc do |msgs|
msgs
end
server_thread = Thread.new do
server_call.run_server_bidi(
fake_gen_each_reply_with_no_call_param)
server_call.send_status(server_status)
end
# Send the requests and send a close so the server can send a status
requests.each do |message|
client_call.run_batch(CallOps::SEND_MESSAGE => message)
end
client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_thread.join
# Expect that initial metadata was sent,
# the requests were echoed, and a status was sent
batch_result = client_call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil)
expect(batch_result.metadata).to eq(server_to_client_metadata)
requests.each do |message|
batch_result = client_call.run_batch(
CallOps::RECV_MESSAGE => nil)
expect(batch_result.message).to eq(message)
end
batch_result = client_call.run_batch(
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(batch_result.status.code).to eq(server_status)
end
it 'sends the metadata when sent explicitly and not already sent' do
requests = ['first message', 'second message']
server_to_client_metadata = { 'test_key' => 'test_val' }
server_status = OK
client_call = make_test_call
client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_call = ActiveCall.new(recvd_call,
@pass_through,
@pass_through,
deadline,
metadata_received: true,
started: false)
# Fake server handler that has access to a "call" object and
# uses it to explicitly update and sent the initial metadata
fake_gen_each_reply_with_call_param = proc do |msgs, call_param|
call_param.merge_metadata_to_send(server_to_client_metadata)
call_param.send_initial_metadata
msgs
end
server_thread = Thread.new do
server_call.run_server_bidi(
fake_gen_each_reply_with_call_param)
server_call.send_status(server_status)
end
# Send requests and a close from the client so the server
# can send a status
requests.each do |message|
client_call.run_batch(
CallOps::SEND_MESSAGE => message)
end
client_call.run_batch(
CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_thread.join
# Verify that the correct metadata was sent, the requests
# were echoed, and the correct status was sent
batch_result = client_call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil)
expect(batch_result.metadata).to eq(server_to_client_metadata)
requests.each do |message|
batch_result = client_call.run_batch(
CallOps::RECV_MESSAGE => nil)
expect(batch_result.message).to eq(message)
end
batch_result = client_call.run_batch(
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(batch_result.status.code).to eq(server_status)
end
end
def expect_server_to_receive(sent_text, **kw)
c = expect_server_to_be_invoked(**kw)
expect(c.remote_read).to eq(sent_text)

@ -196,6 +196,9 @@ describe GRPC::RpcDesc do
def fake_svstream(_arg1, _arg2)
end
def fake_three_args(_arg1, _arg2, _arg3)
end
it 'raises when a request_response does not have 2 args' do
[:fake_clstream, :no_arg].each do |mth|
blk = proc do
@ -244,8 +247,8 @@ describe GRPC::RpcDesc do
expect(&blk).to_not raise_error
end
it 'raises when a bidi streamer does not have 1 arg' do
[:fake_svstream, :no_arg].each do |mth|
it 'raises when a bidi streamer does not have 1 or 2 args' do
[:fake_three_args, :no_arg].each do |mth|
blk = proc do
@bidi_streamer.assert_arity_matches(method(mth))
end
@ -259,6 +262,13 @@ describe GRPC::RpcDesc do
end
expect(&blk).to_not raise_error
end
it 'passes when a bidi streamer has 2 args' do
blk = proc do
@bidi_streamer.assert_arity_matches(method(:fake_svstream))
end
expect(&blk).to_not raise_error
end
end
describe '#request_response?' do

Loading…
Cancel
Save