[ruby] refactor flaky test and expose cancel_with_status (#37410)

Fixes https://github.com/grpc/grpc/issues/37234

Following up on the problem described in https://github.com/grpc/grpc/pull/36903, there are a number of paths in `client_server_spec.rb` and a few other tests where client call objects can leak due to RPC lifecycles not being properly completed, leading to a thread not terminating.

Some of the tests, which don't use the surface-level APIs, are changed to manually close calls (and not rely on GC which might not happen before shutdown of ruby threads). `client_server_spec.rb` is updated to use surface level APIs, which manages call lifecycles correctly (this also improves the test's fidelity).

While we're here: expose `cancel_with_status` on call operations. This was only accidentally private so far. The test refactoring caught it.

Closes #37410

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37410 from apolcyn:fix_call_leak b23047251c
PiperOrigin-RevId: 660430463
pull/37426/head
apolcyn 6 months ago committed by Copybara-Service
parent 36123761a1
commit 0940f8e670
  1. 13
      src/ruby/lib/grpc/generic/active_call.rb
  2. 93
      src/ruby/spec/call_spec.rb
  3. 6
      src/ruby/spec/channel_spec.rb
  4. 655
      src/ruby/spec/client_server_spec.rb
  5. 150
      src/ruby/spec/generic/active_call_spec.rb
  6. 3
      src/ruby/spec/support/services.rb

@ -44,8 +44,8 @@ module GRPC
include Core::CallOps
extend Forwardable
attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
:trailing_metadata, :status
def_delegators :@call, :cancel, :cancel_with_status, :metadata,
:write_flag, :write_flag=, :trailing_metadata, :status
# client_invoke begins a client invocation.
#
@ -620,6 +620,8 @@ module GRPC
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
def start_call(metadata = {})
# TODO(apolcyn): we should cancel and clean up the call in case this
# send initial MD op fails.
merge_metadata_to_send(metadata) && send_initial_metadata
end
@ -665,9 +667,10 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
:metadata, :status, :start_call, :wait, :write_flag,
:write_flag=, :trailing_metadata)
# TODO(apolcyn): expose peer getter
Operation = view_class(:cancel, :cancel_with_status, :cancelled?, :deadline,
:execute, :metadata, :status, :start_call, :wait,
:write_flag, :write_flag=, :trailing_metadata)
# InterceptableView further limits access to an ActiveCall's methods
# for use in interceptors on the client, exposing only the deadline

@ -90,88 +90,101 @@ describe GRPC::Core::Call do
describe '#status' do
it 'can save the status and read it back' do
call = make_test_call
sts = Struct::Status.new(OK, 'OK')
expect { call.status = sts }.not_to raise_error
expect(call.status).to eq(sts)
make_test_call do |call|
sts = Struct::Status.new(OK, 'OK')
expect { call.status = sts }.not_to raise_error
expect(call.status).to eq(sts)
end
end
it 'must be set to a status' do
call = make_test_call
bad_sts = Object.new
expect { call.status = bad_sts }.to raise_error(TypeError)
make_test_call do |call|
bad_sts = Object.new
expect { call.status = bad_sts }.to raise_error(TypeError)
end
end
it 'can be set to nil' do
call = make_test_call
expect { call.status = nil }.not_to raise_error
make_test_call do |call|
expect { call.status = nil }.not_to raise_error
end
end
end
describe '#metadata' do
it 'can save the metadata hash and read it back' do
call = make_test_call
md = { 'k1' => 'v1', 'k2' => 'v2' }
expect { call.metadata = md }.not_to raise_error
expect(call.metadata).to be(md)
make_test_call do |call|
md = { 'k1' => 'v1', 'k2' => 'v2' }
expect { call.metadata = md }.not_to raise_error
expect(call.metadata).to be(md)
end
end
it 'must be set with a hash' do
call = make_test_call
bad_md = Object.new
expect { call.metadata = bad_md }.to raise_error(TypeError)
make_test_call do |call|
bad_md = Object.new
expect { call.metadata = bad_md }.to raise_error(TypeError)
end
end
it 'can be set to nil' do
call = make_test_call
expect { call.metadata = nil }.not_to raise_error
make_test_call do |call|
expect { call.metadata = nil }.not_to raise_error
end
end
end
describe '#set_credentials!' do
it 'can set a valid CallCredentials object' do
call = make_test_call
auth_proc = proc { { 'plugin_key' => 'plugin_value' } }
creds = GRPC::Core::CallCredentials.new auth_proc
expect { call.set_credentials! creds }.not_to raise_error
make_test_call do |call|
auth_proc = proc { { 'plugin_key' => 'plugin_value' } }
creds = GRPC::Core::CallCredentials.new auth_proc
expect { call.set_credentials! creds }.not_to raise_error
end
end
end
describe '#cancel' do
it 'completes ok' do
call = make_test_call
expect { call.cancel }.not_to raise_error
make_test_call do |call|
expect { call.cancel }.not_to raise_error
end
end
it 'completes ok when the call is closed' do
call = make_test_call
call.close
expect { call.cancel }.not_to raise_error
make_test_call do |call|
call.close
expect { call.cancel }.not_to raise_error
end
end
end
describe '#cancel_with_status' do
it 'completes ok' do
call = make_test_call
expect do
call.cancel_with_status(0, 'test status')
end.not_to raise_error
expect do
call.cancel_with_status(0, nil)
end.to raise_error(TypeError)
make_test_call do |call|
expect do
call.cancel_with_status(0, 'test status')
end.not_to raise_error
expect do
call.cancel_with_status(0, nil)
end.to raise_error(TypeError)
end
end
it 'completes ok when the call is closed' do
call = make_test_call
call.close
expect do
call.cancel_with_status(0, 'test status')
end.not_to raise_error
make_test_call do |call|
call.close
expect do
call.cancel_with_status(0, 'test status')
end.not_to raise_error
end
end
end
def make_test_call
@ch.create_call(nil, nil, 'phony_method', nil, deadline)
call = @ch.create_call(nil, nil, 'phony_method', nil, deadline)
yield call
call.close
end
def deadline

