Sanitize unsubscription to be callback preserving

reviewable/pr4232/r5
Craig Tiller 9 years ago
parent 50ec2670a4
commit 486130455f
  1. 2
      src/core/channel/client_channel.c
  2. 5
      src/core/channel/client_uchannel.c
  3. 20
      src/core/client_config/lb_policies/pick_first.c
  4. 58
      src/core/client_config/lb_policy.c
  5. 18
      src/core/client_config/lb_policy.h
  6. 103
      src/core/client_config/subchannel.c
  7. 9
      src/core/client_config/subchannel.h
  8. 1
      src/core/iomgr/fd_posix.h
  9. 1
      src/core/iomgr/iomgr.c
  10. 12
      src/core/transport/chttp2_transport.c
  11. 69
      src/core/transport/connectivity_state.c
  12. 11
      src/core/transport/connectivity_state.h
  13. 2
      src/core/transport/transport.h

@ -227,7 +227,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (old_lb_policy != NULL) { if (old_lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties);
grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
} }
@ -267,7 +266,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
chand->resolver = NULL; chand->resolver = NULL;
if (chand->lb_policy != NULL) { if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties);
grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL; chand->lb_policy = NULL;
} }

@ -166,8 +166,9 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) { grpc_channel_element *elem) {
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
grpc_connected_subchannel_state_change_unsubscribe( /* cancel subscription */
exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); grpc_connected_subchannel_notify_on_state_change(
exec_ctx, chand->connected_subchannel, NULL, &chand->connectivity_cb);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
gpr_mu_destroy(&chand->mu_state); gpr_mu_destroy(&chand->mu_state);
} }

@ -96,11 +96,17 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
gpr_log(GPR_DEBUG, "LB_POLICY: pf_shutdown: %p", p);
p->shutdown = 1; p->shutdown = 1;
pp = p->pending_picks; pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed);
} else {
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_connectivity], NULL, NULL, &p->connectivity_changed);
}
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
@ -139,7 +145,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1; p->started_picking = 1;
p->checking_subchannel = 0; p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE; p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties, &p->base.interested_parties,
@ -195,7 +201,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
p->subchannels = NULL; p->subchannels = NULL;
exclude_subchannel = p->selected; exclude_subchannel = p->selected;
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) { for (i = 0; i < num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
@ -212,9 +218,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
gpr_log(GPR_DEBUG, "LB_POLICY: pf_connectivity_changed: %p success=%d shutdown=%d", p, iomgr_success, p->shutdown);
if (p->shutdown) { if (p->shutdown) {
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
return; return;
} else if (p->selected != NULL) { } else if (p->selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@ -228,7 +236,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, p->selected, &p->checking_connectivity, exec_ctx, p->selected, &p->checking_connectivity,
&p->connectivity_changed); &p->connectivity_changed);
} else { } else {
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
} }
} else { } else {
loop: loop:
@ -242,7 +250,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(p->selected); GPR_ASSERT(p->selected);
GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
/* drop the pick list: we are connected now */ /* drop the pick list: we are connected now */
GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
grpc_exec_ctx_enqueue(exec_ctx, grpc_exec_ctx_enqueue(exec_ctx,
grpc_closure_create(destroy_subchannels, p), 1); grpc_closure_create(destroy_subchannels, p), 1);
/* update any calls that were waiting for a pick */ /* update any calls that were waiting for a pick */
@ -300,7 +308,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp); gpr_free(pp);
} }
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
} else { } else {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_CHANNEL_TRANSIENT_FAILURE,

@ -33,41 +33,59 @@
#include "src/core/client_config/lb_policy.h" #include "src/core/client_config/lb_policy.h"
#define WEAK_REF_BITS 16
void grpc_lb_policy_init(grpc_lb_policy *policy, void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) { const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable; policy->vtable = vtable;
gpr_ref_init(&policy->refs, 1); gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
grpc_pollset_set_init(&policy->interested_parties); grpc_pollset_set_init(&policy->interested_parties);
} }
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, #define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason
const char *reason) { #define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", #define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason
policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); #define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose
#else #else
void grpc_lb_policy_ref(grpc_lb_policy *policy) { #define REF_FUNC_EXTRA_ARGS
#define REF_MUTATE_EXTRA_ARGS
#define REF_FUNC_PASS_ARGS(new_reason)
#define REF_MUTATE_PASS_ARGS(x)
#endif #endif
gpr_ref(&policy->refs);
}
static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
void grpc_lb_policy_unref(grpc_lb_policy *policy, gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason);
grpc_closure_list *closure_list, const char *file,
int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s",
policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason);
#else
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
#endif #endif
if (gpr_unref(&policy->refs)) { return old_val;
grpc_pollset_set_destroy(&policy->interested_parties); }
policy->vtable->destroy(exec_ctx, policy);
void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
}
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) {
policy->vtable->shutdown(exec_ctx, policy);
} }
grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref"));
}
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF"));
} }
void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
policy->vtable->shutdown(exec_ctx, policy); gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
grpc_pollset_set_destroy(&policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
} }
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,

