@ -68,29 +68,53 @@ static VALUE grpc_rb_cChannel = Qnil;
/* Used during the conversion of a hash to channel args during channel setup */
static VALUE grpc_rb_cChannelArgs ;
typedef struct bg_watched_channel {
grpc_channel * channel ;
// these fields must only be accessed under global_connection_polling_mu
struct bg_watched_channel * next ;
int channel_destroyed ;
int refcount ;
} bg_watched_channel ;
/* grpc_rb_channel wraps a grpc_channel. */
typedef struct grpc_rb_channel {
VALUE credentials ;
/* The actual channel */
grpc_channel * wrapped ;
int request_safe_destroy ;
int safe_to_destroy ;
grpc_connectivity_state current_connectivity_state ;
int mu_init_done ;
int abort_watch_connectivity_state ;
gpr_mu channel_mu ;
gpr_cv channel_cv ;
/* The actual channel (protected in a wrapper to tell when it's safe to
* destroy ) */
bg_watched_channel * bg_wrapped ;
} grpc_rb_channel ;
/* Forward declarations of functions involved in temporary fix to
* https : //github.com/grpc/grpc/issues/9941 */
typedef enum { CONTINUOUS_WATCH , WATCH_STATE_API } watch_state_op_type ;
typedef struct watch_state_op {
watch_state_op_type op_type ;
// from event.success
union {
struct {
int success ;
// has been called back due to a cq next call
int called_back ;
} api_callback_args ;
struct {
bg_watched_channel * bg ;
} continuous_watch_callback_args ;
} op ;
} watch_state_op ;
static bg_watched_channel * bg_watched_channel_list_head = NULL ;
static void grpc_rb_channel_try_register_connection_polling (
grpc_rb_channel * wrapper ) ;
static void grpc_rb_channel_safe_destroy ( grpc_rb_channel * wrapper ) ;
bg_watched_channel * bg ) ;
static void * wait_until_channel_polling_thread_started_no_gil ( void * ) ;
static void wait_until_channel_polling_thread_started_unblocking_func ( void * ) ;
static void * channel_init_try_register_connection_polling_without_gil (
void * arg ) ;
typedef struct channel_init_try_register_stack {
grpc_channel * channel ;
grpc_rb_channel * wrapper ;
} channel_init_try_register_stack ;
static grpc_completion_queue * channel_polling_cq ;
static gpr_mu global_connection_polling_mu ;
@ -98,6 +122,42 @@ static gpr_cv global_connection_polling_cv;
static int abort_channel_polling = 0 ;
static int channel_polling_thread_started = 0 ;
static int bg_watched_channel_list_lookup ( bg_watched_channel * bg ) ;
static bg_watched_channel * bg_watched_channel_list_create_and_add (
grpc_channel * channel ) ;
static void bg_watched_channel_list_free_and_remove ( bg_watched_channel * bg ) ;
static void run_poll_channels_loop_unblocking_func ( void * arg ) ;
// Needs to be called under global_connection_polling_mu
static void grpc_rb_channel_watch_connection_state_op_complete (
watch_state_op * op , int success ) {
GPR_ASSERT ( ! op - > op . api_callback_args . called_back ) ;
op - > op . api_callback_args . called_back = 1 ;
op - > op . api_callback_args . success = success ;
// wake up the watch API call thats waiting on this op
gpr_cv_broadcast ( & global_connection_polling_cv ) ;
}
/* Avoids destroying a channel twice. */
static void grpc_rb_channel_safe_destroy ( bg_watched_channel * bg ) {
gpr_mu_lock ( & global_connection_polling_mu ) ;
GPR_ASSERT ( bg_watched_channel_list_lookup ( bg ) ) ;
if ( ! bg - > channel_destroyed ) {
grpc_channel_destroy ( bg - > channel ) ;
bg - > channel_destroyed = 1 ;
}
bg - > refcount - - ;
if ( bg - > refcount = = 0 ) {
bg_watched_channel_list_free_and_remove ( bg ) ;
}
gpr_mu_unlock ( & global_connection_polling_mu ) ;
}
static void * channel_safe_destroy_without_gil ( void * arg ) {
grpc_rb_channel_safe_destroy ( ( bg_watched_channel * ) arg ) ;
return NULL ;
}
/* Destroys Channel instances. */
static void grpc_rb_channel_free ( void * p ) {
grpc_rb_channel * ch = NULL ;
@ -106,14 +166,13 @@ static void grpc_rb_channel_free(void *p) {
} ;
ch = ( grpc_rb_channel * ) p ;
if ( ch - > wrapped ! = NULL ) {
grpc_rb_channel_safe_destroy ( ch ) ;
ch - > wrapped = NULL ;
}
if ( ch - > mu_init_done ) {
gpr_mu_destroy ( & ch - > channel_mu ) ;
gpr_cv_destroy ( & ch - > channel_cv ) ;
if ( ch - > bg_wrapped ! = NULL ) {
/* assumption made here: it's ok to directly gpr_mu_lock the global
* connection polling mutex becuse we ' re in a finalizer ,
* and we can count on this thread to not be interrupted or
* yield the gil . */
grpc_rb_channel_safe_destroy ( ch - > bg_wrapped ) ;
ch - > bg_wrapped = NULL ;
}
xfree ( p ) ;
@ -146,7 +205,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
/* Allocates grpc_rb_channel instances. */
static VALUE grpc_rb_channel_alloc ( VALUE cls ) {
grpc_rb_channel * wrapper = ALLOC ( grpc_rb_channel ) ;
wrapper - > wrapped = NULL ;
wrapper - > bg_ wrapped = NULL ;
wrapper - > credentials = Qnil ;
return TypedData_Wrap_Struct ( cls , & grpc_channel_data_type , wrapper ) ;
}
@ -168,18 +227,21 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
grpc_channel_credentials * creds = NULL ;
char * target_chars = NULL ;
grpc_channel_args args ;
channel_init_try_register_stack stack ;
int stop_waiting_for_thread_start = 0 ;
MEMZERO ( & args , grpc_channel_args , 1 ) ;
grpc_ruby_once_init ( ) ;
rb_thread_call_without_gvl (
wait_until_channel_polling_thread_started_no_gil , NULL ,
wait_until_channel_polling_thread_started_unblocking_func , NULL ) ;
wait_until_channel_polling_thread_started_no_gil ,
& stop_waiting_for_thread_start ,
wait_until_channel_polling_thread_started_unblocking_func ,
& stop_waiting_for_thread_start ) ;
/* "3" == 3 mandatory args */
rb_scan_args ( argc , argv , " 3 " , & target , & channel_args , & credentials ) ;
TypedData_Get_Struct ( self , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
wrapper - > mu_init_done = 0 ;
target_chars = StringValueCStr ( target ) ;
grpc_rb_hash_convert_to_channel_args ( channel_args , & args ) ;
if ( TYPE ( credentials ) = = T_SYMBOL ) {
@ -196,24 +258,11 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
GPR_ASSERT ( ch ) ;
wrapper - > wrapped = ch ;
gpr_mu_init ( & wrapper - > channel_mu ) ;
gpr_cv_init ( & wrapper - > channel_cv ) ;
wrapper - > mu_init_done = 1 ;
gpr_mu_lock ( & wrapper - > channel_mu ) ;
wrapper - > abort_watch_connectivity_state = 0 ;
wrapper - > current_connectivity_state =
grpc_channel_check_connectivity_state ( wrapper - > wrapped , 0 ) ;
wrapper - > safe_to_destroy = 0 ;
wrapper - > request_safe_destroy = 0 ;
gpr_cv_broadcast ( & wrapper - > channel_cv ) ;
gpr_mu_unlock ( & wrapper - > channel_mu ) ;
grpc_rb_channel_try_register_connection_polling ( wrapper ) ;
stack . channel = ch ;
stack . wrapper = wrapper ;
rb_thread_call_without_gvl (
channel_init_try_register_connection_polling_without_gil , & stack , NULL ,
NULL ) ;
if ( args . args ! = NULL ) {
xfree ( args . args ) ; /* Allocated by grpc_rb_hash_convert_to_channel_args */
@ -224,10 +273,31 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
return Qnil ;
}
rb_ivar_set ( self , id_target , target ) ;
wrapper - > wrapped = ch ;
return self ;
}
typedef struct get_state_stack {
bg_watched_channel * bg ;
int try_to_connect ;
int out ;
} get_state_stack ;
static void * get_state_without_gil ( void * arg ) {
get_state_stack * stack = ( get_state_stack * ) arg ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
GPR_ASSERT ( abort_channel_polling | | channel_polling_thread_started ) ;
if ( stack - > bg - > channel_destroyed ) {
stack - > out = GRPC_CHANNEL_SHUTDOWN ;
} else {
stack - > out = grpc_channel_check_connectivity_state ( stack - > bg - > channel ,
stack - > try_to_connect ) ;
}
gpr_mu_unlock ( & global_connection_polling_mu ) ;
return NULL ;
}
/*
call - seq :
ch . connectivity_state - > state
@ -240,59 +310,69 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
static VALUE grpc_rb_channel_get_connectivity_state ( int argc , VALUE * argv ,
VALUE self ) {
VALUE try_to_connect_param = Qfalse ;
int grpc_try_to_connect = 0 ;
grpc_rb_channel * wrapper = NULL ;
grpc_channel * ch = NULL ;
get_state_stack stack ;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args ( argc , argv , " 01 " , & try_to_connect_param ) ;
grpc_try_to_connect = RTEST ( try_to_connect_param ) ? 1 : 0 ;
TypedData_Get_Struct ( self , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
ch = wrapper - > wrapped ;
if ( ch = = NULL ) {
if ( wrapper - > bg_wrapped = = NULL ) {
rb_raise ( rb_eRuntimeError , " closed! " ) ;
return Qnil ;
}
return LONG2NUM ( grpc_channel_check_connectivity_state ( wrapper - > wrapped ,
grpc_try_to_connect ) ) ;
stack . bg = wrapper - > bg_wrapped ;
stack . try_to_connect = RTEST ( try_to_connect_param ) ? 1 : 0 ;
rb_thread_call_without_gvl ( get_state_without_gil , & stack , NULL , NULL ) ;
return LONG2NUM ( stack . out ) ;
}
typedef struct watch_state_stack {
grpc_rb_channel * wrapper ;
grpc_channel * channel ;
gpr_timespec deadline ;
int last_state ;
} watch_state_stack ;
static void * watch_channel_sta te_without_gvl ( void * arg ) {
static void * wait_for_watch_state_op_comple te_without_gvl ( void * arg ) {
watch_state_stack * stack = ( watch_state_stack * ) arg ;
gpr_timespec deadline = stack - > deadline ;
grpc_rb_channel * wrapper = stack - > wrapper ;
int last_state = stack - > last_state ;
void * return_value = ( void * ) 0 ;
watch_state_op * op = NULL ;
void * success = ( void * ) 0 ;
gpr_mu_lock ( & wrapper - > channel _mu) ;
while ( wrapper - > current_connectivity_state = = last_state & &
! wrapper - > request_safe_destroy & & ! wrapper - > safe_to_destroy & &
! wrapper - > abort_watch_connectivity_state & &
gpr_time_cmp ( deadline , gpr_now ( GPR_CLOCK_REALTIME ) ) > 0 ) {
gpr_cv_wait ( & wrapper - > channel_cv , & wrapper - > channel_mu , deadline ) ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
// its unsafe to do a "watch" after "channel polling abort" because the cq has
// been shut down.
if ( abort_channel_polling ) {
gpr_mu_unlock ( & global_connection_polling_mu ) ;
return ( void * ) 0 ;
}
if ( wrapper - > current_connectivity_state ! = last_state ) {
return_value = ( void * ) 1 ;
op = gpr_zalloc ( sizeof ( watch_state_op ) ) ;
op - > op_type = WATCH_STATE_API ;
grpc_channel_watch_connectivity_state ( stack - > channel , stack - > last_state ,
stack - > deadline , channel_polling_cq ,
op ) ;
while ( ! op - > op . api_callback_args . called_back ) {
gpr_cv_wait ( & global_connection_polling_cv , & global_connection_polling_mu ,
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ;
}
gpr_mu_unlock ( & wrapper - > channel_mu ) ;
if ( op - > op . api_callback_args . success ) {
success = ( void * ) 1 ;
}
gpr_free ( op ) ;
gpr_mu_unlock ( & global_connection_polling_mu ) ;
return return_value ;
return success ;
}
static void watch_channel_state_unblocking_func ( void * arg ) {
grpc_rb_channel * wrapper = ( grpc_rb_channel * ) arg ;
gpr_log ( GPR_DEBUG , " GRPC_RUBY: watch channel state unblocking func called " ) ;
gpr_mu_lock ( & wrapper - > channel_mu ) ;
wrapper - > abort_watch_connectivity_state = 1 ;
gpr_cv_broadcast ( & wrapper - > channel_cv ) ;
gpr_mu_unlock ( & wrapper - > channel _mu) ;
static void wait_for_watch_state_op_complete_unblocking_func ( void * arg ) {
bg_watched_channel * bg = ( bg_watched_channel * ) arg ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
if ( ! bg - > channel_destroyed ) {
grpc_channel_destroy ( bg - > channel ) ;
bg - > channel_destroyed = 1 ;
}
gpr_mu_unlock ( & global_connection_polling _mu) ;
}
/* Wait until the channel's connectivity state becomes different from
@ -307,11 +387,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline ) {
grpc_rb_channel * wrapper = NULL ;
watch_state_stack stack ;
void * out ;
void * op_success = 0 ;
TypedData_Get_Struct ( self , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
if ( wrapper - > wrapped = = NULL ) {
if ( wrapper - > bg_ wrapped = = NULL ) {
rb_raise ( rb_eRuntimeError , " closed! " ) ;
return Qnil ;
}
@ -323,16 +403,15 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil ;
}
stack . wrapper = wrapper ;
stack . deadline = grpc_rb_time_timeval ( deadline , 0 ) ;
stack . channel = wrapper - > bg_wrapped - > channel ;
stack . deadline = grpc_rb_time_timeval ( deadline , 0 ) ,
stack . last_state = NUM2LONG ( last_state ) ;
out =
rb_thread_call_without_gvl ( watch_channel_state_without_gvl , & stack ,
watch_channel_state_unblocking_func , wrapper ) ;
if ( out ) {
return Qtrue ;
}
return Qfalse ;
op_success = rb_thread_call_without_gvl (
wait_for_watch_state_op_complete_without_gvl , & stack ,
wait_for_watch_state_op_complete_unblocking_func , wrapper - > bg_wrapped ) ;
return op_success ? Qtrue : Qfalse ;
}
/* Create a call given a grpc_channel, in order to call method. The request
@ -344,7 +423,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
grpc_rb_channel * wrapper = NULL ;
grpc_call * call = NULL ;
grpc_call * parent_call = NULL ;
grpc_channel * ch = NULL ;
grpc_completion_queue * cq = NULL ;
int flags = GRPC_PROPAGATE_DEFAULTS ;
grpc_slice method_slice ;
@ -366,8 +444,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
cq = grpc_completion_queue_create_for_pluck ( NULL ) ;
TypedData_Get_Struct ( self , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
ch = wrapper - > wrapped ;
if ( ch = = NULL ) {
if ( wrapper - > bg_wrapped = = NULL ) {
rb_raise ( rb_eRuntimeError , " closed! " ) ;
return Qnil ;
}
@ -375,8 +452,8 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
method_slice =
grpc_slice_from_copied_buffer ( RSTRING_PTR ( method ) , RSTRING_LEN ( method ) ) ;
call = grpc_channel_create_call ( ch , parent_call , flags , cq , method_slice ,
host_slice_ptr ,
call = grpc_channel_create_call ( wrapper - > bg_wrapped - > channel , parent_call ,
flags , cq , method_slice , host_slice_ptr ,
grpc_rb_time_timeval ( deadline ,
/* absolute time */ 0 ) ,
NULL ) ;
@ -401,15 +478,16 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
}
/* Closes the channel, calling it's destroy method */
/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
* this */
static VALUE grpc_rb_channel_destroy ( VALUE self ) {
grpc_rb_channel * wrapper = NULL ;
grpc_channel * ch = NULL ;
TypedData_Get_Struct ( self , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
ch = wrapper - > wrapped ;
if ( ch ! = NULL ) {
grpc_rb_channel_safe_destroy ( wrapper ) ;
wrapper - > wrapped = NULL ;
if ( wrapper - > bg_wrapped ! = NULL ) {
rb_thread_call_without_gvl ( channel_safe_destroy_without_gil ,
wrapper - > bg_wrapped , NULL , NULL ) ;
wrapper - > bg_ wrapped = NULL ;
}
return Qnil ;
@ -422,64 +500,110 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
char * target = NULL ;
TypedData_Get_Struct ( self , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
target = grpc_channel_get_target ( wrapper - > wrapped ) ;
target = grpc_channel_get_target ( wrapper - > bg_ wrapped- > channel ) ;
res = rb_str_new2 ( target ) ;
gpr_free ( target ) ;
return res ;
}
// Either start polling channel connection state or signal that it's free to
// destroy.
// Not safe to call while a channel's connection state is polled.
static void grpc_rb_channel_try_register_connection_polling (
grpc_rb_channel * wrapper ) {
grpc_connectivity_state conn_state ;
gpr_timespec sleep_time = gpr_time_add (
gpr_now ( GPR_CLOCK_REALTIME ) , gpr_time_from_millis ( 20 , GPR_TIMESPAN ) ) ;
GPR_ASSERT ( wrapper ) ;
GPR_ASSERT ( wrapper - > wrapped ) ;
gpr_mu_lock ( & wrapper - > channel_mu ) ;
if ( wrapper - > request_safe_destroy ) {
wrapper - > safe_to_destroy = 1 ;
gpr_cv_broadcast ( & wrapper - > channel_cv ) ;
gpr_mu_unlock ( & wrapper - > channel_mu ) ;
return ;
/* Needs to be called under global_connection_polling_mu */
static int bg_watched_channel_list_lookup ( bg_watched_channel * target ) {
bg_watched_channel * cur = bg_watched_channel_list_head ;
while ( cur ! = NULL ) {
if ( cur = = target ) {
return 1 ;
}
cur = cur - > next ;
}
gpr_mu_lock ( & global_connection_polling_mu ) ;
GPR_ASSERT ( channel_polling_thread_started | | abort_channel_polling ) ;
conn_state = grpc_channel_check_connectivity_state ( wrapper - > wrapped , 0 ) ;
if ( conn_state ! = wrapper - > current_connectivity_state ) {
wrapper - > current_connectivity_state = conn_state ;
gpr_cv_broadcast ( & wrapper - > channel_cv ) ;
}
// avoid posting work to the channel polling cq if it's been shutdown
if ( ! abort_channel_polling & & conn_state ! = GRPC_CHANNEL_SHUTDOWN ) {
grpc_channel_watch_connectivity_state (
wrapper - > wrapped , conn_state , sleep_time , channel_polling_cq , wrapper ) ;
} else {
wrapper - > safe_to_destroy = 1 ;
gpr_cv_broadcast ( & wrapper - > channel_cv ) ;
return 0 ;
}
/* Needs to be called under global_connection_polling_mu */
static bg_watched_channel * bg_watched_channel_list_create_and_add (
grpc_channel * channel ) {
bg_watched_channel * watched = gpr_zalloc ( sizeof ( bg_watched_channel ) ) ;
watched - > channel = channel ;
watched - > next = bg_watched_channel_list_head ;
watched - > refcount = 1 ;
bg_watched_channel_list_head = watched ;
return watched ;
}
/* Needs to be called under global_connection_polling_mu */
static void bg_watched_channel_list_free_and_remove (
bg_watched_channel * target ) {
bg_watched_channel * bg = NULL ;
GPR_ASSERT ( bg_watched_channel_list_lookup ( target ) ) ;
GPR_ASSERT ( target - > channel_destroyed & & target - > refcount = = 0 ) ;
if ( bg_watched_channel_list_head = = target ) {
bg_watched_channel_list_head = target - > next ;
gpr_free ( target ) ;
return ;
}
bg = bg_watched_channel_list_head ;
while ( bg ! = NULL & & bg - > next ! = NULL ) {
if ( bg - > next = = target ) {
bg - > next = bg - > next - > next ;
gpr_free ( target ) ;
return ;
}
bg = bg - > next ;
}
GPR_ASSERT ( 0 ) ;
}
/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
* it onto the background thread for constant watches . */
static void * channel_init_try_register_connection_polling_without_gil (
void * arg ) {
channel_init_try_register_stack * stack =
( channel_init_try_register_stack * ) arg ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
stack - > wrapper - > bg_wrapped =
bg_watched_channel_list_create_and_add ( stack - > channel ) ;
grpc_rb_channel_try_register_connection_polling ( stack - > wrapper - > bg_wrapped ) ;
gpr_mu_unlock ( & global_connection_polling_mu ) ;
gpr_mu_unlock ( & wrapper - > channel_mu ) ;
return NULL ;
}
// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
static void grpc_rb_channel_safe_destroy ( grpc_rb_channel * wrapper ) {
gpr_mu_lock ( & wrapper - > channel_mu ) ;
wrapper - > request_safe_destroy = 1 ;
// Needs to be called under global_connection_poolling_mu
static void grpc_rb_channel_try_register_connection_polling (
bg_watched_channel * bg ) {
grpc_connectivity_state conn_state ;
watch_state_op * op = NULL ;
while ( ! wrapper - > safe_to_destroy ) {
gpr_cv_wait ( & wrapper - > channel_cv , & wrapper - > channel_mu ,
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ;
GPR_ASSERT ( channel_polling_thread_started | | abort_channel_polling ) ;
if ( bg - > refcount = = 0 ) {
GPR_ASSERT ( bg - > channel_destroyed ) ;
bg_watched_channel_list_free_and_remove ( bg ) ;
return ;
}
GPR_ASSERT ( bg - > refcount = = 1 ) ;
if ( bg - > channel_destroyed | | abort_channel_polling ) {
return ;
}
GPR_ASSERT ( wrapper - > safe_to_destroy ) ;
gpr_mu_unlock ( & wrapper - > channel_mu ) ;
grpc_channel_destroy ( wrapper - > wrapped ) ;
conn_state = grpc_channel_check_connectivity_state ( bg - > channel , 0 ) ;
if ( conn_state = = GRPC_CHANNEL_SHUTDOWN ) {
return ;
}
GPR_ASSERT ( bg_watched_channel_list_lookup ( bg ) ) ;
// prevent bg from being free'd by GC while background thread is watching it
bg - > refcount + + ;
op = gpr_zalloc ( sizeof ( watch_state_op ) ) ;
op - > op_type = CONTINUOUS_WATCH ;
op - > op . continuous_watch_callback_args . bg = bg ;
grpc_channel_watch_connectivity_state ( bg - > channel , conn_state ,
gpr_inf_future ( GPR_CLOCK_REALTIME ) ,
channel_polling_cq , op ) ;
}
// Note this loop breaks out with a single call of
@ -490,6 +614,8 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
// early and falls back to current behavior.
static void * run_poll_channels_loop_no_gil ( void * arg ) {
grpc_event event ;
watch_state_op * op = NULL ;
bg_watched_channel * bg = NULL ;
( void ) arg ;
gpr_log ( GPR_DEBUG , " GRPC_RUBY: run_poll_channels_loop_no_gil - begin " ) ;
@ -505,10 +631,22 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
if ( event . type = = GRPC_QUEUE_SHUTDOWN ) {
break ;
}
gpr_mu_lock ( & global_connection_polling_mu ) ;
if ( event . type = = GRPC_OP_COMPLETE ) {
grpc_rb_channel_try_register_connection_polling (
( grpc_rb_channel * ) event . tag ) ;
op = ( watch_state_op * ) event . tag ;
if ( op - > op_type = = CONTINUOUS_WATCH ) {
bg = ( bg_watched_channel * ) op - > op . continuous_watch_callback_args . bg ;
bg - > refcount - - ;
grpc_rb_channel_try_register_connection_polling ( bg ) ;
gpr_free ( op ) ;
} else if ( op - > op_type = = WATCH_STATE_API ) {
grpc_rb_channel_watch_connection_state_op_complete (
( watch_state_op * ) event . tag , event . success ) ;
} else {
GPR_ASSERT ( 0 ) ;
}
}
gpr_mu_unlock ( & global_connection_polling_mu ) ;
}
grpc_completion_queue_destroy ( channel_polling_cq ) ;
gpr_log ( GPR_DEBUG ,
@ -519,14 +657,36 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
// Notify the channel polling loop to cleanup and shutdown.
static void run_poll_channels_loop_unblocking_func ( void * arg ) {
bg_watched_channel * bg = NULL ;
( void ) arg ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
gpr_log ( GPR_DEBUG ,
" GRPC_RUBY: grpc_rb_event _unblocking_func - begin aborting "
" GRPC_RUBY: run_poll_channels_loop _unblocking_func - begin aborting "
" connection polling " ) ;
// early out after first time through
if ( abort_channel_polling ) {
gpr_mu_unlock ( & global_connection_polling_mu ) ;
return ;
}
abort_channel_polling = 1 ;
// force pending watches to end by switching to shutdown state
bg = bg_watched_channel_list_head ;
while ( bg ! = NULL ) {
if ( ! bg - > channel_destroyed ) {
grpc_channel_destroy ( bg - > channel ) ;
bg - > channel_destroyed = 1 ;
}
bg = bg - > next ;
}
grpc_completion_queue_shutdown ( channel_polling_cq ) ;
gpr_cv_broadcast ( & global_connection_polling_cv ) ;
gpr_mu_unlock ( & global_connection_polling_mu ) ;
gpr_log ( GPR_DEBUG ,
" GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting "
" connection polling " ) ;
}
// Poll channel connectivity states in background thread without the GIL.
@ -542,10 +702,11 @@ static VALUE run_poll_channels_loop(VALUE arg) {
}
static void * wait_until_channel_polling_thread_started_no_gil ( void * arg ) {
( void ) arg ;
int * stop_waiting = ( int * ) arg ;
gpr_log ( GPR_DEBUG , " GRPC_RUBY: wait for channel polling thread to start " ) ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
while ( ! channel_polling_thread_started & & ! abort_channel_polling ) {
while ( ! channel_polling_thread_started & & ! abort_channel_polling & &
! * stop_waiting ) {
gpr_cv_wait ( & global_connection_polling_cv , & global_connection_polling_mu ,
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ;
}
@ -556,15 +717,22 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
static void wait_until_channel_polling_thread_started_unblocking_func (
void * arg ) {
( void ) arg ;
int * stop_waiting = ( int * ) arg ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
gpr_log ( GPR_DEBUG ,
" GRPC_RUBY: "
" wait_until_channel_polling_thread_started_unblocking_func - begin "
" aborting connection polling " ) ;
" GRPC_RUBY: interrupt wait for channel polling thread to start " ) ;
* stop_waiting = 1 ;
gpr_cv_broadcast ( & global_connection_polling_cv ) ;
gpr_mu_unlock ( & global_connection_polling_mu ) ;
}
static void * set_abort_channel_polling_without_gil ( void * arg ) {
( void ) arg ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
abort_channel_polling = 1 ;
gpr_cv_broadcast ( & global_connection_polling_cv ) ;
gpr_mu_unlock ( & global_connection_polling_mu ) ;
return NULL ;
}
/* Temporary fix for
@ -592,10 +760,8 @@ void grpc_rb_channel_polling_thread_start() {
if ( ! RTEST ( background_thread ) ) {
gpr_log ( GPR_DEBUG , " GRPC_RUBY: failed to spawn channel polling thread " ) ;
gpr_mu_lock ( & global_connection_polling_mu ) ;
abort_channel_polling = 1 ;
gpr_cv_broadcast ( & global_connection_polling_cv ) ;
gpr_mu_unlock ( & global_connection_polling_mu ) ;
rb_thread_call_without_gvl ( set_abort_channel_polling_without_gil , NULL ,
NULL , NULL ) ;
}
}
@ -674,5 +840,5 @@ void Init_grpc_channel() {
grpc_channel * grpc_rb_get_wrapped_channel ( VALUE v ) {
grpc_rb_channel * wrapper = NULL ;
TypedData_Get_Struct ( v , grpc_rb_channel , & grpc_channel_data_type , wrapper ) ;
return wrapper - > wrapped ;
return wrapper - > bg_ wrapped- > channel ;
}