diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5ad2e075c31..9d3690dc7d2 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -227,7 +227,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, if (old_lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); - grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -267,7 +266,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); - grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index f8c621b8ebb..f0682d59462 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -166,8 +166,9 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_connected_subchannel_state_change_unsubscribe( - exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); + /* cancel subscription */ + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index d83f3718c25..a622d98317c 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -96,11 +96,17 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); + gpr_log(GPR_DEBUG, "LB_POLICY: pf_shutdown: %p", p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + if (p->selected != NULL) { + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed); + } else { + grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_connectivity], NULL, NULL, &p->connectivity_changed); + } gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; @@ -139,7 +145,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], &p->base.interested_parties, @@ -195,7 +201,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, p->subchannels = NULL; exclude_subchannel = p->selected; gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); @@ -212,9 +218,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&p->mu); + gpr_log(GPR_DEBUG, "LB_POLICY: pf_connectivity_changed: %p success=%d shutdown=%d", p, iomgr_success, p->shutdown); + if (p->shutdown) { gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); return; } else if (p->selected != NULL) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -228,7 +236,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } } else { loop: @@ -242,7 +250,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(p->selected); GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); /* update any calls that were waiting for a pick */ @@ -300,7 +308,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 2b874daef60..4208c15b3fc 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,41 +33,59 @@ #include "src/core/client_config/lb_policy.h" +#define WEAK_REF_BITS 16 + void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); + gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); +#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason +#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose +#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason +#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose #else -void grpc_lb_policy_ref(grpc_lb_policy *policy) { +#define REF_FUNC_EXTRA_ARGS +#define REF_MUTATE_EXTRA_ARGS +#define REF_FUNC_PASS_ARGS(new_reason) +#define REF_MUTATE_PASS_ARGS(x) #endif - gpr_ref(&policy->refs); -} +static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list, const char *file, - int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); -#else -void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); #endif - if (gpr_unref(&policy->refs)) { - grpc_pollset_set_destroy(&policy->interested_parties); - policy->vtable->destroy(exec_ctx, policy); + return old_val; +} + +void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); +} + +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); + gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); + gpr_atm check = 1 << WEAK_REF_BITS; + if ((old_val & mask) == check) { + policy->vtable->shutdown(exec_ctx, policy); } + grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); +} + +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); } -void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { - policy->vtable->shutdown(exec_ctx, policy); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); + if (old_val == 1) { + grpc_pollset_set_destroy(&policy->interested_parties); + policy->vtable->destroy(exec_ctx, policy); + } } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 96b2bdf4caa..988789c9346 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -47,7 +47,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; - gpr_refcount refs; + gpr_atm ref_pair; grpc_pollset_set interested_parties; }; @@ -78,29 +78,39 @@ struct grpc_lb_policy_vtable { grpc_closure *closure); }; +#define GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) \ + grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ + grpc_lb_policy_weak_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const char *file, int line, const char *reason); +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p)) +#define GRPC_LB_POLICY_WEAK_UNREF(cl, p, r) grpc_lb_policy_weak_unref((cl), (p)) void grpc_lb_policy_ref(grpc_lb_policy *policy); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); #endif /** called by concrete implementations to initialize the base struct */ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); -/** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 434a37cf6bb..9f802f1cc39 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -62,13 +62,19 @@ typedef struct { grpc_closure closure; - union { - grpc_subchannel *subchannel; - grpc_connected_subchannel *connected_subchannel; - } whom; + grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; } state_watcher; +typedef struct external_state_watcher { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; + struct external_state_watcher *next; + struct external_state_watcher *prev; +} external_state_watcher; + struct grpc_subchannel { grpc_connector *connector; @@ -114,6 +120,8 @@ struct grpc_subchannel { /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; + external_state_watcher root_external_state_watcher; + /** next connect attempt time */ gpr_timespec next_attempt; /** amount to backoff each failure */ @@ -201,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { +static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); @@ -277,6 +285,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->random = random_seed(); + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -316,17 +325,16 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -typedef struct { - grpc_subchannel *subchannel; - grpc_pollset_set *pollset_set; - grpc_closure *notify; - grpc_closure closure; -} external_state_watcher; - static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + if (w->pollset_set != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + } + gpr_mu_lock(&w->subchannel->mu); + w->next->prev = w->prev; + w->prev->next = w->next; + gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, success); @@ -338,37 +346,47 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; - external_state_watcher *w = gpr_malloc(sizeof(*w)); - w->subchannel = c; - w->pollset_set = interested_parties; - w->notify = notify; - grpc_closure_init(&w->closure, on_external_state_watcher_done, w); - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); - GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); - gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, &w->closure)) { - do_connect = 1; - c->connecting = 1; - /* released by connection */ - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + external_state_watcher *w; + + if (state == NULL) { + gpr_mu_lock(&c->mu); + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; + w = w->next) { + if (w->notify == notify) { + grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure); + } + } + gpr_mu_unlock(&c->mu); + } else { + w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + if (interested_parties != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + } + GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); + gpr_mu_lock(&c->mu); + w->next = &c->root_external_state_watcher; + w->prev = w->next->prev; + w->next->prev = w->prev->next = w; + if (grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, state, &w->closure)) { + do_connect = 1; + c->connecting = 1; + /* released by connection */ + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + } + gpr_mu_unlock(&c->mu); } - gpr_mu_unlock(&c->mu); if (do_connect) { start_connect(exec_ctx, c); } } -void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_closure *subscribed_notify) { - gpr_mu_lock(&c->mu); - grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker, - subscribed_notify); - gpr_mu_unlock(&c->mu); -} - void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { @@ -380,7 +398,7 @@ void grpc_connected_subchannel_process_transport_op( static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) { state_watcher *sw = p; - grpc_subchannel *c = sw->whom.subchannel; + grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; gpr_mu_lock(mu); @@ -423,16 +441,9 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { - GPR_ASSERT(state != NULL); connected_subchannel_state_op(exec_ctx, con, state, closure); } -void grpc_connected_subchannel_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_closure *closure) { - connected_subchannel_state_op(exec_ctx, con, NULL, closure); -} - static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; @@ -461,7 +472,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* initialize state watcher */ sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->whom.subchannel = c; + sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 66c13990e9e..b735d2ccab3 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -124,15 +124,6 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present. */ -void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_closure *subscribed_notify); -void grpc_connected_subchannel_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, - grpc_closure *subscribed_notify); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index dc917ebbc0f..8717aa51039 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -168,6 +168,7 @@ void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); /* Reference counting for fds */ +#define GRPC_FD_REF_COUNT_DEBUG #ifdef GRPC_FD_REF_COUNT_DEBUG void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 212ce5534dd..5bba40bc9b3 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -116,6 +116,7 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + abort(); } break; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index ff0c6163867..91c50dd2cbc 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -911,15 +911,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); if (op->on_connectivity_state_change != NULL) { - if (op->connectivity_state != NULL) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); - } else { - grpc_connectivity_state_change_unsubscribe( - exec_ctx, &t->channel_callback.state_tracker, - op->on_connectivity_state_change); - } + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); } if (op->send_goaway) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index a435ba49f27..569d3c30b24 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -98,42 +98,47 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", - tracker, tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + if (current == NULL) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", + tracker, tracker->name, notify); + } else { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", + tracker, tracker->name, grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(tracker->current_state), notify); + } } - if (tracker->current_state != *current) { - *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + if (current == NULL) { + grpc_connectivity_state_watcher *w = tracker->watchers; + if (w != NULL && w->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + tracker->watchers = w->next; + gpr_free(w); + return 0; + } + while (w != NULL) { + grpc_connectivity_state_watcher *rm_candidate = w->next; + if (rm_candidate != NULL && rm_candidate->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + w->next = w->next->next; + gpr_free(rm_candidate); + return 0; + } + w = w->next; + } + return 0; } else { - grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = current; - w->notify = notify; - w->next = tracker->watchers; - tracker->watchers = w; - } - return tracker->current_state == GRPC_CHANNEL_IDLE; -} - -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify) { - grpc_connectivity_state_watcher *w = tracker->watchers; - if (w != NULL && w->notify == subscribed_notify) { - tracker->watchers = w->next; - gpr_free(w); - return 1; - } - while (w != NULL) { - grpc_connectivity_state_watcher *rm_candidate = w->next; - if (rm_candidate != NULL && rm_candidate->notify == subscribed_notify) { - w->next = w->next->next; - gpr_free(rm_candidate); - return 1; + if (tracker->current_state != *current) { + *current = tracker->current_state; + grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + } else { + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; } - w = w->next; + return tracker->current_state == GRPC_CHANNEL_IDLE; } - return 0; } void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 119b1c1554d..d8b5b38da0e 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -73,16 +73,11 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); -/** Return 1 if the channel should start connecting, 0 otherwise */ +/** Return 1 if the channel should start connecting, 0 otherwise. + If current==NULL cancel notify if it is already queued (success==0 in that + case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present, returning 1. Otherwise, nothing is done and return - * 0. */ -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 4de72d7d75f..08f34ff0aaa 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,6 +50,8 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; +#define GRPC_STREAM_REFCOUNT_DEBUG + typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy;