diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 6cb71870f55..344cb941ffb 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -103,7 +103,7 @@ static void destroy_call(grpc_rb_call *call) { if (call->wrapped != NULL) { grpc_call_destroy(call->wrapped); call->wrapped = NULL; - grpc_rb_completion_queue_safe_destroy(call->queue); + grpc_rb_completion_queue_destroy(call->queue); call->queue = NULL; } } diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 449c91a0f98..e02dd0805d3 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -52,27 +52,28 @@ /* id_channel is the name of the hidden ivar that preserves a reference to the * channel on a call, so that calls are not GCed before their channel. */ -ID id_channel; +static ID id_channel; /* id_target is the name of the hidden ivar that preserves a reference to the * target string used to create the call, preserved so that it does not get * GCed before the channel */ -ID id_target; +static ID id_target; /* id_insecure_channel is used to indicate that a channel is insecure */ -VALUE id_insecure_channel; +static VALUE id_insecure_channel; /* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ -VALUE grpc_rb_cChannel = Qnil; +static VALUE grpc_rb_cChannel = Qnil; /* Used during the conversion of a hash to channel args during channel setup */ -VALUE grpc_rb_cChannelArgs; +static VALUE grpc_rb_cChannelArgs; typedef struct bg_watched_channel { grpc_channel *channel; + // these fields must only be accessed under global_connection_polling_mu struct bg_watched_channel *next; int channel_destroyed; - int refcount; // must only be accessed under global_connection_polling_mu + int refcount; } bg_watched_channel; /* grpc_rb_channel wraps a grpc_channel. */ @@ -101,32 +102,34 @@ typedef struct watch_state_op { } op; } watch_state_op; -bg_watched_channel *bg_watched_channel_list_head = NULL; +static bg_watched_channel *bg_watched_channel_list_head = NULL; -void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg); -void *wait_until_channel_polling_thread_started_no_gil(void *); -void *channel_init_try_register_connection_polling_without_gil(void *arg); +static void grpc_rb_channel_try_register_connection_polling( + bg_watched_channel *bg); +static void *wait_until_channel_polling_thread_started_no_gil(void *); +static void *channel_init_try_register_connection_polling_without_gil( + void *arg); typedef struct channel_init_try_register_stack { grpc_channel *channel; grpc_rb_channel *wrapper; } channel_init_try_register_stack; -grpc_completion_queue *channel_polling_cq; -gpr_mu global_connection_polling_mu; -gpr_cv global_connection_polling_cv; -int abort_channel_polling = 0; -int channel_polling_thread_started = 0; +static grpc_completion_queue *channel_polling_cq; +static gpr_mu global_connection_polling_mu; +static gpr_cv global_connection_polling_cv; +static int abort_channel_polling = 0; +static int channel_polling_thread_started = 0; -int bg_watched_channel_list_lookup(bg_watched_channel *bg); -bg_watched_channel *bg_watched_channel_list_create_and_add( +static int bg_watched_channel_list_lookup(bg_watched_channel *bg); +static bg_watched_channel *bg_watched_channel_list_create_and_add( grpc_channel *channel); -void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg); -void run_poll_channels_loop_unblocking_func(void *arg); +static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg); +static void run_poll_channels_loop_unblocking_func(void *arg); // Needs to be called under global_connection_polling_mu -void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op *op, - int success) { +static void grpc_rb_channel_watch_connection_state_op_complete( + watch_state_op *op, int success) { GPR_ASSERT(!op->op.api_callback_args.called_back); op->op.api_callback_args.called_back = 1; op->op.api_callback_args.success = success; @@ -135,7 +138,7 @@ void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op *op, } /* Avoids destroying a channel twice. */ -void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { +static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { gpr_mu_lock(&global_connection_polling_mu); GPR_ASSERT(bg_watched_channel_list_lookup(bg)); if (!bg->channel_destroyed) { @@ -149,13 +152,13 @@ void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { gpr_mu_unlock(&global_connection_polling_mu); } -void *channel_safe_destroy_without_gil(void *arg) { +static void *channel_safe_destroy_without_gil(void *arg) { grpc_rb_channel_safe_destroy((bg_watched_channel *)arg); return NULL; } /* Destroys Channel instances. */ -void grpc_rb_channel_free(void *p) { +static void grpc_rb_channel_free(void *p) { grpc_rb_channel *ch = NULL; if (p == NULL) { return; @@ -165,7 +168,8 @@ void grpc_rb_channel_free(void *p) { if (ch->bg_wrapped != NULL) { /* assumption made here: it's ok to directly gpr_mu_lock the global * connection polling mutex becuse we're in a finalizer, - * and we can count on this thread to not be interrupted. */ + * and we can count on this thread to not be interrupted or + * yield the gil. */ grpc_rb_channel_safe_destroy(ch->bg_wrapped); ch->bg_wrapped = NULL; } @@ -174,7 +178,7 @@ void grpc_rb_channel_free(void *p) { } /* Protects the mark object from GC */ -void grpc_rb_channel_mark(void *p) { +static void grpc_rb_channel_mark(void *p) { grpc_rb_channel *channel = NULL; if (p == NULL) { return; @@ -185,20 +189,20 @@ void grpc_rb_channel_mark(void *p) { } } -rb_data_type_t grpc_channel_data_type = {"grpc_channel", - {grpc_rb_channel_mark, - grpc_rb_channel_free, - GRPC_RB_MEMSIZE_UNAVAILABLE, - {NULL, NULL}}, - NULL, - NULL, +static rb_data_type_t grpc_channel_data_type = {"grpc_channel", + {grpc_rb_channel_mark, + grpc_rb_channel_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY #endif }; /* Allocates grpc_rb_channel instances. */ -VALUE grpc_rb_channel_alloc(VALUE cls) { +static VALUE grpc_rb_channel_alloc(VALUE cls) { grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel); wrapper->bg_wrapped = NULL; wrapper->credentials = Qnil; @@ -213,7 +217,7 @@ VALUE grpc_rb_channel_alloc(VALUE cls) { secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds) Creates channel instances. */ -VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { +static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { VALUE channel_args = Qnil; VALUE credentials = Qnil; VALUE target = Qnil; @@ -274,13 +278,15 @@ typedef struct get_state_stack { int out; } get_state_stack; -void *get_state_without_gil(void *arg) { +static void *get_state_without_gil(void *arg) { get_state_stack *stack = (get_state_stack *)arg; gpr_mu_lock(&global_connection_polling_mu); GPR_ASSERT(abort_channel_polling || channel_polling_thread_started); if (abort_channel_polling) { - // the case in which the channel polling thread + // Assume that this channel has been destroyed by the + // background thread. + // The case in which the channel polling thread // failed to start just always shows shutdown state. stack->out = GRPC_CHANNEL_SHUTDOWN; } else { @@ -301,16 +307,14 @@ void *get_state_without_gil(void *arg) { constants defined in GRPC::Core::ConnectivityStates. It also tries to connect if the chennel is idle in the second form. */ -VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, - VALUE self) { +static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, + VALUE self) { VALUE try_to_connect_param = Qfalse; - int grpc_try_to_connect = 0; grpc_rb_channel *wrapper = NULL; get_state_stack stack; /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ rb_scan_args(argc, argv, "01", &try_to_connect_param); - grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); if (wrapper->bg_wrapped == NULL) { @@ -319,7 +323,7 @@ VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, } stack.channel = wrapper->bg_wrapped->channel; - stack.try_to_connect = grpc_try_to_connect; + stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL); return LONG2NUM(stack.out); @@ -331,7 +335,7 @@ typedef struct watch_state_stack { int last_state; } watch_state_stack; -void *wait_for_watch_state_op_complete_without_gvl(void *arg) { +static void *wait_for_watch_state_op_complete_without_gvl(void *arg) { watch_state_stack *stack = (watch_state_stack *)arg; watch_state_op *op = NULL; void *success = (void *)0; @@ -345,7 +349,6 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) { } op = gpr_zalloc(sizeof(watch_state_op)); op->op_type = WATCH_STATE_API; - // one ref for this thread and another for the callback-running thread grpc_channel_watch_connectivity_state(stack->channel, stack->last_state, stack->deadline, channel_polling_cq, op); @@ -370,8 +373,9 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) { * Returns false if "deadline" expires before the channel's connectivity * state changes from "last_state". * */ -VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, - VALUE deadline) { +static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, + VALUE last_state, + VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; void *op_success = 0; @@ -403,8 +407,9 @@ VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ -VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, - VALUE method, VALUE host, VALUE deadline) { +static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, + VALUE method, VALUE host, + VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; @@ -466,7 +471,7 @@ VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, /* Closes the channel, calling it's destroy method */ /* Note this is an API-level call; a wrapped channel's finalizer doesn't call * this */ -VALUE grpc_rb_channel_destroy(VALUE self) { +static VALUE grpc_rb_channel_destroy(VALUE self) { grpc_rb_channel *wrapper = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -480,7 +485,7 @@ VALUE grpc_rb_channel_destroy(VALUE self) { } /* Called to obtain the target that this channel accesses. */ -VALUE grpc_rb_channel_get_target(VALUE self) { +static VALUE grpc_rb_channel_get_target(VALUE self) { grpc_rb_channel *wrapper = NULL; VALUE res = Qnil; char *target = NULL; @@ -494,7 +499,7 @@ VALUE grpc_rb_channel_get_target(VALUE self) { } /* Needs to be called under global_connection_polling_mu */ -int bg_watched_channel_list_lookup(bg_watched_channel *target) { +static int bg_watched_channel_list_lookup(bg_watched_channel *target) { bg_watched_channel *cur = bg_watched_channel_list_head; while (cur != NULL) { @@ -508,7 +513,7 @@ int bg_watched_channel_list_lookup(bg_watched_channel *target) { } /* Needs to be called under global_connection_polling_mu */ -bg_watched_channel *bg_watched_channel_list_create_and_add( +static bg_watched_channel *bg_watched_channel_list_create_and_add( grpc_channel *channel) { bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel)); @@ -520,7 +525,8 @@ bg_watched_channel *bg_watched_channel_list_create_and_add( } /* Needs to be called under global_connection_polling_mu */ -void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) { +static void bg_watched_channel_list_free_and_remove( + bg_watched_channel *target) { bg_watched_channel *bg = NULL; GPR_ASSERT(bg_watched_channel_list_lookup(target)); @@ -544,7 +550,8 @@ void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) { /* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push * it onto the background thread for constant watches. */ -void *channel_init_try_register_connection_polling_without_gil(void *arg) { +static void *channel_init_try_register_connection_polling_without_gil( + void *arg) { channel_init_try_register_stack *stack = (channel_init_try_register_stack *)arg; @@ -557,7 +564,8 @@ void *channel_init_try_register_connection_polling_without_gil(void *arg) { } // Needs to be called under global_connection_poolling_mu -void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) { +static void grpc_rb_channel_try_register_connection_polling( + bg_watched_channel *bg) { grpc_connectivity_state conn_state; watch_state_op *op = NULL; @@ -599,7 +607,7 @@ void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) { // indicates process shutdown. // In the worst case, this stops polling channel connectivity // early and falls back to current behavior. -void *run_poll_channels_loop_no_gil(void *arg) { +static void *run_poll_channels_loop_no_gil(void *arg) { grpc_event event; watch_state_op *op = NULL; bg_watched_channel *bg = NULL; @@ -643,7 +651,7 @@ void *run_poll_channels_loop_no_gil(void *arg) { } // Notify the channel polling loop to cleanup and shutdown. -void run_poll_channels_loop_unblocking_func(void *arg) { +static void run_poll_channels_loop_unblocking_func(void *arg) { bg_watched_channel *bg = NULL; (void)arg; @@ -677,7 +685,7 @@ void run_poll_channels_loop_unblocking_func(void *arg) { } // Poll channel connectivity states in background thread without the GIL. -VALUE run_poll_channels_loop(VALUE arg) { +static VALUE run_poll_channels_loop(VALUE arg) { (void)arg; gpr_log( GPR_DEBUG, @@ -688,7 +696,7 @@ VALUE run_poll_channels_loop(VALUE arg) { return Qnil; } -void *wait_until_channel_polling_thread_started_no_gil(void *arg) { +static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { (void)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); gpr_mu_lock(&global_connection_polling_mu); @@ -740,7 +748,7 @@ void grpc_rb_channel_polling_thread_start() { } } -void Init_grpc_propagate_masks() { +static void Init_grpc_propagate_masks() { /* Constants representing call propagation masks in grpc.h */ VALUE grpc_rb_mPropagateMasks = rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks"); @@ -756,7 +764,7 @@ void Init_grpc_propagate_masks() { UINT2NUM(GRPC_PROPAGATE_DEFAULTS)); } -void Init_grpc_connectivity_states() { +static void Init_grpc_connectivity_states() { /* Constants representing call propagation masks in grpc.h */ VALUE grpc_rb_mConnectivityStates = rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates"); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 9f3a81b1a8b..fd75d2f691f 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -71,16 +71,12 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { } /* Helper function to free a completion queue. */ -void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq) { - grpc_event ev; - +void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) { + /* Every function that adds an event to a queue also synchronously plucks + that event from the queue, and holds a reference to the Ruby object that + holds the queue, so we only get to this point if all of those functions + have completed, and the queue is empty */ grpc_completion_queue_shutdown(cq); - for(;;) { - ev = grpc_completion_queue_pluck(cq, NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - if (ev.type == GRPC_QUEUE_SHUTDOWN) { - break; - } - } grpc_completion_queue_destroy(cq); } diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index eb041b28dfc..aa9dc6416af 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -38,7 +38,7 @@ #include -void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq); +void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq); /** * Makes the implementation of CompletionQueue#pluck available in other files diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 2a3b3391027..e0be5d7f087 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -42,7 +42,6 @@ #include #include -#include #include "rb_call.h" #include "rb_call_credentials.h" #include "rb_channel.h" diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 2b0858c247e..2286a99f249 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -77,7 +77,7 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { gpr_inf_future(GPR_CLOCK_REALTIME), NULL); } grpc_server_destroy(server->wrapped); - grpc_rb_completion_queue_safe_destroy(server->queue); + grpc_rb_completion_queue_destroy(server->queue); server->wrapped = NULL; server->queue = NULL; } diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index b3edec8f938..c8a7856a099 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -143,8 +143,7 @@ describe 'channel connection behavior' do stop_server end - it 'observably connects and reconnects to transient server' \ - ' when using the channel state API' do + it 'concurrent watches on the same channel' do timeout(180) do port = start_server ch = GRPC::Core::Channel.new("localhost:#{port}", {},