@ -47,7 +47,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy { struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable; const grpc_lb_policy_vtable *vtable;
gpr_refcount refs; gpr_atm ref_pair;
grpc_pollset_set interested_parties; grpc_pollset_set interested_parties;
}; };
@ -78,29 +78,39 @@ struct grpc_lb_policy_vtable {
grpc_closure *closure); grpc_closure *closure);
}; };
#define GRPC_LB_POLICY_REFCOUNT_DEBUG
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
#define GRPC_LB_POLICY_REF(p, r) \ #define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \
grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_WEAK_REF(p, r) \
grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \
grpc_lb_policy_weak_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason); const char *reason);
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const char *file, int line, const char *reason); const char *file, int line, const char *reason);
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason);
void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const char *file, int line, const char *reason);
#else #else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p))
#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p))
#define GRPC_LB_POLICY_WEAK_UNREF(cl, p, r) grpc_lb_policy_weak_unref((cl), (p))
void grpc_lb_policy_ref(grpc_lb_policy *policy); void grpc_lb_policy_ref(grpc_lb_policy *policy);
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy);
void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
#endif #endif
/** called by concrete implementations to initialize the base struct */ /** called by concrete implementations to initialize the base struct */
void grpc_lb_policy_init(grpc_lb_policy *policy, void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable); const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** Given initial metadata in \a initial_metadata, find an appropriate /** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting target for this rpc, and 'return' it by calling \a on_complete after setting
\a target. \a target.

@ -62,13 +62,19 @@
typedef struct { typedef struct {
grpc_closure closure; grpc_closure closure;
union { grpc_subchannel *subchannel;
grpc_subchannel *subchannel;
grpc_connected_subchannel *connected_subchannel;
} whom;
grpc_connectivity_state connectivity_state; grpc_connectivity_state connectivity_state;
} state_watcher; } state_watcher;
typedef struct external_state_watcher {
grpc_subchannel *subchannel;
grpc_pollset_set *pollset_set;
grpc_closure *notify;
grpc_closure closure;
struct external_state_watcher *next;
struct external_state_watcher *prev;
} external_state_watcher;
struct grpc_subchannel { struct grpc_subchannel {
grpc_connector *connector; grpc_connector *connector;
@ -114,6 +120,8 @@ struct grpc_subchannel {
/** connectivity state tracking */ /** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
external_state_watcher root_external_state_watcher;
/** next connect attempt time */ /** next connect attempt time */
gpr_timespec next_attempt; gpr_timespec next_attempt;
/** amount to backoff each failure */ /** amount to backoff each failure */
@ -201,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c); gpr_free(c);
} }
gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason);
@ -277,6 +285,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
&c->initial_connect_string); &c->initial_connect_string);
c->args = grpc_channel_args_copy(args->args); c->args = grpc_channel_args_copy(args->args);
c->random = random_seed(); c->random = random_seed();
c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c); grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel"); "subchannel");
@ -316,17 +325,16 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state; return state;
} }
typedef struct {
grpc_subchannel *subchannel;
grpc_pollset_set *pollset_set;
grpc_closure *notify;
grpc_closure closure;
} external_state_watcher;
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
external_state_watcher *w = arg; external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify; grpc_closure *follow_up = w->notify;
grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); if (w->pollset_set != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set);
}
gpr_mu_lock(&w->subchannel->mu);
w->next->prev = w->prev;
w->prev->next = w->next;
gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w); gpr_free(w);
follow_up->cb(exec_ctx, follow_up->cb_arg, success); follow_up->cb(exec_ctx, follow_up->cb_arg, success);
@ -338,37 +346,47 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state *state, grpc_connectivity_state *state,
grpc_closure *notify) { grpc_closure *notify) {
int do_connect = 0; int do_connect = 0;
external_state_watcher *w = gpr_malloc(sizeof(*w)); external_state_watcher *w;
w->subchannel = c;
w->pollset_set = interested_parties; if (state == NULL) {
w->notify = notify; gpr_mu_lock(&c->mu);
grpc_closure_init(&w->closure, on_external_state_watcher_done, w); for (w = c->root_external_state_watcher.next;
grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); w != &c->root_external_state_watcher;
GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); w = w->next) {
gpr_mu_lock(&c->mu); if (w->notify == notify) {
if (grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure);
exec_ctx, &c->state_tracker, state, &w->closure)) { }
do_connect = 1; }
c->connecting = 1; gpr_mu_unlock(&c->mu);
/* released by connection */ } else {
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); w = gpr_malloc(sizeof(*w));
w->subchannel = c;
w->pollset_set = interested_parties;
w->notify = notify;
grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
if (interested_parties != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties);
}
GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
gpr_mu_lock(&c->mu);
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
if (grpc_connectivity_state_notify_on_state_change(
exec_ctx, &c->state_tracker, state, &w->closure)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
}
gpr_mu_unlock(&c->mu);
} }
gpr_mu_unlock(&c->mu);
if (do_connect) { if (do_connect) {
start_connect(exec_ctx, c); start_connect(exec_ctx, c);
} }
} }
void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_closure *subscribed_notify) {
gpr_mu_lock(&c->mu);
grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker,
subscribed_notify);
gpr_mu_unlock(&c->mu);
}
void grpc_connected_subchannel_process_transport_op( void grpc_connected_subchannel_process_transport_op(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_transport_op *op) { grpc_transport_op *op) {
@ -380,7 +398,7 @@ void grpc_connected_subchannel_process_transport_op(
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
int iomgr_success) { int iomgr_success) {
state_watcher *sw = p; state_watcher *sw = p;
grpc_subchannel *c = sw->whom.subchannel; grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu; gpr_mu *mu = &c->mu;
gpr_mu_lock(mu); gpr_mu_lock(mu);
@ -423,16 +441,9 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_connectivity_state *state, grpc_closure *closure) { grpc_connectivity_state *state, grpc_closure *closure) {
GPR_ASSERT(state != NULL);
connected_subchannel_state_op(exec_ctx, con, state, closure); connected_subchannel_state_op(exec_ctx, con, state, closure);
} }
void grpc_connected_subchannel_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_closure *closure) {
connected_subchannel_state_op(exec_ctx, con, NULL, closure);
}
static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
size_t channel_stack_size; size_t channel_stack_size;
grpc_connected_subchannel *con; grpc_connected_subchannel *con;
@ -461,7 +472,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* initialize state watcher */ /* initialize state watcher */
sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
sw_subchannel->whom.subchannel = c; sw_subchannel->subchannel = c;
sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
sw_subchannel); sw_subchannel);

