From 3d48cf4ed30032d417c3b64df11b7fb45220388c Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 23 Nov 2016 09:50:22 -0800 Subject: [PATCH 1/3] dont break out of response stream iterator in benchmark client --- src/ruby/qps/client.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 8aed866da5d..817192626be 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -134,6 +134,7 @@ class BenchmarkClient resp = stub.streaming_call(q.each_item) start = Time.now q.push(req) + pushed_sentinal = false resp.each do |r| @histogram.add((Time.now-start)*1e9) if !@done @@ -141,8 +142,9 @@ class BenchmarkClient start = Time.now q.push(req) else - q.push(self) - break + q.push(self) unless pushed_sentinal + # Continue polling on the responses to consume and release resources + pushed_sentinal = true end end end From 7f8a628f6ae1c67020c9137ea53d2f9f20b00a50 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 23 Nov 2016 10:36:08 -0800 Subject: [PATCH 2/3] fix race between app and GC on ruby server shutdown --- src/ruby/ext/grpc/rb_server.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index bf26841fd22..6783fd3f88a 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -37,6 +37,7 @@ #include "rb_server.h" #include +#include #include #include #include "rb_call.h" @@ -59,11 +60,13 @@ typedef struct grpc_rb_server { /* The actual server */ grpc_server *wrapped; grpc_completion_queue *queue; + gpr_atm shutdown_started; } grpc_rb_server; static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { grpc_event ev; - if (server->wrapped != NULL) { + // This can be started by app or implicitly by GC. Avoid a race between these. + if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); if (ev.type == GRPC_QUEUE_TIMEOUT) { @@ -115,6 +118,7 @@ static const rb_data_type_t grpc_rb_server_data_type = { static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server *wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; + wrapper->shutdown_started = (gpr_atm)0; return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } From a868c0424e492cfa135390dacb76c30c525dd794 Mon Sep 17 00:00:00 2001 From: Alex Polcyn Date: Mon, 5 Dec 2016 18:49:08 +0000 Subject: [PATCH 3/3] keep old behavior to not destroy ruby server if not instantiated --- src/ruby/ext/grpc/rb_server.c | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 6783fd3f88a..55c745965b3 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -67,17 +67,19 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { grpc_event ev; // This can be started by app or implicitly by GC. Avoid a race between these. if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { - grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); - ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); - if (ev.type == GRPC_QUEUE_TIMEOUT) { - grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (server->wrapped != NULL) { + grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); + ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); + if (ev.type == GRPC_QUEUE_TIMEOUT) { + grpc_server_cancel_all_calls(server->wrapped); + rb_completion_queue_pluck(server->queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + } + grpc_server_destroy(server->wrapped); + grpc_rb_completion_queue_destroy(server->queue); + server->wrapped = NULL; + server->queue = NULL; } - grpc_server_destroy(server->wrapped); - grpc_rb_completion_queue_destroy(server->queue); - server->wrapped = NULL; - server->queue = NULL; } }