@ -118,7 +118,8 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
ch.create_call(nil, nil, 'phony_method', nil, deadline)
call = ch.create_call(nil, nil, 'phony_method', nil, deadline)
call.close
end
expect(&blk).to_not raise_error
end
@ -132,8 +133,9 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
ch.create_call(nil, nil, 'phony_method', nil, deadline)
call = ch.create_call(nil, nil, 'phony_method', nil, deadline)
STDERR.puts "#{Time.now}: created call"
call.close
end
expect(&blk).to raise_error(RuntimeError)
STDERR.puts "#{Time.now}: finished: raises an error if called on a closed channel"

@ -16,36 +16,8 @@ require 'spec_helper'
include GRPC::Core
shared_context 'setup: tags' do
let(:sent_message) { 'sent message' }
let(:reply_text) { 'the reply' }
def deadline
Time.now + 5
end
def server_allows_client_to_proceed(metadata = {})
recvd_rpc = @server.request_call
expect(recvd_rpc).to_not eq nil
server_call = recvd_rpc.call
ops = { CallOps::SEND_INITIAL_METADATA => metadata }
server_batch = server_call.run_batch(ops)
expect(server_batch.send_metadata).to be true
server_call
end
def new_client_call
@ch.create_call(nil, nil, '/method', nil, deadline)
end
def ok_status
Struct::Status.new(StatusCodes::OK, 'OK')
end
end
shared_examples 'basic GRPC message delivery is OK' do
include GRPC::Core
include_context 'setup: tags'
context 'the test channel' do
it 'should have a target' do
@ -53,272 +25,45 @@ shared_examples 'basic GRPC message delivery is OK' do
end
end
context 'a client call' do
it 'should have a peer' do
expect(new_client_call.peer).to be_a(String)
end
end
it 'calls have peer info' do
call = new_client_call
expect(call.peer).to be_a(String)
end
it 'servers receive requests from clients and can respond' do
call = new_client_call
server_call = nil
server_thread = Thread.new do
server_call = server_allows_client_to_proceed
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_message).to be true
expect(client_batch.send_close).to be true
# confirm the server can read the inbound message
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(sent_message)
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.send_close).to be true
expect(server_batch.send_status).to be true
# finish the call
final_client_batch = call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.status.code).to eq(0)
end
it 'responses written by servers are received by the client' do
call = new_client_call
server_call = nil
server_thread = Thread.new do
server_call = server_allows_client_to_proceed
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_message).to be true
expect(client_batch.send_close).to be true
# confirm the server can read the inbound message
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(sent_message)
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_MESSAGE => reply_text,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.send_close).to be true
expect(server_batch.send_message).to be true
expect(server_batch.send_status).to be true
# finish the call
final_client_batch = call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.message).to eq(reply_text)
expect(final_client_batch.status.code).to eq(0)
end
it 'compressed messages can be sent and received' do
call = new_client_call
server_call = nil
long_request_str = '0' * 2000
long_response_str = '1' * 2000
md = { 'grpc-internal-encoding-request' => 'gzip' }
server_thread = Thread.new do
server_call = server_allows_client_to_proceed(md)
it 'unary calls work' do
run_services_on_server(@server, services: [EchoService]) do
call = @stub.an_rpc(EchoMsg.new, return_op: true)
expect(call.execute).to be_a(EchoMsg)
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => md,
CallOps::SEND_MESSAGE => long_request_str,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_message).to be true
expect(client_batch.send_close).to be true
# confirm the server can read the inbound message
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(long_request_str)
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_MESSAGE => long_response_str,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.send_close).to be true
expect(server_batch.send_message).to be true
expect(server_batch.send_status).to be true
client_ops = {
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil
}
final_client_batch = call.run_batch(client_ops)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.message).to eq long_response_str
expect(final_client_batch.status.code).to eq(0)
end
it 'servers can ignore a client write and send a status' do
call = new_client_call
server_call = nil
server_thread = Thread.new do
server_call = server_allows_client_to_proceed
it 'unary calls work when enabling compression' do
run_services_on_server(@server, services: [EchoService]) do
long_request_str = '0' * 2000
md = { 'grpc-internal-encoding-request' => 'gzip' }
call = @stub.an_rpc(EchoMsg.new(msg: long_request_str),
return_op: true,
metadata: md)
response = call.execute
expect(response).to be_a(EchoMsg)
expect(response.msg).to eq(long_request_str)
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_message).to be true
expect(client_batch.send_close).to be true
# confirm the server can read the inbound message
the_status = Struct::Status.new(StatusCodes::OK, 'OK')
server_thread.join
server_ops = {
CallOps::SEND_STATUS_FROM_SERVER => the_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq nil
expect(server_batch.send_status).to be true
final_client_batch = call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.status.code).to eq(0)
end
it 'completes calls by sending status to client and server' do
call = new_client_call
server_call = nil
server_thread = Thread.new do
server_call = server_allows_client_to_proceed
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_message).to be true
# confirm the server can read the inbound message and respond
the_status = Struct::Status.new(StatusCodes::OK, 'OK', {})
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq sent_message
server_ops = {
CallOps::SEND_MESSAGE => reply_text,
CallOps::SEND_STATUS_FROM_SERVER => the_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.send_status).to be true
expect(server_batch.send_message).to be true
# confirm the client can receive the server response and status.
client_ops = {
CallOps::SEND_CLOSE_FROM_CLIENT => nil,
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil
}
final_client_batch = call.run_batch(client_ops)
expect(final_client_batch.send_close).to be true
expect(final_client_batch.message).to eq reply_text
expect(final_client_batch.status).to eq the_status
# confirm the server can receive the client close.
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil
}
final_server_batch = server_call.run_batch(server_ops)
expect(final_server_batch.send_close).to be true
end
def client_cancel_test(cancel_proc, expected_code,
expected_details)
call = new_client_call
server_call = nil
server_thread = Thread.new do
server_call = server_allows_client_to_proceed
call = @stub.an_rpc(EchoMsg.new, return_op: true)
run_services_on_server(@server, services: [EchoService]) do
# start the call, but don't send a message yet
call.start_call
# cancel the call
cancel_proc.call(call)
# check the client's status
failed = false
begin
call.execute
rescue GRPC::BadStatus => e
failed = true
expect(e.code).to be expected_code
expect(e.details).to eq expected_details
end
expect(failed).to be(true)
end
client_ops = {
CallOps::SEND_INITIAL_METADATA => {},
CallOps::RECV_INITIAL_METADATA => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.metadata).to eq({})
cancel_proc.call(call)
server_thread.join
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.send_close).to be true
client_ops = {
CallOps::RECV_STATUS_ON_CLIENT => {}
}
client_batch = call.run_batch(client_ops)
expect(client_batch.status.code).to be expected_code
expect(client_batch.status.details).to eq expected_details
end
it 'clients can cancel a call on the server' do
@ -344,8 +89,6 @@ shared_examples 'basic GRPC message delivery is OK' do
end
shared_examples 'GRPC metadata delivery works OK' do
include_context 'setup: tags'
describe 'from client => server' do
before(:example) do
n = 7 # arbitrary number of metadata
@ -364,53 +107,31 @@ shared_examples 'GRPC metadata delivery works OK' do
it 'raises an exception if a metadata key is invalid' do
@bad_keys.each do |md|
call = new_client_call
client_ops = {
CallOps::SEND_INITIAL_METADATA => md
}
blk = proc do
call.run_batch(client_ops)
# NOTE: no need to run a server in this test b/c the failure
# happens while validating metadata to send.
failed = false
begin
@stub.an_rpc(EchoMsg.new, metadata: md)
rescue TypeError => e
failed = true
expect(e.message).to eq('grpc_rb_md_ary_fill_hash_cb: bad type for key parameter')
end
expect(&blk).to raise_error
expect(failed).to be(true)
end
end
it 'sends all the metadata pairs when keys and values are valid' do
@valid_metadata.each do |md|
recvd_rpc = nil
rcv_thread = Thread.new do
recvd_rpc = @server.request_call
service = EchoService.new
run_services_on_server(@server, services: [service]) do
@valid_metadata.each_with_index do |md, i|
expect(@stub.an_rpc(EchoMsg.new, metadata: md)).to be_a(EchoMsg)
# confirm the server can receive the client metadata
# finish the call
expect(service.received_md.length).to eq(i + 1)
md.each do |k, v|
expect(service.received_md[i][k.to_s]).to eq(v)
end
end
call = new_client_call
client_ops = {
CallOps::SEND_INITIAL_METADATA => md,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
# confirm the server can receive the client metadata
rcv_thread.join
expect(recvd_rpc).to_not eq nil
recvd_md = recvd_rpc.metadata
replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
expect(recvd_md).to eq(recvd_md.merge(replace_symbols))
# finish the call
final_server_batch = recvd_rpc.call.run_batch(
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_INITIAL_METADATA => nil,
CallOps::SEND_STATUS_FROM_SERVER => ok_status)
expect(final_server_batch.send_close).to be(true)
expect(final_server_batch.send_metadata).to be(true)
expect(final_server_batch.send_status).to be(true)
final_client_batch = call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.status.code).to eq(0)
end
end
end
@ -432,120 +153,61 @@ shared_examples 'GRPC metadata delivery works OK' do
end
it 'raises an exception if a metadata key is invalid' do
@bad_keys.each do |md|
recvd_rpc = nil
rcv_thread = Thread.new do
recvd_rpc = @server.request_call
end
call = new_client_call
# client signals that it's done sending metadata to allow server to
# respond
client_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
call.run_batch(client_ops)
# server gets the invocation
rcv_thread.join
expect(recvd_rpc).to_not eq nil
server_ops = {
CallOps::SEND_INITIAL_METADATA => md
}
blk = proc do
recvd_rpc.call.run_batch(server_ops)
service = EchoService.new
run_services_on_server(@server, services: [service]) do
@bad_keys.each do |md|
proceed = Queue.new
server_exception = nil
service.on_call_started = proc do |call|
call.send_initial_metadata(md)
rescue TypeError => e
server_exception = e
ensure
proceed.push(1)
end
client_exception = nil
client_call = @stub.an_rpc(EchoMsg.new, return_op: true)
thr = Thread.new do
client_call.execute
rescue GRPC::BadStatus => e
client_exception = e
end
proceed.pop
# TODO(apolcyn): we shouldn't need this cancel here. It's
# only currently needed b/c the server does not seem to properly
# terminate the RPC if it fails to send initial metadata. That
# should be fixed, in which case this cancellation can be removed.
client_call.cancel
thr.join
p client_exception
expect(client_exception.nil?).to be(false)
expect(server_exception.nil?).to be(false)
expect(server_exception.message).to eq(
'grpc_rb_md_ary_fill_hash_cb: bad type for key parameter')
end
expect(&blk).to raise_error
# cancel the call so the server can shut down immediately
call.cancel
end
end
it 'sends an empty hash if no metadata is added' do
recvd_rpc = nil
rcv_thread = Thread.new do
recvd_rpc = @server.request_call
run_services_on_server(@server, services: [EchoService]) do
call = @stub.an_rpc(EchoMsg.new, return_op: true)
expect(call.execute).to be_a(EchoMsg)
expect(call.metadata).to eq({})
end
call = new_client_call
# client signals that it's done sending metadata to allow server to
# respond
client_ops = {
CallOps::SEND_INITIAL_METADATA => nil,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_close).to be true
# server gets the invocation but sends no metadata back
rcv_thread.join
expect(recvd_rpc).to_not eq nil
server_call = recvd_rpc.call
server_ops = {
# receive close and send status to finish the call
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_INITIAL_METADATA => nil,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
srv_batch = server_call.run_batch(server_ops)
expect(srv_batch.send_close).to be true
expect(srv_batch.send_metadata).to be true
expect(srv_batch.send_status).to be true
# client receives nothing as expected
client_ops = {
CallOps::RECV_INITIAL_METADATA => nil,
# receive status to finish the call
CallOps::RECV_STATUS_ON_CLIENT => nil
}
final_client_batch = call.run_batch(client_ops)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.status.code).to eq(0)
end
it 'sends all the pairs when keys and values are valid' do
@valid_metadata.each do |md|
recvd_rpc = nil
rcv_thread = Thread.new do
recvd_rpc = @server.request_call
service = EchoService.new
run_services_on_server(@server, services: [service]) do
@valid_metadata.each do |md|
service.on_call_started = proc do |call|
call.send_initial_metadata(md)
end
call = @stub.an_rpc(EchoMsg.new, return_op: true)
call.execute
replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
expect(call.metadata).to eq(replace_symbols)
end
call = new_client_call
# client signals that it's done sending metadata to allow server to
# respond
client_ops = {
CallOps::SEND_INITIAL_METADATA => nil,
CallOps::SEND_CLOSE_FROM_CLIENT => nil
}
client_batch = call.run_batch(client_ops)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_close).to be true
# server gets the invocation but sends no metadata back
rcv_thread.join
expect(recvd_rpc).to_not eq nil
server_call = recvd_rpc.call
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_INITIAL_METADATA => md,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
srv_batch = server_call.run_batch(server_ops)
expect(srv_batch.send_close).to be true
expect(srv_batch.send_metadata).to be true
expect(srv_batch.send_status).to be true
# client receives nothing as expected
client_ops = {
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil
}
final_client_batch = call.run_batch(client_ops)
replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
expect(final_client_batch.metadata).to eq(replace_symbols)
expect(final_client_batch.status.code).to eq(0)
end
end
end
@ -554,16 +216,11 @@ end
describe 'the http client/server' do
before(:example) do
server_host = '0.0.0.0:0'
@server = new_core_server_for_testing(nil)
@server = new_rpc_server_for_testing
server_port = @server.add_http2_port(server_host, :this_port_is_insecure)
@server.start
@ch = Channel.new("0.0.0.0:#{server_port}", nil, :this_channel_is_insecure)
end
after(:example) do
@ch.close
@server.shutdown_and_notify(deadline)
@server.close
@stub = EchoStub.new(
"0.0.0.0:#{server_port}", nil, channel_override: @ch)
end
it_behaves_like 'basic GRPC message delivery is OK' do
@ -574,8 +231,6 @@ describe 'the http client/server' do
end
describe 'the secure http client/server' do
include_context 'setup: tags'
def load_test_certs
test_root = File.join(File.dirname(__FILE__), 'testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
@ -587,17 +242,14 @@ describe 'the secure http client/server' do
server_host = '0.0.0.0:0'
server_creds = GRPC::Core::ServerCredentials.new(
nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)
@server = new_core_server_for_testing(nil)
@server = new_rpc_server_for_testing
server_port = @server.add_http2_port(server_host, server_creds)
@server.start
args = { Channel::SSL_TARGET => 'foo.test.google.fr' }
@ch = Channel.new("0.0.0.0:#{server_port}", args,
GRPC::Core::ChannelCredentials.new(certs[0], nil, nil))
end
after(:example) do
@server.shutdown_and_notify(deadline)
@server.close
@ch = Channel.new(
"0.0.0.0:#{server_port}", args,
GRPC::Core::ChannelCredentials.new(certs[0], nil, nil))
@stub = EchoStub.new(
"0.0.0.0:#{server_port}", nil, channel_override: @ch)
end
it_behaves_like 'basic GRPC message delivery is OK' do
@ -606,59 +258,25 @@ describe 'the secure http client/server' do
it_behaves_like 'GRPC metadata delivery works OK' do
end
def credentials_update_test(creds_update_md)
auth_proc = proc { creds_update_md }
it 'modifies metadata with CallCredentials' do
# create call creds
auth_proc = proc { { 'k1' => 'v1' } }
call_creds = GRPC::Core::CallCredentials.new(auth_proc)
initial_md_key = 'k2'
initial_md_val = 'v2'
initial_md = { initial_md_key => initial_md_val }
expected_md = creds_update_md.clone
fail 'bad test param' unless expected_md[initial_md_key].nil?
expected_md[initial_md_key] = initial_md_val
recvd_rpc = nil
rcv_thread = Thread.new do
recvd_rpc = @server.request_call
# create arbitrary custom metadata
custom_md = { 'k2' => 'v2' }
# perform an RPC
echo_service = EchoService.new
run_services_on_server(@server, services: [echo_service]) do
expect(@stub.an_rpc(EchoMsg.new,
credentials: call_creds,
metadata: custom_md)).to be_a(EchoMsg)
end
# call creds metadata should be merged with custom MD
expect(echo_service.received_md.length).to eq(1)
expected_md = { 'k1' => 'v1', 'k2' => 'v2' }
expected_md.each do |k, v|
expect(echo_service.received_md[0][k]).to eq(v)
end
call = new_client_call
call.set_credentials! call_creds
client_batch = call.run_batch(
CallOps::SEND_INITIAL_METADATA => initial_md,
CallOps::SEND_CLOSE_FROM_CLIENT => nil)
expect(client_batch.send_metadata).to be true
expect(client_batch.send_close).to be true
# confirm the server can receive the client metadata
rcv_thread.join
expect(recvd_rpc).to_not eq nil
recvd_md = recvd_rpc.metadata
replace_symbols = Hash[expected_md.each_pair.collect { |x, y| [x.to_s, y] }]
expect(recvd_md).to eq(recvd_md.merge(replace_symbols))
credentials_update_test_finish_call(call, recvd_rpc.call)
end
def credentials_update_test_finish_call(client_call, server_call)
final_server_batch = server_call.run_batch(
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_INITIAL_METADATA => nil,
CallOps::SEND_STATUS_FROM_SERVER => ok_status)
expect(final_server_batch.send_close).to be(true)
expect(final_server_batch.send_metadata).to be(true)
expect(final_server_batch.send_status).to be(true)
final_client_batch = client_call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(final_client_batch.metadata).to eq({})
expect(final_client_batch.status.code).to eq(0)
end
it 'modifies metadata with CallCredentials' do
credentials_update_test('k1' => 'updated-v1')
end
it 'modifies large metadata with CallCredentials' do
@ -666,11 +284,34 @@ describe 'the secure http client/server' do
'00000000000000000000000000000000000000000000000000000000000000',
'11111111111111111111111111111111111111111111111111111111111111',
)
md = {
k3: val_array,
k4: '0000000000000000000000000000000000000000000000000000000000',
keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v1'
# create call creds
auth_proc = proc do
{
k2: val_array,
k3: '0000000000000000000000000000000000000000000000000000000000',
keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey4: 'v4'
}
end
call_creds = GRPC::Core::CallCredentials.new(auth_proc)
# create arbitrary custom metadata
custom_md = { k1: 'v1' }
# perform an RPC
echo_service = EchoService.new
run_services_on_server(@server, services: [echo_service]) do
expect(@stub.an_rpc(EchoMsg.new,
credentials: call_creds,
metadata: custom_md)).to be_a(EchoMsg)
end
# call creds metadata should be merged with custom MD
expect(echo_service.received_md.length).to eq(1)
expected_md = {
k1: 'v1',
k2: val_array,
k3: '0000000000000000000000000000000000000000000000000000000000',
keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey4: 'v4'
}
credentials_update_test(md)
expected_md.each do |k, v|
expect(echo_service.received_md[0][k.to_s]).to eq(v)
end
end
end

@ -55,17 +55,20 @@ describe GRPC::ActiveCall do
end
@ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
:this_channel_is_insecure)
@call = make_test_call
end
after(:each) do
@server.shutdown_and_notify(deadline)
@server.close
@server_thread.join
# Don't rely on GC to unref the call, since that can prevent
# the channel connectivity state polling thread from shutting down.
@call.close
end
describe 'restricted view methods' do
before(:each) do
@call = make_test_call
ActiveCall.client_invoke(@call)
@client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
@ -117,9 +120,8 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server', test: true do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -137,15 +139,14 @@ describe GRPC::ActiveCall do
expect(server_call.remote_read).to eq(msg)
# finish the call
server_call.send_initial_metadata
call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
send_and_receive_close_and_status(call, recvd_call)
@call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
send_and_receive_close_and_status(@call, recvd_call)
end
it 'marshals the payload using the marshal func' do
call = make_test_call
ActiveCall.client_invoke(call)
ActiveCall.client_invoke(@call)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
client_call = ActiveCall.new(@call, marshal, @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -161,23 +162,22 @@ describe GRPC::ActiveCall do
metadata_received: true)
expect(server_call.remote_read).to eq('marshalled:' + msg)
# finish the call
call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
send_and_receive_close_and_status(call, recvd_call)
@call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
send_and_receive_close_and_status(@call, recvd_call)
end
TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
TEST_WRITE_FLAGS.each do |f|
it "successfully makes calls with write_flag set to #{f}" do
call = make_test_call
ActiveCall.client_invoke(call)
ActiveCall.client_invoke(@call)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, marshal,
client_call = ActiveCall.new(@call, marshal,
@pass_through, deadline)
msg = 'message is a string'
client_call.write_flag = f
client_call.remote_send(msg)
# flush the message in case writes are set to buffered
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1
@call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1
# confirm that the message was marshalled
recvd_rpc = @received_rpcs_queue.pop
@ -199,9 +199,8 @@ describe GRPC::ActiveCall do
describe 'sending initial metadata', send_initial_metadata: true do
it 'sends metadata before sending a message if it hasnt been sent yet' do
call = make_test_call
@client_call = ActiveCall.new(
call,
@call,
@pass_through,
@pass_through,
deadline,
@ -213,13 +212,13 @@ describe GRPC::ActiveCall do
message = 'phony message'
expect(call).to(
expect(@call).to(
receive(:run_batch)
.with(
hash_including(
CallOps::SEND_INITIAL_METADATA => metadata)).once)
expect(call).to(
expect(@call).to(
receive(:run_batch).with(hash_including(
CallOps::SEND_MESSAGE => message)).once)
@client_call.remote_send(message)
@ -228,14 +227,12 @@ describe GRPC::ActiveCall do
end
it 'doesnt send metadata if it thinks its already been sent' do
call = make_test_call
@client_call = ActiveCall.new(call,
@client_call = ActiveCall.new(@call,
@pass_through,
@pass_through,
deadline)
expect(@client_call.metadata_sent).to eql(true)
expect(call).to(
expect(@call).to(
receive(:run_batch).with(hash_including(
CallOps::SEND_INITIAL_METADATA)).never)
@ -243,9 +240,7 @@ describe GRPC::ActiveCall do
end
it 'sends metadata if it is explicitly sent and ok to do so' do
call = make_test_call
@client_call = ActiveCall.new(call,
@client_call = ActiveCall.new(@call,
@pass_through,
@pass_through,
deadline,
@ -257,7 +252,7 @@ describe GRPC::ActiveCall do
@client_call.merge_metadata_to_send(metadata)
expect(@client_call.metadata_to_send).to eq(metadata)
expect(call).to(
expect(@call).to(
receive(:run_batch).with(hash_including(
CallOps::SEND_INITIAL_METADATA =>
metadata)).once)
@ -265,9 +260,7 @@ describe GRPC::ActiveCall do
end
it 'explicit sending does nothing if metadata has already been sent' do
call = make_test_call
@client_call = ActiveCall.new(call,
@client_call = ActiveCall.new(@call,
@pass_through,
@pass_through,
deadline)
@ -284,7 +277,6 @@ describe GRPC::ActiveCall do
describe '#merge_metadata_to_send', merge_metadata_to_send: true do
it 'adds to existing metadata when there is existing metadata to send' do
call = make_test_call
starting_metadata = {
k1: 'key1_val',
k2: 'key2_val',
@ -292,7 +284,7 @@ describe GRPC::ActiveCall do
}
@client_call = ActiveCall.new(
call,
@call,
@pass_through, @pass_through,
deadline,
started: false,
@ -318,9 +310,8 @@ describe GRPC::ActiveCall do
end
it 'fails when initial metadata has already been sent' do
call = make_test_call
@client_call = ActiveCall.new(
call,
@call,
@pass_through,
@pass_through,
deadline,
@ -338,9 +329,8 @@ describe GRPC::ActiveCall do
describe '#client_invoke' do
it 'sends metadata to the server when present' do
call = make_test_call
metadata = { k1: 'v1', k2: 'v2' }
ActiveCall.client_invoke(call, metadata)
ActiveCall.client_invoke(@call, metadata)
recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call
expect(recvd_call).to_not be_nil
@ -349,15 +339,14 @@ describe GRPC::ActiveCall do
expect(recvd_rpc.metadata['k2']).to eq('v2')
# finish the call
recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
send_and_receive_close_and_status(call, recvd_call)
@call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
send_and_receive_close_and_status(@call, recvd_call)
end
end
describe '#send_status', send_status: true do
it 'works when no metadata or messages have been sent yet' do
call = make_test_call
ActiveCall.client_invoke(call)
ActiveCall.client_invoke(@call)
recvd_rpc = @received_rpcs_queue.pop
server_call = ActiveCall.new(
@ -375,9 +364,8 @@ describe GRPC::ActiveCall do
describe '#remote_read', remote_read: true do
it 'reads the response sent by a server' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -385,13 +373,12 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
send_and_receive_close_and_status(
call, inner_call_of_active_call(server_call))
@call, inner_call_of_active_call(server_call))
end
it 'saves no metadata when the server adds no metadata' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -401,13 +388,12 @@ describe GRPC::ActiveCall do
client_call.remote_read
expect(client_call.metadata).to eq({})
send_and_receive_close_and_status(
call, inner_call_of_active_call(server_call))
@call, inner_call_of_active_call(server_call))
end
it 'saves metadata add by the server' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -418,12 +404,11 @@ describe GRPC::ActiveCall do
expected = { 'k1' => 'v1', 'k2' => 'v2' }
expect(client_call.metadata).to eq(expected)
send_and_receive_close_and_status(
call, inner_call_of_active_call(server_call))
@call, inner_call_of_active_call(server_call))
end
it 'get a status from server when nothing else sent from server' do
client_call = make_test_call
ActiveCall.client_invoke(client_call)
ActiveCall.client_invoke(@call)
recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call
@ -438,22 +423,21 @@ describe GRPC::ActiveCall do
server_call.send_status(OK, 'OK')
# Check that we can receive initial metadata and a status
client_call.run_batch(
@call.run_batch(
CallOps::RECV_INITIAL_METADATA => nil)
batch_result = client_call.run_batch(
batch_result = @call.run_batch(
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(batch_result.status.code).to eq(OK)
end
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
@call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
server_call.send_status(OK, 'OK')
@ -463,10 +447,9 @@ describe GRPC::ActiveCall do
end
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
ActiveCall.client_invoke(call)
ActiveCall.client_invoke(@call)
unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @pass_through,
client_call = ActiveCall.new(@call, @pass_through,
unmarshal, deadline)
# confirm the client receives the unmarshalled message
@ -476,14 +459,13 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('unmarshalled:server_response')
send_and_receive_close_and_status(
call, inner_call_of_active_call(server_call))
@call, inner_call_of_active_call(server_call))
end
end
describe '#each_remote_read' do
it 'creates an Enumerator' do
call = make_test_call
client_call = ActiveCall.new(call, @pass_through,
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
expect(client_call.each_remote_read).to be_a(Enumerator)
# finish the call
@ -491,9 +473,8 @@ describe GRPC::ActiveCall do
end
it 'the returned enumerator can read n responses' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
reply = 'server_response'
@ -506,18 +487,17 @@ describe GRPC::ActiveCall do
expect(e.next).to eq(reply)
end
send_and_receive_close_and_status(
call, inner_call_of_active_call(server_call))
@call, inner_call_of_active_call(server_call))
end
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
@call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
n = 3 # arbitrary value > 1
@ -532,14 +512,13 @@ describe GRPC::ActiveCall do
describe '#closing the call from the client' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
@call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
@ -549,9 +528,8 @@ describe GRPC::ActiveCall do
end
it 'finishes ok if the server sends an early status response' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -560,15 +538,14 @@ describe GRPC::ActiveCall do
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
@call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
ActiveCall.client_invoke(@call)
client_call = ActiveCall.new(@call, @pass_through,
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
@ -577,7 +554,7 @@ describe GRPC::ActiveCall do
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
expect do
call.run_batch(
@call.run_batch(
CallOps::SEND_CLOSE_FROM_CLIENT => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil)
end.to_not raise_error
@ -631,6 +608,7 @@ describe GRPC::ActiveCall do
batch_result = @client_call.run_batch(
CallOps::RECV_STATUS_ON_CLIENT => nil)
expect(batch_result.status.code).to eq(@server_status)
@client_call.close
end
it 'sends the initial metadata implicitly if not already sent' do

@ -41,14 +41,17 @@ class EchoService
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
rpc :a_client_streaming_rpc_unimplemented, stream(EchoMsg), EchoMsg
attr_reader :received_md
attr_accessor :on_call_started
def initialize(**kw)
@trailing_metadata = kw
@received_md = []
@on_call_started = nil
end
def an_rpc(req, call)
GRPC.logger.info('echo service received a request')
on_call_started&.call(call)
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req

Loading…
Cancel
Save