@ -124,15 +124,6 @@ void grpc_connected_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_connectivity_state *state, grpc_closure *notify); grpc_connectivity_state *state, grpc_closure *notify);
/** Remove \a subscribed_notify from the list of closures to be called on a
* state change if present. */
void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_closure *subscribed_notify);
void grpc_connected_subchannel_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_closure *subscribed_notify);
/** retrieve the grpc_connected_subchannel - or NULL if called before /** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */ the subchannel becomes connected */
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(

@ -168,6 +168,7 @@ void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
/* Reference counting for fds */ /* Reference counting for fds */
#define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG #ifdef GRPC_FD_REF_COUNT_DEBUG
void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line);

@ -116,6 +116,7 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely", "memory leaks are likely",
count_objects()); count_objects());
dump_objects("LEAKED"); dump_objects("LEAKED");
abort();
} }
break; break;
} }

@ -911,15 +911,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
if (op->on_connectivity_state_change != NULL) { if (op->on_connectivity_state_change != NULL) {
if (op->connectivity_state != NULL) { grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_notify_on_state_change( exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, op->on_connectivity_state_change);
op->on_connectivity_state_change);
} else {
grpc_connectivity_state_change_unsubscribe(
exec_ctx, &t->channel_callback.state_tracker,
op->on_connectivity_state_change);
}
} }
if (op->send_goaway) { if (op->send_goaway) {

@ -98,42 +98,47 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) { grpc_connectivity_state *current, grpc_closure *notify) {
if (grpc_connectivity_state_trace) { if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", if (current == NULL) {
tracker, tracker->name, grpc_connectivity_state_name(*current), gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p",
grpc_connectivity_state_name(tracker->current_state), notify); tracker, tracker->name, notify);
} else {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p",
tracker, tracker->name, grpc_connectivity_state_name(*current),
grpc_connectivity_state_name(tracker->current_state), notify);
}
} }
if (tracker->current_state != *current) { if (current == NULL) {
*current = tracker->current_state; grpc_connectivity_state_watcher *w = tracker->watchers;
grpc_exec_ctx_enqueue(exec_ctx, notify, 1); if (w != NULL && w->notify == notify) {
grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
tracker->watchers = w->next;
gpr_free(w);
return 0;
}
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == notify) {
grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
w->next = w->next->next;
gpr_free(rm_candidate);
return 0;
}
w = w->next;
}
return 0;
} else { } else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); if (tracker->current_state != *current) {
w->current = current; *current = tracker->current_state;
w->notify = notify; grpc_exec_ctx_enqueue(exec_ctx, notify, 1);
w->next = tracker->watchers; } else {
tracker->watchers = w; grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
} w->current = current;
return tracker->current_state == GRPC_CHANNEL_IDLE; w->notify = notify;
} w->next = tracker->watchers;
tracker->watchers = w;
int grpc_connectivity_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_closure *subscribed_notify) {
grpc_connectivity_state_watcher *w = tracker->watchers;
if (w != NULL && w->notify == subscribed_notify) {
tracker->watchers = w->next;
gpr_free(w);
return 1;
}
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == subscribed_notify) {
w->next = w->next->next;
gpr_free(rm_candidate);
return 1;
} }
w = w->next; return tracker->current_state == GRPC_CHANNEL_IDLE;
} }
return 0;
} }
void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,

@ -73,16 +73,11 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker); grpc_connectivity_state_tracker *tracker);
/** Return 1 if the channel should start connecting, 0 otherwise */ /** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
case) */
int grpc_connectivity_state_notify_on_state_change( int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify); grpc_connectivity_state *current, grpc_closure *notify);
/** Remove \a subscribed_notify from the list of closures to be called on a
* state change if present, returning 1. Otherwise, nothing is done and return
* 0. */
int grpc_connectivity_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_closure *subscribed_notify);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */

@ -50,6 +50,8 @@ typedef struct grpc_transport grpc_transport;
for a stream. */ for a stream. */
typedef struct grpc_stream grpc_stream; typedef struct grpc_stream grpc_stream;
#define GRPC_STREAM_REFCOUNT_DEBUG
typedef struct grpc_stream_refcount { typedef struct grpc_stream_refcount {
gpr_refcount refs; gpr_refcount refs;
grpc_closure destroy; grpc_closure destroy;

Loading…
Cancel
Save