clang-format

reviewable/pr4232/r6
Craig Tiller 9 years ago
parent caa4e702d0
commit 1d881fbed6
  1. 3
      src/core/channel/channel_stack.c
  2. 3
      src/core/channel/channel_stack.h
  3. 30
      src/core/channel/client_channel.c
  4. 34
      src/core/client_config/lb_policies/pick_first.c
  5. 61
      src/core/client_config/lb_policies/round_robin.c
  6. 26
      src/core/client_config/lb_policy.c
  7. 4
      src/core/client_config/lb_policy.h
  8. 60
      src/core/client_config/subchannel.c
  9. 18
      src/core/client_config/subchannel.h
  10. 8
      src/core/iomgr/pollset_set.h
  11. 10
      src/core/iomgr/pollset_set_posix.c
  12. 12
      src/core/iomgr/pollset_set_windows.c
  13. 3
      src/core/surface/channel.c
  14. 8
      src/core/transport/connectivity_state.c
  15. 2
      src/core/transport/connectivity_state.h
  16. 13
      test/core/client_config/lb_policies_test.c
  17. 8
      test/core/client_config/set_initial_connect_string_test.c
  18. 12
      test/core/end2end/fixtures/h2_uchannel.c
  19. 6
      test/core/end2end/tests/hpack_size.c

@ -106,8 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
const grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *channel_args,
const char *name,
grpc_channel_stack *stack) {
const char *name, grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));

@ -183,8 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
size_t filter_count, const grpc_channel_args *args,
const char *name,
grpc_channel_stack *stack);
const char *name, grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *stack);

@ -120,7 +120,8 @@ static void on_lb_policy_state_changed_locked(
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && w->chand->resolver != NULL) {
if (publish_state == GRPC_CHANNEL_FATAL_FAILURE &&
w->chand->resolver != NULL) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
}
@ -180,7 +181,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties);
grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
&chand->interested_parties);
}
gpr_mu_lock(&chand->mu_config);
@ -226,7 +228,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties);
grpc_pollset_set_del_pollset_set(exec_ctx,
&old_lb_policy->interested_parties,
&chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@ -265,7 +269,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties);
grpc_pollset_set_del_pollset_set(exec_ctx,
&chand->lb_policy->interested_parties,
&chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
@ -401,7 +407,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties);
grpc_pollset_set_del_pollset_set(exec_ctx,
&chand->lb_policy->interested_parties,
&chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
@ -472,11 +480,14 @@ typedef struct {
grpc_closure my_closure;
} external_connectivity_watcher;
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher");
grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
}
@ -491,7 +502,8 @@ void grpc_client_channel_watch_connectivity_state(
w->on_complete = on_complete;
grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher");
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &chand->state_tracker, state, &w->my_closure);

@ -102,15 +102,19 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed);
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, &p->connectivity_changed);
} else {
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed);
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
}
gpr_mu_unlock(&p->mu);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
pp = next;
@ -127,7 +131,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@ -147,8 +152,8 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
&p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
}
void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@ -174,7 +179,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@ -254,7 +260,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
@ -273,8 +280,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
&p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
} else {
goto loop;
}
@ -286,8 +293,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
&p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
&p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
p->num_subchannels--;
@ -305,7 +312,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"pick_first_connectivity");
} else {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,

@ -244,9 +244,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel,
NULL,
NULL,
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
&sd->connectivity_changed_closure);
}
gpr_mu_unlock(&p->mu);
@ -262,7 +260,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@ -279,15 +278,15 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, p->num_subchannels);
gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
p->num_subchannels);
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
sd->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel,
&p->base.interested_parties,
&sd->connectivity_state,
&sd->connectivity_changed_closure);
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
}
}
@ -323,7 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@ -355,8 +355,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_READY, "connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
sd->ready_list_node =
add_connected_sc_locked(p, sd->subchannel);
sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@ -375,34 +374,29 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
exec_ctx,
sd->subchannel,
&p->base.interested_parties,
&sd->connectivity_state,
&sd->connectivity_changed_closure);
exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
sd->connectivity_state, "connecting_changed");
sd->connectivity_state,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel,
&p->base.interested_parties,
&sd->connectivity_state,
&sd->connectivity_changed_closure);
exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* renew state notification */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel,
&p->base.interested_parties,
&sd->connectivity_state,
&sd->connectivity_changed_closure);
exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
/* remove from ready list if still present */
if (sd->ready_list_node != NULL) {
@ -415,16 +409,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_FATAL_FAILURE:
if (sd->ready_list_node != NULL) {
remove_disconnected_sc_locked(
p, sd->ready_list_node);
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
}
p->num_subchannels--;
GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
"round_robin");
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
p->subchannels[sd->index]->index = sd->index;
gpr_free(sd);
@ -491,8 +483,7 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
p->num_subchannels = args->num_subchannels;
p->subchannels =
gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels);
p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels);
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
@ -505,8 +496,8 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
sd->policy = p;
sd->index = i;
sd->subchannel = args->subchannels[i];
grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed,
sd);
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
/* The (dummy node) root of the ready list */

