@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes
include GRPC :: Core :: TimeConsts
include GRPC :: Core :: CallOps
# check that methods on a finished/closed call t crash
def check_op_view_of_finished_client_call ( op_view ,
expected_metadata ,
expected_trailing_metadata )
# use read_response_stream to try to iterate through
# possible response stream
fail ( 'need something to attempt reads' ) unless block_given?
expect do
resp = op_view . execute
yield resp
end . to raise_error ( GRPC :: Core :: CallError )
expect { op_view . start_call } . to raise_error ( RuntimeError )
sanity_check_values_of_accessors ( op_view ,
expected_metadata ,
expected_trailing_metadata )
expect do
op_view . wait
op_view . cancel
op_view . write_flag = 1
end . to_not raise_error
end
def sanity_check_values_of_accessors ( op_view ,
expected_metadata ,
expected_trailing_metadata )
expected_status = Struct :: Status . new
expected_status . code = 0
expected_status . details = 'OK'
expected_status . metadata = expected_trailing_metadata
expect ( op_view . status ) . to eq ( expected_status )
expect ( op_view . metadata ) . to eq ( expected_metadata )
expect ( op_view . trailing_metadata ) . to eq ( expected_trailing_metadata )
expect ( op_view . cancelled? ) . to be ( false )
expect ( op_view . write_flag ) . to be ( nil )
# The deadline attribute of a call can be either
# a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
# TODO: fix so that the accessor always returns the same type.
expect ( op_view . deadline . is_a? ( GRPC :: Core :: TimeSpec ) ||
op_view . deadline . is_a? ( Time ) ) . to be ( true )
end
describe 'ClientStub' do
let ( :noop ) { proc { | x | x } }
@ -45,6 +92,7 @@ describe 'ClientStub' do
@method = 'an_rpc_method'
@pass = OK
@fail = INTERNAL
@metadata = { k1 : 'v1' , k2 : 'v2' }
end
after ( :each ) do
@ -107,7 +155,7 @@ describe 'ClientStub' do
end
end
describe '#request_response' do
describe '#request_response' , request_response : true do
before ( :each ) do
@sent_msg , @resp = 'a_msg' , 'a_reply'
end
@ -126,7 +174,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = " localhost: #{ server_port } "
th = run_request_response ( @sent_msg , @resp , @pass ,
k1 : 'v1' , k2 : 'v2' )
expected_metadata : { k1 : 'v1' , k2 : 'v2' } )
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
expect ( get_response ( stub ) ) . to eq ( @resp )
th . join
@ -187,13 +235,24 @@ describe 'ClientStub' do
# Kill the server thread so tests can complete
th . kill
end
it 'should raise ArgumentError if metadata contains invalid values' do
@metadata . merge! ( k3 : 3 )
server_port = create_test_server
host = " localhost: #{ server_port } "
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
expect do
get_response ( stub )
end . to raise_error ( ArgumentError ,
/ Header values must be of type string or array / )
end
end
describe 'without a call operation' do
def get_response ( stub , credentials : nil )
puts credentials . inspect
stub . request_response ( @method , @sent_msg , noop , noop ,
metadata : { k1 : 'v1' , k2 : 'v2' } ,
metadata : @metadata ,
credentials : credentials )
end
@ -201,40 +260,62 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
after ( :each ) do
# make sure op.wait doesn't hang, even if there's a bad status
@op . wait
end
def get_response ( stub , run_start_call_first : false , credentials : nil )
op = stub . request_response ( @method , @sent_msg , noop , noop ,
return_op : true ,
metadata : { k1 : 'v1' , k2 : 'v2' } ,
deadline : from_relative_time ( 2 ) ,
credentials : credentials )
expect ( op ) . to be_a ( GRPC :: ActiveCall :: Operation )
op . start_call if run_start_call_first
result = op . execute
op . wait # make sure wait doesn't hang
@op = stub . request_response ( @method , @sent_msg , noop , noop ,
return_op : true ,
metadata : @metadata ,
deadline : from_relative_time ( 2 ) ,
credentials : credentials )
expect ( @op ) . to be_a ( GRPC :: ActiveCall :: Operation )
@op . start_call if run_start_call_first
result = @op . execute
result
end
it_behaves_like 'request response'
it 'sends metadata to the server ok when running start_call first' do
def run_op_view_metadata_test ( run_start_call_first )
server_port = create_test_server
host = " localhost: #{ server_port } "
th = run_request_response ( @sent_msg , @resp , @pass ,
k1 : 'v1' , k2 : 'v2' )
@server_initial_md = { 'sk1' = > 'sv1' , 'sk2' = > 'sv2' }
@server_trailing_md = { 'tk1' = > 'tv1' , 'tk2' = > 'tv2' }
th = run_request_response (
@sent_msg , @resp , @pass ,
expected_metadata : @metadata ,
server_initial_md : @server_initial_md ,
server_trailing_md : @server_trailing_md )
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
expect ( get_response ( stub ) ) . to eq ( @resp )
expect (
get_response ( stub ,
run_start_call_first : run_start_call_first ) ) . to eq ( @resp )
th . join
end
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test ( true )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) { | r | p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test ( false )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) { | r | p r }
end
end
end
describe '#client_streamer' do
describe '#client_streamer' , client_streamer : true do
before ( :each ) do
Thread . abort_on_exception = true
server_port = create_test_server
host = " localhost: #{ server_port } "
@stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
@metadata = { k1 : 'v1' , k2 : 'v2' }
@sent_msgs = Array . new ( 3 ) { | i | 'msg_' + ( i + 1 ) . to_s }
@resp = 'a_reply'
end
@ -247,7 +328,8 @@ describe 'ClientStub' do
end
it 'should send metadata to the server ok' do
th = run_client_streamer ( @sent_msgs , @resp , @pass , ** @metadata )
th = run_client_streamer ( @sent_msgs , @resp , @pass ,
expected_metadata : @metadata )
expect ( get_response ( @stub ) ) . to eq ( @resp )
th . join
end
@ -278,27 +360,50 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
after ( :each ) do
# make sure op.wait doesn't hang, even if there's a bad status
@op . wait
end
def get_response ( stub , run_start_call_first : false )
op = stub . client_streamer ( @method , @sent_msgs , noop , noop ,
return_op : true , metadata : @metadata )
expect ( op ) . to be_a ( GRPC :: ActiveCall :: Operation )
op . start_call if run_start_call_first
result = op . execute
op . wait # make sure wait doesn't hang
@op = stub . client_streamer ( @method , @sent_msgs , noop , noop ,
return_op : true , metadata : @metadata )
expect ( @op ) . to be_a ( GRPC :: ActiveCall :: Operation )
@op . start_call if run_start_call_first
result = @op . execute
result
end
it_behaves_like 'client streaming'
it 'sends metadata to the server ok when running start_call first' do
th = run_client_streamer ( @sent_msgs , @resp , @pass , ** @metadata )
expect ( get_response ( @stub , run_start_call_first : true ) ) . to eq ( @resp )
def run_op_view_metadata_test ( run_start_call_first )
@server_initial_md = { 'sk1' = > 'sv1' , 'sk2' = > 'sv2' }
@server_trailing_md = { 'tk1' = > 'tv1' , 'tk2' = > 'tv2' }
th = run_client_streamer (
@sent_msgs , @resp , @pass ,
expected_metadata : @metadata ,
server_initial_md : @server_initial_md ,
server_trailing_md : @server_trailing_md )
expect (
get_response ( @stub ,
run_start_call_first : run_start_call_first ) ) . to eq ( @resp )
th . join
end
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test ( true )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) { | r | p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test ( false )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) { | r | p r }
end
end
end
describe '#server_streamer' do
describe '#server_streamer' , server_streamer : true do
before ( :each ) do
@sent_msg = 'a_msg'
@replys = Array . new ( 3 ) { | i | 'reply_' + ( i + 1 ) . to_s }
@ -328,18 +433,42 @@ describe 'ClientStub' do
server_port = create_test_server
host = " localhost: #{ server_port } "
th = run_server_streamer ( @sent_msg , @replys , @fail ,
k1 : 'v1' , k2 : 'v2' )
expected_metadata : { k1 : 'v1' , k2 : 'v2' } )
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
e = get_responses ( stub )
expect { e . collect { | r | r } } . to raise_error ( GRPC :: BadStatus )
th . join
end
it 'should raise ArgumentError if metadata contains invalid values' do
@metadata . merge! ( k3 : 3 )
server_port = create_test_server
host = " localhost: #{ server_port } "
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
expect do
get_responses ( stub )
end . to raise_error ( ArgumentError ,
/ Header values must be of type string or array / )
end
it 'the call terminates when there is an unmarshalling error' do
server_port = create_test_server
host = " localhost: #{ server_port } "
th = run_server_streamer ( @sent_msg , @replys , @pass )
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
unmarshal = proc { fail ( ArgumentError , 'test unmarshalling error' ) }
expect do
get_responses ( stub , unmarshal : unmarshal ) . collect { | r | r }
end . to raise_error ( ArgumentError , 'test unmarshalling error' )
th . join
end
end
describe 'without a call operation' do
def get_responses ( stub )
e = stub . server_streamer ( @method , @sent_msg , noop , noop ,
metadata : { k1 : 'v1' , k2 : 'v2' } )
def get_responses ( stub , unmarshal : noop )
e = stub . server_streamer ( @method , @sent_msg , noop , unmarshal ,
metadata : @metadata )
expect ( e ) . to be_a ( Enumerator )
e
end
@ -351,10 +480,10 @@ describe 'ClientStub' do
after ( :each ) do
@op . wait # make sure wait doesn't hang
end
def get_responses ( stub , run_start_call_first : false )
@op = stub . server_streamer ( @method , @sent_msg , noop , noop ,
def get_responses ( stub , run_start_call_first : false , unmarshal : noop )
@op = stub . server_streamer ( @method , @sent_msg , noop , unmarshal ,
return_op : true ,
metadata : { k1 : 'v1' , k2 : 'v2' } )
metadata : @metadata )
expect ( @op ) . to be_a ( GRPC :: ActiveCall :: Operation )
@op . start_call if run_start_call_first
e = @op . execute
@ -364,20 +493,41 @@ describe 'ClientStub' do
it_behaves_like 'server streaming'
it 'should send metadata to the server ok when start_call is run first' do
def run_op_view_metadata_test ( run_start_call_first )
server_port = create_test_server
host = " localhost: #{ server_port } "
th = run_server_streamer ( @sent_msg , @replys , @fail ,
k1 : 'v1' , k2 : 'v2' )
@server_initial_md = { 'sk1' = > 'sv1' , 'sk2' = > 'sv2' }
@server_trailing_md = { 'tk1' = > 'tv1' , 'tk2' = > 'tv2' }
th = run_server_streamer (
@sent_msg , @replys , @pass ,
expected_metadata : @metadata ,
server_initial_md : @server_initial_md ,
server_trailing_md : @server_trailing_md )
stub = GRPC :: ClientStub . new ( host , :this_channel_is_insecure )
e = get_responses ( stub , run_start_call_first : true )
expect { e . collect { | r | r } } . to raise_error ( GRPC :: BadStatus )
e = get_responses ( stub , run_start_call_first : run_start_call_first )
expect ( e . collect { | r | r } ) . to eq ( @reply s)
th . join
end
it 'should send metadata to the server ok when start_call is run first' do
run_op_view_metadata_test ( true )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) do | responses |
responses . each { | r | p r }
end
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test ( false )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) do | responses |
responses . each { | r | p r }
end
end
end
end
describe '#bidi_streamer' do
describe '#bidi_streamer' , bidi : true do
before ( :each ) do
@sent_msgs = Array . new ( 3 ) { | i | 'msg_' + ( i + 1 ) . to_s }
@replys = Array . new ( 3 ) { | i | 'reply_' + ( i + 1 ) . to_s }
@ -386,7 +536,7 @@ describe 'ClientStub' do
end
shared_examples 'bidi streaming' do
it 'supports sending all the requests first' , bidi : true do
it 'supports sending all the requests first' do
th = run_bidi_streamer_handle_inputs_first ( @sent_msgs , @replys ,
@pass )
stub = GRPC :: ClientStub . new ( @host , :this_channel_is_insecure )
@ -395,7 +545,7 @@ describe 'ClientStub' do
th . join
end
it 'supports client-initiated ping pong' , bidi : true do
it 'supports client-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong ( @sent_msgs , @pass , true )
stub = GRPC :: ClientStub . new ( @host , :this_channel_is_insecure )
e = get_responses ( stub )
@ -403,18 +553,39 @@ describe 'ClientStub' do
th . join
end
it 'supports a server-initiated ping pong' , bidi : true do
it 'supports a server-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong ( @sent_msgs , @pass , false )
stub = GRPC :: ClientStub . new ( @host , :this_channel_is_insecure )
e = get_responses ( stub )
expect ( e . collect { | r | r } ) . to eq ( @sent_msgs )
th . join
end
it 'should raise an error if the status is not ok' do
th = run_bidi_streamer_echo_ping_pong ( @sent_msgs , @fail , false )
stub = GRPC :: ClientStub . new ( @host , :this_channel_is_insecure )
e = get_responses ( stub )
expect { e . collect { | r | r } } . to raise_error ( GRPC :: BadStatus )
th . join
end
# TODO: add test for metadata-related ArgumentError in a bidi call once
# issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
it 'should send metadata to the server ok' do
th = run_bidi_streamer_echo_ping_pong ( @sent_msgs , @pass , true ,
expected_metadata : @metadata )
stub = GRPC :: ClientStub . new ( @host , :this_channel_is_insecure )
e = get_responses ( stub )
expect ( e . collect { | r | r } ) . to eq ( @sent_msgs )
th . join
end
end
describe 'without a call operation' do
def get_responses ( stub )
e = stub . bidi_streamer ( @method , @sent_msgs , noop , noop )
e = stub . bidi_streamer ( @method , @sent_msgs , noop , noop ,
metadata : @metadata )
expect ( e ) . to be_a ( Enumerator )
e
end
@ -428,7 +599,8 @@ describe 'ClientStub' do
end
def get_responses ( stub , run_start_call_first : false )
@op = stub . bidi_streamer ( @method , @sent_msgs , noop , noop ,
return_op : true )
return_op : true ,
metadata : @metadata )
expect ( @op ) . to be_a ( GRPC :: ActiveCall :: Operation )
@op . start_call if run_start_call_first
e = @op . execute
@ -438,27 +610,53 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming'
it 'can run start_call before executing the call' do
th = run_bidi_streamer_handle_inputs_first ( @sent_msgs , @replys ,
@pass )
def run_op_view_metadata_test ( run_start_call_first )
@server_initial_md = { 'sk1' = > 'sv1' , 'sk2' = > 'sv2' }
@server_trailing_md = { 'tk1' = > 'tv1' , 'tk2' = > 'tv2' }
th = run_bidi_streamer_echo_ping_pong (
@sent_msgs , @pass , true ,
expected_metadata : @metadata ,
server_initial_md : @server_initial_md ,
server_trailing_md : @server_trailing_md )
stub = GRPC :: ClientStub . new ( @host , :this_channel_is_insecure )
e = get_responses ( stub , run_start_call_first : true )
expect ( e . collect { | r | r } ) . to eq ( @replys )
e = get_responses ( stub , run_start_call_first : run_start_call_first )
expect ( e . collect { | r | r } ) . to eq ( @sent_msg s )
th . join
end
it 'can run start_call before executing the call' do
run_op_view_metadata_test ( true )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) do | responses |
responses . each { | r | p r }
end
end
it 'doesnt crash when op_view used after call has finished' do
run_op_view_metadata_test ( false )
check_op_view_of_finished_client_call (
@op , @server_initial_md , @server_trailing_md ) do | responses |
responses . each { | r | p r }
end
end
end
end
def run_server_streamer ( expected_input , replys , status , ** kw )
wanted_metadata = kw . clone
def run_server_streamer ( expected_input , replys , status ,
expected_metadata : { } ,
server_initial_md : { } ,
server_trailing_md : { } )
wanted_metadata = expected_metadata . clone
wakey_thread do | notifier |
c = expect_server_to_be_invoked ( notifier )
c = expect_server_to_be_invoked (
notifier , metadata_to_send : server_initial_md )
wanted_metadata . each do | k , v |
expect ( c . metadata [ k . to_s ] ) . to eq ( v )
end
expect ( c . remote_read ) . to eq ( expected_input )
replys . each { | r | c . remote_send ( r ) }
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true )
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true ,
metadata : server_trailing_md )
end
end
@ -472,9 +670,17 @@ describe 'ClientStub' do
end
end
def run_bidi_streamer_echo_ping_pong ( expected_inputs , status , client_starts )
def run_bidi_streamer_echo_ping_pong ( expected_inputs , status , client_starts ,
expected_metadata : { } ,
server_initial_md : { } ,
server_trailing_md : { } )
wanted_metadata = expected_metadata . clone
wakey_thread do | notifier |
c = expect_server_to_be_invoked ( notifier )
c = expect_server_to_be_invoked (
notifier , metadata_to_send : server_initial_md )
wanted_metadata . each do | k , v |
expect ( c . metadata [ k . to_s ] ) . to eq ( v )
end
expected_inputs . each do | i |
if client_starts
expect ( c . remote_read ) . to eq ( i )
@ -484,33 +690,44 @@ describe 'ClientStub' do
expect ( c . remote_read ) . to eq ( i )
end
end
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true )
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true ,
metadata : server_trailing_md )
end
end
def run_client_streamer ( expected_inputs , resp , status , ** kw )
wanted_metadata = kw . clone
def run_client_streamer ( expected_inputs , resp , status ,
expected_metadata : { } ,
server_initial_md : { } ,
server_trailing_md : { } )
wanted_metadata = expected_metadata . clone
wakey_thread do | notifier |
c = expect_server_to_be_invoked ( notifier )
c = expect_server_to_be_invoked (
notifier , metadata_to_send : server_initial_md )
expected_inputs . each { | i | expect ( c . remote_read ) . to eq ( i ) }
wanted_metadata . each do | k , v |
expect ( c . metadata [ k . to_s ] ) . to eq ( v )
end
c . remote_send ( resp )
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true )
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true ,
metadata : server_trailing_md )
end
end
def run_request_response ( expected_input , resp , status , ** kw )
wanted_metadata = kw . clone
def run_request_response ( expected_input , resp , status ,
expected_metadata : { } ,
server_initial_md : { } ,
server_trailing_md : { } )
wanted_metadata = expected_metadata . clone
wakey_thread do | notifier |
c = expect_server_to_be_invoked ( notifier )
c = expect_server_to_be_invoked (
notifier , metadata_to_send : server_initial_md )
expect ( c . remote_read ) . to eq ( expected_input )
wanted_metadata . each do | k , v |
expect ( c . metadata [ k . to_s ] ) . to eq ( v )
end
c . remote_send ( resp )
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true )
c . send_status ( status , status == @pass ? 'OK' : 'NOK' , true ,
metadata : server_trailing_md )
end
end
@ -528,13 +745,13 @@ describe 'ClientStub' do
@server . add_http2_port ( '0.0.0.0:0' , :this_port_is_insecure )
end
def expect_server_to_be_invoked ( notifier )
def expect_server_to_be_invoked ( notifier , metadata_to_send : nil )
@server . start
notifier . notify ( nil )
recvd_rpc = @server . request_call
recvd_call = recvd_rpc . call
recvd_call . metadata = recvd_rpc . metadata
recvd_call . run_batch ( SEND_INITIAL_METADATA = > nil )
recvd_call . run_batch ( SEND_INITIAL_METADATA = > metadata_to_send )
GRPC :: ActiveCall . new ( recvd_call , noop , noop , INFINITE_FUTURE ,
metadata_received : true )
end