diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index bf26841fd22..55c745965b3 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -37,6 +37,7 @@ #include "rb_server.h" #include +#include #include #include #include "rb_call.h" @@ -59,22 +60,26 @@ typedef struct grpc_rb_server { /* The actual server */ grpc_server *wrapped; grpc_completion_queue *queue; + gpr_atm shutdown_started; } grpc_rb_server; static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { grpc_event ev; - if (server->wrapped != NULL) { - grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); - ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); - if (ev.type == GRPC_QUEUE_TIMEOUT) { - grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + // This can be started by app or implicitly by GC. Avoid a race between these. + if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { + if (server->wrapped != NULL) { + grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); + ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); + if (ev.type == GRPC_QUEUE_TIMEOUT) { + grpc_server_cancel_all_calls(server->wrapped); + rb_completion_queue_pluck(server->queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + } + grpc_server_destroy(server->wrapped); + grpc_rb_completion_queue_destroy(server->queue); + server->wrapped = NULL; + server->queue = NULL; } - grpc_server_destroy(server->wrapped); - grpc_rb_completion_queue_destroy(server->queue); - server->wrapped = NULL; - server->queue = NULL; } } @@ -115,6 +120,7 @@ static const rb_data_type_t grpc_rb_server_data_type = { static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server *wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; + wrapper->shutdown_started = (gpr_atm)0; return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 8aed866da5d..817192626be 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -134,6 +134,7 @@ class BenchmarkClient resp = stub.streaming_call(q.each_item) start = Time.now q.push(req) + pushed_sentinal = false resp.each do |r| @histogram.add((Time.now-start)*1e9) if !@done @@ -141,8 +142,9 @@ class BenchmarkClient start = Time.now q.push(req) else - q.push(self) - break + q.push(self) unless pushed_sentinal + # Continue polling on the responses to consume and release resources + pushed_sentinal = true end end end