@ -54,10 +54,14 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
#define REF_MUTATE_PASS_ARGS(x)
#endif
static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta,
int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
old_val + delta, reason);
#endif
return old_val;
}
@ -66,22 +70,28 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
}
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val =
ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS),
1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) {
policy->vtable->shutdown(exec_ctx, policy);
}
grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref"));
grpc_lb_policy_weak_unref(exec_ctx,
policy REF_FUNC_PASS_ARGS("strong-unref"));
}
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF"));
}
void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val =
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
grpc_pollset_set_destroy(&policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);

@ -93,9 +93,9 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const char *file, int line, const char *reason);
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason);
const char *reason);
void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const char *file, int line, const char *reason);
const char *file, int line, const char *reason);
#else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p))

@ -78,9 +78,9 @@ typedef struct external_state_watcher {
struct grpc_subchannel {
grpc_connector *connector;
/** refcount
- lower INTERNAL_REF_BITS bits are for internal references:
these do not keep the subchannel open.
/** refcount
- lower INTERNAL_REF_BITS bits are for internal references:
these do not keep the subchannel open.
- upper remaining bits are for public references: these do
keep the subchannel open */
gpr_atm ref_pair;
@ -155,7 +155,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
#define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
(name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
#define REF_MUTATE_EXTRA_ARGS GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
#define REF_MUTATE_EXTRA_ARGS \
GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
@ -209,21 +210,27 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c);
}
static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
old_val + delta, reason);
#endif
return old_val;
}
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF"));
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
0 REF_MUTATE_PURPOSE("STRONG_REF"));
GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
}
void grpc_subchannel_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
void grpc_subchannel_weak_ref(grpc_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0);
@ -246,7 +253,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS),
1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
disconnect(exec_ctx, c);
}
@ -254,7 +262,8 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
}
void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 0) {
@ -286,7 +295,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
&c->initial_connect_string);
c->args = grpc_channel_args_copy(args->args);
c->random = random_seed();
c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher;
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
@ -326,11 +336,13 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
int success) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set);
grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set,
w->pollset_set);
}
gpr_mu_lock(&w->subchannel->mu);
w->next->prev = w->prev;
@ -341,21 +353,20 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i
follow_up->cb(exec_ctx, follow_up->cb_arg, success);
}
void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *notify) {
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
grpc_closure *notify) {
int do_connect = 0;
external_state_watcher *w;
if (state == NULL) {
gpr_mu_lock(&c->mu);
for (w = c->root_external_state_watcher.next;
w != &c->root_external_state_watcher;
w = w->next) {
for (w = c->root_external_state_watcher.next;
w != &c->root_external_state_watcher; w = w->next) {
if (w->notify == notify) {
grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure);
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &c->state_tracker, NULL, &w->closure);
}
}
gpr_mu_unlock(&c->mu);
@ -366,7 +377,8 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
w->notify = notify;
grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
if (interested_parties != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties);
grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set,
interested_parties);
}
GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
gpr_mu_lock(&c->mu);

@ -68,7 +68,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) grpc_subchannel_weak_unref((cl), (p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
grpc_subchannel_weak_unref((cl), (p))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
grpc_connected_subchannel_unref((cl), (p))
@ -84,10 +85,10 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_ref(grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
@ -115,11 +116,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *notify);
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *channel,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
grpc_closure *notify);
void grpc_connected_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_connectivity_state *state, grpc_closure *notify);

@ -58,10 +58,10 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set,
grpc_pollset *pollset);
void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item);
grpc_pollset_set *bag,
grpc_pollset_set *item);
void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item);
grpc_pollset_set *bag,
grpc_pollset_set *item);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */

