add missing fields on server call context and improve robustness of finished calls

pull/11764/head
Alexander Polcyn 7 years ago
parent 59a19a9d5e
commit 7cc30c1155
  1. 14
      src/ruby/lib/grpc/generic/active_call.rb
  2. 1
      src/ruby/lib/grpc/generic/rpc_server.rb
  3. 166
      src/ruby/spec/generic/client_stub_spec.rb
  4. 145
      src/ruby/spec/generic/rpc_server_spec.rb

@ -44,9 +44,9 @@ module GRPC
include Core::TimeConsts
include Core::CallOps
extend Forwardable
attr_reader :deadline, :metadata_sent, :metadata_to_send
attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
:peer, :peer_cert, :trailing_metadata, :status
:trailing_metadata, :status
# client_invoke begins a client invocation.
#
@ -105,8 +105,13 @@ module GRPC
@input_stream_done = false
@call_finished = false
@call_finished_mu = Mutex.new
@client_call_executed = false
@client_call_executed_mu = Mutex.new
# set the peer now so that the accessor can still function
# after the server closes the call
@peer = call.peer
end
# Sends the initial metadata that has yet to be sent.
@ -541,6 +546,10 @@ module GRPC
end
end
def attach_peer_cert(peer_cert)
@peer_cert = peer_cert
end
private
# To be called once the "input stream" has been completelly
@ -612,6 +621,7 @@ module GRPC
# server client_streamer handlers.
MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg,
:each_remote_read, :metadata, :output_metadata,
:peer, :peer_cert,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,

@ -418,6 +418,7 @@ module GRPC
metadata_received: true,
started: false,
metadata_to_send: connect_md)
c.attach_peer_cert(an_rpc.call.peer_cert)
mth = an_rpc.method.to_sym
[c, mth]
end

