@@ -123,7 +123,7 @@ static void* run_poll_channels_loop_unblocking_func_wrapper(void* arg);
 // Needs to be called under global_connection_polling_mu
 static void grpc_rb_channel_watch_connection_state_op_complete(
     watch_state_op* op, int success) {
-  GPR_ASSERT(!op->op.api_callback_args.called_back);
+  GRPC_RUBY_ASSERT(!op->op.api_callback_args.called_back);
   op->op.api_callback_args.called_back = 1;
   op->op.api_callback_args.success = success;
   // wake up the watch API call that's waiting on this op
@@ -133,7 +133,7 @@ static void grpc_rb_channel_watch_connection_state_op_complete(
 /* Avoids destroying a channel twice. */
 static void grpc_rb_channel_safe_destroy(bg_watched_channel* bg) {
   gpr_mu_lock(&global_connection_polling_mu);
-  GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  GRPC_RUBY_ASSERT(bg_watched_channel_list_lookup(bg));
   if (!bg->channel_destroyed) {
     grpc_channel_destroy(bg->channel);
     bg->channel_destroyed = 1;
@@ -253,7 +253,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
     ch = grpc_channel_create(target_chars, creds, &wrapper->args);
   }
 
-  GPR_ASSERT(ch);
+  GRPC_RUBY_ASSERT(ch);
   stack.channel = ch;
   stack.wrapper = wrapper;
   rb_thread_call_without_gvl(
@@ -416,7 +416,7 @@ static void grpc_rb_channel_maybe_recreate_channel_after_fork(
   if (bg->channel_destroyed) {
     // There must be one ref at this point, held by the ruby-level channel
     // object, drop this one here.
-    GPR_ASSERT(bg->refcount == 1);
+    GRPC_RUBY_ASSERT(bg->refcount == 1);
     rb_thread_call_without_gvl(channel_safe_destroy_without_gil, bg, NULL,
                                NULL);
     // re-create C-core channel
@@ -581,8 +581,8 @@ static void bg_watched_channel_list_free_and_remove(
     bg_watched_channel* target) {
   bg_watched_channel* bg = NULL;
 
-  GPR_ASSERT(bg_watched_channel_list_lookup(target));
-  GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
+  GRPC_RUBY_ASSERT(bg_watched_channel_list_lookup(target));
+  GRPC_RUBY_ASSERT(target->channel_destroyed && target->refcount == 0);
   if (bg_watched_channel_list_head == target) {
     bg_watched_channel_list_head = target->next;
     gpr_free(target);
@@ -597,7 +597,7 @@ static void bg_watched_channel_list_free_and_remove(
     }
     bg = bg->next;
   }
-  GPR_ASSERT(0);
+  GRPC_RUBY_ASSERT(0);
 }
 
 /* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
@@ -621,11 +621,11 @@ static void grpc_rb_channel_try_register_connection_polling(
   grpc_connectivity_state conn_state;
   watch_state_op* op = NULL;
   if (bg->refcount == 0) {
-    GPR_ASSERT(bg->channel_destroyed);
+    GRPC_RUBY_ASSERT(bg->channel_destroyed);
     bg_watched_channel_list_free_and_remove(bg);
     return;
   }
-  GPR_ASSERT(bg->refcount == 1);
+  GRPC_RUBY_ASSERT(bg->refcount == 1);
   if (bg->channel_destroyed || g_abort_channel_polling) {
     return;
   }
@@ -633,7 +633,7 @@ static void grpc_rb_channel_try_register_connection_polling(
   if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
     return;
   }
-  GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  GRPC_RUBY_ASSERT(bg_watched_channel_list_lookup(bg));
   // prevent bg from being free'd by GC while background thread is watching it
   bg->refcount++;
   op = gpr_zalloc(sizeof(watch_state_op));
@@ -679,7 +679,7 @@ static void* run_poll_channels_loop_no_gil(void* arg) {
       grpc_rb_channel_watch_connection_state_op_complete(
          (watch_state_op*)event.tag, event.success);
     } else {
-      GPR_ASSERT(0);
+      GRPC_RUBY_ASSERT(0);
     }
   }
   gpr_mu_unlock(&global_connection_polling_mu);
@@ -769,9 +769,9 @@ static void do_basic_init() {
  */
 void grpc_rb_channel_polling_thread_start() {
   gpr_once_init(&g_once_init, do_basic_init);
-  GPR_ASSERT(!RTEST(g_channel_polling_thread));
-  GPR_ASSERT(!g_abort_channel_polling);
-  GPR_ASSERT(g_channel_polling_cq == NULL);
+  GRPC_RUBY_ASSERT(!RTEST(g_channel_polling_thread));
+  GRPC_RUBY_ASSERT(!g_abort_channel_polling);
+  GRPC_RUBY_ASSERT(g_channel_polling_cq == NULL);
 
   g_channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
   g_channel_polling_thread = rb_thread_create(run_poll_channels_loop, NULL);
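Every hunk above is the same mechanical substitution: the Ruby extension's channel code stops calling gRPC core's GPR_ASSERT and asserts through its own GRPC_RUBY_ASSERT macro instead. The macro's definition is not part of the hunks shown here, so the sketch below is only an assumption of what an extension-local assert along these lines could look like (the message format and abort-on-failure behavior are illustrative, not the PR's actual definition):

/* Hedged sketch of an extension-local assertion macro; the real
 * GRPC_RUBY_ASSERT is defined elsewhere in this change and may differ. */
#include <stdio.h>
#include <stdlib.h>

#define GRPC_RUBY_ASSERT(x)                                           \
  do {                                                                \
    if (!(x)) {                                                       \
      /* Report the failing expression and location, then abort. */   \
      fprintf(stderr, "grpc-ruby assertion failure: %s:%d: %s\n",     \
              __FILE__, __LINE__, #x);                                \
      abort();                                                        \
    }                                                                 \
  } while (0)

A plausible reason for the switch is to keep the bindings' assertion behavior independent of core's GPR_ASSERT macro, though the hunks themselves do not state the motivation.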