@ -107,8 +107,9 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&bag->mu);
if (bag->pollset_set_count == bag->pollset_set_capacity) {
bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
bag->pollset_sets = gpr_realloc(bag->pollset_sets,
bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
bag->pollset_sets =
gpr_realloc(bag->pollset_sets,
bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
}
bag->pollset_sets[bag->pollset_set_count++] = item;
for (i = 0, j = 0; i < bag->fd_count; i++) {
@ -131,9 +132,8 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
for (i = 0; i < bag->pollset_set_count; i++) {
if (bag->pollset_sets[i] == item) {
bag->pollset_set_count--;
GPR_SWAP(grpc_pollset_set *,
bag->pollset_sets[i],
bag->pollset_sets[bag->pollset_set_count]);
GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
bag->pollset_sets[bag->pollset_set_count]);
break;
}
}

@ -49,12 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {}
void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* bag,
grpc_pollset_set* item) {}
void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {}
void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* bag,
grpc_pollset_set* item) {}
#endif /* GPR_WINSOCK_SOCKET */

@ -152,7 +152,8 @@ grpc_channel *grpc_channel_create_from_filters(
}
grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
num_filters, args,
is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;

@ -99,11 +99,11 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state *current, grpc_closure *notify) {
if (grpc_connectivity_state_trace) {
if (current == NULL) {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p",
tracker, tracker->name, notify);
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
tracker->name, notify);
} else {
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p",
tracker, tracker->name, grpc_connectivity_state_name(*current),
gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
tracker->name, grpc_connectivity_state_name(*current),
grpc_connectivity_state_name(tracker->current_state), notify);
}
}

@ -74,7 +74,7 @@ grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
/** Return 1 if the channel should start connecting, 0 otherwise.
If current==NULL cancel notify if it is already queued (success==0 in that
If current==NULL cancel notify if it is already queued (success==0 in that
case) */
int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,

@ -135,9 +135,8 @@ static void kill_server(const servers_fixture *f, size_t i) {
gpr_log(GPR_INFO, "KILLING SERVER %d", i);
GPR_ASSERT(f->servers[i] != NULL);
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(
grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL)
.type == GRPC_OP_COMPLETE);
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000),
NULL).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
f->servers[i] = NULL;
}
@ -203,8 +202,8 @@ static void teardown_servers(servers_fixture *f) {
if (f->servers[i] == NULL) continue;
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
n_millis_time(5000), NULL)
.type == GRPC_OP_COMPLETE);
n_millis_time(5000),
NULL).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
}
grpc_completion_queue_shutdown(f->cq);
@ -304,8 +303,8 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
s_idx = -1;
while ((ev = grpc_completion_queue_next(
f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL))
.type != GRPC_QUEUE_TIMEOUT) {
f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)).type !=
GRPC_QUEUE_TIMEOUT) {
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
read_tag = ((int)(gpr_intptr)ev.tag);
gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d",

@ -66,15 +66,15 @@ static grpc_closure on_read;
static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
GPR_ASSERT(success);
gpr_slice_buffer_move_into(
&state.temp_incoming_buffer, &state.incoming_buffer);
gpr_slice_buffer_move_into(&state.temp_incoming_buffer,
&state.incoming_buffer);
if (state.incoming_buffer.length > strlen(magic_connect_string)) {
state.done = 1;
grpc_endpoint_shutdown(exec_ctx, state.tcp);
grpc_endpoint_destroy(exec_ctx, state.tcp);
} else {
grpc_endpoint_read(
exec_ctx, state.tcp, &state.temp_incoming_buffer, &on_read);
grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer,
&on_read);
}
}

@ -239,7 +239,8 @@ grpc_pollset_set g_interested_parties;
static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) {
if (g_state != GRPC_CHANNEL_READY) {
grpc_subchannel_notify_on_state_change(
exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg));
exec_ctx, arg, &g_interested_parties, &g_state,
grpc_closure_create(state_changed, arg));
}
}
@ -253,7 +254,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) {
grpc_pollset_init(&pollset);
grpc_pollset_set_init(&g_interested_parties);
grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset);
grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state,
grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties,
&g_state,
grpc_closure_create(state_changed, c));
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&pollset));
@ -330,9 +332,9 @@ static void chttp2_tear_down_micro_fullstack(grpc_end2end_test_fixture *f) {
/* All test configurations */
static grpc_end2end_test_config configs[] = {
{"chttp2/micro_fullstack", 0,
chttp2_create_fixture_micro_fullstack, chttp2_init_client_micro_fullstack,
chttp2_init_server_micro_fullstack, chttp2_tear_down_micro_fullstack},
{"chttp2/micro_fullstack", 0, chttp2_create_fixture_micro_fullstack,
chttp2_init_client_micro_fullstack, chttp2_init_server_micro_fullstack,
chttp2_tear_down_micro_fullstack},
};
int main(int argc, char **argv) {

@ -262,9 +262,9 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(
f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
.type == GRPC_OP_COMPLETE);
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
NULL).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}

Loading…
Cancel
Save