Merge pull request #14134 from apolcyn/fix_ruby_shutdown_race

Refactor ruby server shutdown to fix a race in tests
pull/14331/merge
apolcyn 7 years ago committed by GitHub
commit 90dde8f136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 80
      src/ruby/ext/grpc/rb_server.c
  2. 11
      src/ruby/lib/grpc/generic/rpc_server.rb
  3. 6
      src/ruby/spec/client_server_spec.rb
  4. 3
      src/ruby/spec/generic/active_call_spec.rb
  5. 40
      src/ruby/spec/generic/client_stub_spec.rb
  6. 33
      src/ruby/spec/server_spec.rb

@ -46,21 +46,38 @@ typedef struct grpc_rb_server {
/* The actual server */
grpc_server* wrapped;
grpc_completion_queue* queue;
gpr_atm shutdown_started;
int shutdown_and_notify_done;
int destroy_done;
} grpc_rb_server;
static void destroy_server(grpc_rb_server* server, gpr_timespec deadline) {
static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
gpr_timespec deadline) {
grpc_event ev;
// This can be started by app or implicitly by GC. Avoid a race between these.
if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) {
void* tag = &ev;
if (!server->shutdown_and_notify_done) {
server->shutdown_and_notify_done = 1;
if (server->wrapped != NULL) {
grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
rb_completion_queue_pluck(server->queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
ev = rb_completion_queue_pluck(
server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
if (ev.type != GRPC_OP_COMPLETE) {
gpr_log(GPR_INFO,
"GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
ev.type);
}
}
}
}
static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
// This can be started by app or implicitly by GC. Avoid a race between these.
if (!server->destroy_done) {
server->destroy_done = 1;
if (server->wrapped != NULL) {
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
server->wrapped = NULL;
@ -81,7 +98,8 @@ static void grpc_rb_server_free(void* p) {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(2, GPR_TIMESPAN));
destroy_server(svr, deadline);
grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
grpc_rb_server_maybe_destroy(svr);
xfree(p);
}
@ -107,7 +125,8 @@ static const rb_data_type_t grpc_rb_server_data_type = {
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
wrapper->shutdown_started = (gpr_atm)0;
wrapper->destroy_done = 0;
wrapper->shutdown_and_notify_done = 0;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}
@ -232,25 +251,10 @@ static VALUE grpc_rb_server_start(VALUE self) {
return Qnil;
}
/*
call-seq:
server = Server.new({'arg1': 'value1'})
... // do stuff with server
...
... // to shutdown the server
server.destroy()
... // to shutdown the server with a timeout
server.destroy(timeout)
Destroys server instances. */
static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) {
VALUE timeout = Qnil;
static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
gpr_timespec deadline;
grpc_rb_server* s = NULL;
/* "01" == 0 mandatory args, 1 (timeout) is optional */
rb_scan_args(argc, argv, "01", &timeout);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (TYPE(timeout) == T_NIL) {
deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
@ -258,8 +262,26 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) {
deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
}
destroy_server(s, deadline);
grpc_rb_server_maybe_shutdown_and_notify(s, deadline);
return Qnil;
}
/*
call-seq:
server = Server.new({'arg1': 'value1'})
... // do stuff with server
...
... // initiate server shutdown
server.shutdown_and_notify(timeout)
... // to shutdown the server
server.destroy()
Destroys server instances. */
static VALUE grpc_rb_server_destroy(VALUE self) {
grpc_rb_server* s = NULL;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
grpc_rb_server_maybe_destroy(s);
return Qnil;
}
@ -326,7 +348,9 @@ void Init_grpc_server() {
rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
grpc_rb_server_shutdown_and_notify, 1);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
grpc_rb_server_add_http2_port, 2);

@ -244,9 +244,9 @@ module GRPC
fail 'Cannot stop before starting' if @running_state == :not_started
return if @running_state != :running
transition_running_state(:stopping)
deadline = from_relative_time(@poll_period)
@server.shutdown_and_notify(deadline)
end
deadline = from_relative_time(@poll_period)
@server.close(deadline)
@pool.stop
end
@ -416,8 +416,11 @@ module GRPC
end
end
# @running_state should be :stopping here
@run_mutex.synchronize { transition_running_state(:stopped) }
GRPC.logger.info("stopped: #{self}")
@run_mutex.synchronize do
transition_running_state(:stopped)
GRPC.logger.info("stopped: #{self}")
@server.close
end
end
def new_active_server_call(an_rpc)

@ -550,7 +550,8 @@ describe 'the http client/server' do
after(:example) do
@ch.close
@server.close(deadline)
@server.shutdown_and_notify(deadline)
@server.close
end
it_behaves_like 'basic GRPC message delivery is OK' do
@ -583,7 +584,8 @@ describe 'the secure http client/server' do
end
after(:example) do
@server.close(deadline)
@server.shutdown_and_notify(deadline)
@server.close
end
it_behaves_like 'basic GRPC message delivery is OK' do

@ -48,7 +48,8 @@ describe GRPC::ActiveCall do
end
after(:each) do
@server.close(deadline)
@server.shutdown_and_notify(deadline)
@server.close
end
describe 'restricted view methods' do

@ -83,7 +83,12 @@ def sanity_check_values_of_accessors(op_view,
op_view.deadline.is_a?(Time)).to be(true)
end
describe 'ClientStub' do
def close_active_server_call(active_server_call)
active_server_call.send(:set_input_stream_done)
active_server_call.send(:set_output_stream_done)
end
describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
let(:noop) { proc { |x| x } }
before(:each) do
@ -96,7 +101,10 @@ describe 'ClientStub' do
end
after(:each) do
@server.close(from_relative_time(2)) unless @server.nil?
unless @server.nil?
@server.shutdown_and_notify(from_relative_time(2))
@server.close
end
end
describe '#new' do
@ -230,7 +238,15 @@ describe 'ClientStub' do
it 'should receive UNAVAILABLE if call credentials plugin fails' do
server_port = create_secure_test_server
th = run_request_response(@sent_msg, @resp, @pass)
server_started_notifier = GRPC::Notifier.new
th = Thread.new do
@server.start
server_started_notifier.notify(nil)
# Poll on the server so that the client connection can proceed.
# We don't expect the server to actually accept a call though.
expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
end
server_started_notifier.wait
certs = load_test_certs
secure_channel_creds = GRPC::Core::ChannelCredentials.new(
@ -249,17 +265,18 @@ describe 'ClientStub' do
end
creds = GRPC::Core::CallCredentials.new(failing_auth)
unauth_error_occured = false
unavailable_error_occured = false
begin
get_response(stub, credentials: creds)
rescue GRPC::Unavailable => e
unauth_error_occured = true
unavailable_error_occured = true
expect(e.details.include?(error_message)).to be true
end
expect(unauth_error_occured).to eq(true)
expect(unavailable_error_occured).to eq(true)
# Kill the server thread so tests can complete
th.kill
@server.shutdown_and_notify(Time.now + 3)
th.join
@server.close
end
it 'should raise ArgumentError if metadata contains invalid values' do
@ -493,6 +510,7 @@ describe 'ClientStub' do
p 'remote_send failed (allowed because call expected to cancel)'
ensure
c.send_status(OK, 'OK', true)
close_active_server_call(c)
end
end
end
@ -659,6 +677,7 @@ describe 'ClientStub' do
end
# can't fail since initial metadata already sent
server_call.send_status(@pass, 'OK', true)
close_active_server_call(server_call)
end
def verify_error_from_write_thread(stub, requests_to_push,
@ -809,6 +828,7 @@ describe 'ClientStub' do
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
close_active_server_call(c)
end
end
@ -819,6 +839,7 @@ describe 'ClientStub' do
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
close_active_server_call(c)
end
end
@ -844,6 +865,7 @@ describe 'ClientStub' do
end
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
close_active_server_call(c)
end
end
@ -862,6 +884,7 @@ describe 'ClientStub' do
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
close_active_server_call(c)
end
end
@ -880,6 +903,7 @@ describe 'ClientStub' do
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
close_active_server_call(c)
end
end

@ -36,45 +36,60 @@ describe Server do
it 'fails if the server is closed' do
s = new_core_server_for_testing(nil)
s.shutdown_and_notify(nil)
s.close
expect { s.start }.to raise_error(RuntimeError)
end
end
describe '#destroy' do
describe '#shutdown_and_notify and #destroy' do
it 'destroys a server ok' do
s = start_a_server
blk = proc { s.destroy }
blk = proc do
s.shutdown_and_notify(nil)
s.destroy
end
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
s = start_a_server
begin
blk = proc { s.destroy }
blk = proc do
s.shutdown_and_notify(nil)
s.destroy
end
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
ensure
s.shutdown_and_notify(nil)
s.close
end
end
end
describe '#close' do
describe '#shutdown_and_notify and #close' do
it 'closes a server ok' do
s = start_a_server
begin
blk = proc { s.close }
blk = proc do
s.shutdown_and_notify(nil)
s.close
end
expect(&blk).to_not raise_error
ensure
s.close(@cq)
s.shutdown_and_notify(nil)
s.close
end
end
it 'can be called more than once without error' do
s = start_a_server
blk = proc { s.close }
blk = proc do
s.shutdown_and_notify(nil)
s.close
end
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
@ -87,6 +102,7 @@ describe Server do
blk = proc do
s = new_core_server_for_testing(nil)
s.add_http2_port('localhost:0', :this_port_is_insecure)
s.shutdown_and_notify(nil)
s.close
end
expect(&blk).to_not raise_error
@ -94,6 +110,7 @@ describe Server do
it 'fails if the server is closed' do
s = new_core_server_for_testing(nil)
s.shutdown_and_notify(nil)
s.close
blk = proc do
s.add_http2_port('localhost:0', :this_port_is_insecure)
@ -108,6 +125,7 @@ describe Server do
blk = proc do
s = new_core_server_for_testing(nil)
s.add_http2_port('localhost:0', cert)
s.shutdown_and_notify(nil)
s.close
end
expect(&blk).to_not raise_error
@ -115,6 +133,7 @@ describe Server do
it 'fails if the server is closed' do
s = new_core_server_for_testing(nil)
s.shutdown_and_notify(nil)
s.close
blk = proc { s.add_http2_port('localhost:0', cert) }
expect(&blk).to raise_error(RuntimeError)

Loading…
Cancel
Save