From b1fa5d46275572274fe3e5a501c267be4ea9ffe8 Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Thu, 11 Jun 2015 09:35:06 -0700 Subject: [PATCH] Ruby shutdown api migration + all tests pass, - but there are a couple of workarounds - tests are flaky --- src/ruby/ext/grpc/rb_completion_queue.c | 12 ++++- src/ruby/ext/grpc/rb_server.c | 64 ++++++++++++++++++++--- src/ruby/lib/grpc/generic/rpc_server.rb | 13 +++-- src/ruby/spec/client_server_spec.rb | 9 ++-- src/ruby/spec/generic/active_call_spec.rb | 2 +- src/ruby/spec/generic/client_stub_spec.rb | 3 +- src/ruby/spec/generic/rpc_server_spec.rb | 12 ----- src/ruby/spec/server_spec.rb | 22 ++++---- 8 files changed, 91 insertions(+), 46 deletions(-) diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index fa4c5660048..8fb3949b3dc 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -142,8 +142,16 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, MEMZERO(&next_call, next_call_stack, 1); TypedData_Get_Struct(self, grpc_completion_queue, &grpc_rb_completion_queue_data_type, next_call.cq); - next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); - next_call.tag = ROBJECT(tag); + if (TYPE(timeout) == T_NIL) { + next_call.timeout = gpr_inf_future; + } else { + next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); + } + if (TYPE(tag) == T_NIL) { + next_call.tag = NULL; + } else { + next_call.tag = ROBJECT(tag); + } next_call.event.type = GRPC_QUEUE_TIMEOUT; rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, (void *)&next_call, NULL, NULL); diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 837ca3b5e8d..9c0d24bf8fd 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -210,7 +210,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, VALUE result; TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "closed!"); + rb_raise(rb_eRuntimeError, "destroyed!"); return Qnil; } else { grpc_request_call_stack_init(&st); @@ -259,21 +259,69 @@ static VALUE grpc_rb_server_start(VALUE self) { grpc_rb_server *s = NULL; TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "closed!"); + rb_raise(rb_eRuntimeError, "destroyed!"); } else { grpc_server_start(s->wrapped); } return Qnil; } -static VALUE grpc_rb_server_destroy(VALUE self) { +/* + call-seq: + cq = CompletionQueue.new + server = Server.new(cq, {'arg1': 'value1'}) + ... // do stuff with server + ... + ... // to shutdown the server + server.destroy(cq) + + ... // to shutdown the server with a timeout + server.destroy(cq, timeout) + + Destroys server instances. */ +static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) { + VALUE cqueue = Qnil; + VALUE timeout = Qnil; + grpc_completion_queue *cq = NULL; + grpc_event ev; grpc_rb_server *s = NULL; + + /* "11" == 1 mandatory args, 1 (timeout) is optional */ + rb_scan_args(argc, argv, "11", &cqueue, &timeout); + cq = grpc_rb_get_wrapped_completion_queue(cqueue); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); + if (s->wrapped != NULL) { - grpc_server_shutdown(s->wrapped); + grpc_server_shutdown_and_notify(s->wrapped, cq, NULL); + ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout); + + if (!ev.success) { + rb_warn("server shutdown failed, there will be a LEAKED object warning"); + return Qnil; + /* + TODO: renable the rb_raise below. + + At the moment if the timeout is INFINITE_FUTURE as recommended, the + pluck blocks forever, even though + + the outstanding server_request_calls correctly fail on the other + thread that they are running on. + + it's almost as if calls that fail on the other thread do not get + cleaned up by shutdown request, even though it caused htem to + terminate. + + rb_raise(rb_eRuntimeError, "grpc server shutdown did not succeed"); + return Qnil; + + The workaround is just to use a timeout and return without really + shutting down the server, and rely on the grpc core garbage collection + it down as a 'LEAKED OBJECT'. + + */ + } grpc_server_destroy(s->wrapped); s->wrapped = NULL; - s->mark = Qnil; } return Qnil; } @@ -302,7 +350,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "closed!"); + rb_raise(rb_eRuntimeError, "destroyed!"); return Qnil; } else if (rb_creds == Qnil) { recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port)); @@ -315,7 +363,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { creds = grpc_rb_get_wrapped_server_credentials(rb_creds); recvd_port = grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port), - creds); + creds); if (recvd_port == 0) { rb_raise(rb_eRuntimeError, "could not add secure port %s to server, not sure why", @@ -341,7 +389,7 @@ void Init_grpc_server() { rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call, 3); rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); - rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0); + rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1); rb_define_alias(grpc_rb_cServer, "close", "destroy"); rb_define_method(grpc_rb_cServer, "add_http2_port", grpc_rb_server_add_http2_port, diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index dcb11bfbef7..a7e20d6b82c 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -278,7 +278,9 @@ module GRPC @stopped = true end @pool.stop - @server.close + deadline = from_relative_time(@poll_period) + + @server.close(@cq, deadline) end # determines if the server has been stopped @@ -410,17 +412,18 @@ module GRPC # handles calls to the server def loop_handle_server_calls fail 'not running' unless @running - request_call_tag = Object.new + loop_tag = Object.new until stopped? deadline = from_relative_time(@poll_period) begin - an_rpc = @server.request_call(@cq, request_call_tag, deadline) + an_rpc = @server.request_call(@cq, loop_tag, deadline) + c = new_active_server_call(an_rpc) rescue Core::CallError, RuntimeError => e - # can happen during server shutdown + # these might happen for various reasonse. The correct behaviour of + # the server is to log them and continue. GRPC.logger.warn("server call failed: #{e}") next end - c = new_active_server_call(an_rpc) unless c.nil? mth = an_rpc.method.to_sym @pool.schedule(c) do |call| diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 68af79f9075..b22e510a6a2 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -42,11 +42,8 @@ shared_context 'setup: tags' do let(:sent_message) { 'sent message' } let(:reply_text) { 'the reply' } before(:example) do - @server_finished_tag = Object.new - @client_finished_tag = Object.new - @client_metadata_tag = Object.new + @client_tag = Object.new @server_tag = Object.new - @tag = Object.new end def deadline @@ -351,7 +348,7 @@ describe 'the http client/server' do after(:example) do @ch.close - @server.close + @server.close(@server_queue, deadline) end it_behaves_like 'basic GRPC message delivery is OK' do @@ -377,7 +374,7 @@ describe 'the secure http client/server' do end after(:example) do - @server.close + @server.close(@server_queue, deadline) end it_behaves_like 'basic GRPC message delivery is OK' do diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 575871afb11..bc3bee3d440 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -51,7 +51,7 @@ describe GRPC::ActiveCall do end after(:each) do - @server.close + @server.close(@server_queue, deadline) end describe 'restricted view methods' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 98d68ccfbb8..68d4b117905 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -54,6 +54,7 @@ describe 'ClientStub' do before(:each) do Thread.abort_on_exception = true @server = nil + @server_queue = nil @method = 'an_rpc_method' @pass = OK @fail = INTERNAL @@ -61,7 +62,7 @@ describe 'ClientStub' do end after(:each) do - @server.close unless @server.nil? + @server.close(@server_queue) unless @server_queue.nil? end describe '#new' do diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e60a8b27c3f..f2403de77c4 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -136,10 +136,6 @@ describe GRPC::RpcServer do @ch = GRPC::Core::Channel.new(@host, nil) end - after(:each) do - @server.close - end - describe '#new' do it 'can be created with just some args' do opts = { a_channel_arg: 'an_arg' } @@ -344,10 +340,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - after(:each) do - @srv.stop - end - it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -527,10 +519,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - after(:each) do - @srv.stop - end - it 'should send connect metadata to the client', server: true do service = EchoService.new @srv.handle(service) diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb index bb566d1b1fb..47fe5753438 100644 --- a/src/ruby/spec/server_spec.rb +++ b/src/ruby/spec/server_spec.rb @@ -54,7 +54,7 @@ describe Server do it 'fails if the server is closed' do s = Server.new(@cq, nil) - s.close + s.close(@cq) expect { s.start }.to raise_error(RuntimeError) end end @@ -62,19 +62,19 @@ describe Server do describe '#destroy' do it 'destroys a server ok' do s = start_a_server - blk = proc { s.destroy } + blk = proc { s.destroy(@cq) } expect(&blk).to_not raise_error end it 'can be called more than once without error' do s = start_a_server begin - blk = proc { s.destroy } + blk = proc { s.destroy(@cq) } expect(&blk).to_not raise_error blk.call expect(&blk).to_not raise_error ensure - s.close + s.close(@cq) end end end @@ -83,16 +83,16 @@ describe Server do it 'closes a server ok' do s = start_a_server begin - blk = proc { s.close } + blk = proc { s.close(@cq) } expect(&blk).to_not raise_error ensure - s.close + s.close(@cq) end end it 'can be called more than once without error' do s = start_a_server - blk = proc { s.close } + blk = proc { s.close(@cq) } expect(&blk).to_not raise_error blk.call expect(&blk).to_not raise_error @@ -105,14 +105,14 @@ describe Server do blk = proc do s = Server.new(@cq, nil) s.add_http2_port('localhost:0') - s.close + s.close(@cq) end expect(&blk).to_not raise_error end it 'fails if the server is closed' do s = Server.new(@cq, nil) - s.close + s.close(@cq) expect { s.add_http2_port('localhost:0') }.to raise_error(RuntimeError) end end @@ -123,14 +123,14 @@ describe Server do blk = proc do s = Server.new(@cq, nil) s.add_http2_port('localhost:0', cert) - s.close + s.close(@cq) end expect(&blk).to_not raise_error end it 'fails if the server is closed' do s = Server.new(@cq, nil) - s.close + s.close(@cq) blk = proc { s.add_http2_port('localhost:0', cert) } expect(&blk).to raise_error(RuntimeError) end