@ -37,7 +37,9 @@ include GRPC::Core::TimeConsts
include GRPC::Core::CallOps
# check that methods on a finished/closed call t crash
def check_op_view_of_finished_client_call_is_robust(op_view)
def check_op_view_of_finished_client_call(op_view,
expected_metadata,
expected_trailing_metadata)
# use read_response_stream to try to iterate through
# possible response stream
fail('need something to attempt reads') unless block_given?
@ -48,21 +50,39 @@ def check_op_view_of_finished_client_call_is_robust(op_view)
expect { op_view.start_call }.to raise_error(RuntimeError)
sanity_check_values_of_accessors(op_view,
expected_metadata,
expected_trailing_metadata)
expect do
op_view.wait
op_view.cancel
op_view.metadata
op_view.trailing_metadata
op_view.status
op_view.cancelled?
op_view.deadline
op_view.write_flag
op_view.write_flag = 1
end.to_not raise_error
end
def sanity_check_values_of_accessors(op_view,
expected_metadata,
expected_trailing_metadata)
expected_status = Struct::Status.new
expected_status.code = 0
expected_status.details = 'OK'
expected_status.metadata = expected_trailing_metadata
expect(op_view.status).to eq(expected_status)
expect(op_view.metadata).to eq(expected_metadata)
expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
expect(op_view.cancelled?).to be(false)
expect(op_view.write_flag).to be(nil)
# The deadline attribute of a call can be either
# a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
# TODO: fix so that the accessor always returns the same type.
expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
op_view.deadline.is_a?(Time)).to be(true)
end
describe 'ClientStub' do
let(:noop) { proc { |x| x } }
@ -154,7 +174,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2')
expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
th.join
@ -261,8 +281,14 @@ describe 'ClientStub' do
def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2')
@server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
@server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_request_response(
@sent_msg, @resp, @pass,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(
get_response(stub,
@ -272,12 +298,14 @@ describe 'ClientStub' do
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
end
end
@ -300,7 +328,8 @@ describe 'ClientStub' do
end
it 'should send metadata to the server ok' do
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
th = run_client_streamer(@sent_msgs, @resp, @pass,
expected_metadata: @metadata)
expect(get_response(@stub)).to eq(@resp)
th.join
end
@ -347,7 +376,13 @@ describe 'ClientStub' do
it_behaves_like 'client streaming'
def run_op_view_metadata_test(run_start_call_first)
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
@server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
@server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_client_streamer(
@sent_msgs, @resp, @pass,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
expect(
get_response(@stub,
run_start_call_first: run_start_call_first)).to eq(@resp)
@ -356,12 +391,14 @@ describe 'ClientStub' do
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) { |r| p r }
end
end
end
@ -396,7 +433,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
@ -459,24 +496,31 @@ describe 'ClientStub' do
def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
@server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
@server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_server_streamer(
@sent_msg, @replys, @pass,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: run_start_call_first)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
it 'should send metadata to the server ok when start_call is run first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
@ -530,7 +574,7 @@ describe 'ClientStub' do
it 'should send metadata to the server ok' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
**@metadata)
expected_metadata: @metadata)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
@ -567,40 +611,52 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming'
def run_op_view_metadata_test(run_start_call_first)
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
@server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
@server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
th = run_bidi_streamer_echo_ping_pong(
@sent_msgs, @pass, true,
expected_metadata: @metadata,
server_initial_md: @server_initial_md,
server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: run_start_call_first)
expect(e.collect { |r| r }).to eq(@replys)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
it 'can run start_call before executing the call' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
it 'doesnt crash when op_view used after call has finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
check_op_view_of_finished_client_call(
@op, @server_initial_md, @server_trailing_md) do |responses|
responses.each { |r| p r }
end
end
end
end
def run_server_streamer(expected_input, replys, status, **kw)
wanted_metadata = kw.clone
def run_server_streamer(expected_input, replys, status,
expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end
end
@ -615,10 +671,13 @@ describe 'ClientStub' do
end
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
**kw)
wanted_metadata = kw.clone
expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
@ -631,33 +690,44 @@ describe 'ClientStub' do
expect(c.remote_read).to eq(i)
end
end
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end
end
def run_client_streamer(expected_inputs, resp, status, **kw)
wanted_metadata = kw.clone
def run_client_streamer(expected_inputs, resp, status,
expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end
end
def run_request_response(expected_input, resp, status, **kw)
wanted_metadata = kw.clone
def run_request_response(expected_input, resp, status,
expected_metadata: {},
server_initial_md: {},
server_trailing_md: {})
wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
c = expect_server_to_be_invoked(
notifier, metadata_to_send: server_initial_md)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
end
end
@ -675,13 +745,13 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
def expect_server_to_be_invoked(notifier)
def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
@server.start
notifier.notify(nil)
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata
recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true)
end

@ -111,6 +111,47 @@ end
SlowStub = SlowService.rpc_stub_class
# a test service that hangs onto call objects
# and uses them after the server-side call has been
# finished
class CheckCallAfterFinishedService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
attr_reader :server_side_call
def an_rpc(req, call)
fail 'shouldnt reuse service' unless @call.nil?
@server_side_call = call
req
end
def a_client_streaming_rpc(call)
fail 'shouldnt reuse service' unless @call.nil?
@server_side_call = call
# iterate through requests so call can complete
call.each_remote_read.each { |r| p r }
EchoMsg.new
end
def a_server_streaming_rpc(_, call)
fail 'shouldnt reuse service' unless @call.nil?
@server_side_call = call
[EchoMsg.new, EchoMsg.new]
end
def a_bidi_rpc(requests, call)
fail 'shouldnt reuse service' unless @call.nil?
@server_side_call = call
requests.each { |r| p r }
[EchoMsg.new, EchoMsg.new]
end
end
CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes
@ -505,5 +546,109 @@ describe GRPC::RpcServer do
t.join
end
end
context 'when call objects are used after calls have completed' do
before(:each) do
server_opts = {
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@alt_host = "0.0.0.0:#{alt_port}"
@service = CheckCallAfterFinishedService.new
@srv.handle(@service)
@srv_thd = Thread.new { @srv.run }
@srv.wait_till_running
end
# check that the server-side call is still in a usable state even
# after it has finished
def check_single_req_view_of_finished_call(call)
common_check_of_finished_server_call(call)
expect(call.peer).to be_a(String)
expect(call.peer_cert).to be(nil)
end
def check_multi_req_view_of_finished_call(call)
common_check_of_finished_server_call(call)
expect do
call.each_remote_read.each { |r| p r }
end.to raise_error(GRPC::Core::CallError)
end
def common_check_of_finished_server_call(call)
expect do
call.merge_metadata_to_send({})
end.to raise_error(RuntimeError)
expect do
call.send_initial_metadata
end.to_not raise_error
expect(call.cancelled?).to be(false)
expect(call.metadata).to be_a(Hash)
expect(call.metadata['user-agent']).to be_a(String)
expect(call.metadata_sent).to be(true)
expect(call.output_metadata).to eq({})
expect(call.metadata_to_send).to eq({})
expect(call.deadline.is_a?(Time)).to be(true)
end
it 'should not crash when call used after an unary call is finished' do
req = EchoMsg.new
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
resp = stub.an_rpc(req)
expect(resp).to be_a(EchoMsg)
@srv.stop
@srv_thd.join
check_single_req_view_of_finished_call(@service.server_side_call)
end
it 'should not crash when call used after client streaming finished' do
requests = [EchoMsg.new, EchoMsg.new]
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
resp = stub.a_client_streaming_rpc(requests)
expect(resp).to be_a(EchoMsg)
@srv.stop
@srv_thd.join
check_multi_req_view_of_finished_call(@service.server_side_call)
end
it 'should not crash when call used after server streaming finished' do
req = EchoMsg.new
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
responses = stub.a_server_streaming_rpc(req)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
@srv.stop
@srv_thd.join
check_single_req_view_of_finished_call(@service.server_side_call)
end
it 'should not crash when call used after a bidi call is finished' do
requests = [EchoMsg.new, EchoMsg.new]
stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
:this_channel_is_insecure)
responses = stub.a_bidi_rpc(requests)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
@srv.stop
@srv_thd.join
check_multi_req_view_of_finished_call(@service.server_side_call)
end
end
end
end

Loading…
Cancel
Save