Fixes to pick first

pull/11604/head
David Garcia Quintas 8 years ago
parent b8be14309d
commit af084dc37c
  1. 45
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
  2. 30
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  3. 3
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c

@ -95,6 +95,9 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p->subchannels); gpr_free(p->subchannels);
gpr_free(p->new_subchannels); gpr_free(p->new_subchannels);
gpr_free(p); gpr_free(p);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
}
} }
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@ -268,11 +271,20 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) { pick_first_lb_policy *p) {
if (p->num_subchannels > 0) { if (p->num_subchannels > 0) {
GPR_ASSERT(p->selected == NULL); GPR_ASSERT(p->selected == NULL);
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
(void *)p, (void *)p->subchannels[p->checking_subchannel]);
}
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed); &p->connectivity_changed);
p->updating_subchannels = true; p->updating_subchannels = true;
} else if (p->selected != NULL) { } else if (p->selected != NULL) {
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG,
"Pick First %p unsubscribing from selected subchannel %p",
(void *)p, (void *)p->selected);
}
grpc_connected_subchannel_notify_on_state_change( grpc_connected_subchannel_notify_on_state_change(
exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
p->updating_selected = true; p->updating_selected = true;
@ -451,12 +463,25 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *selected_subchannel; grpc_subchannel *selected_subchannel;
pending_pick *pp; pending_pick *pp;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(
GPR_DEBUG,
"Pick First %p connectivity changed. Updating selected: %d; Updating "
"subchannels: %d; Checking %lu index (%lu total); State: %d; ",
(void *)p, p->updating_selected, p->updating_subchannels,
(unsigned long)p->checking_subchannel,
(unsigned long)p->num_subchannels, p->checking_connectivity);
}
bool restart = false; bool restart = false;
if (p->updating_selected && error == GRPC_ERROR_CANCELLED) { if (p->updating_selected && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for p->selected */ /* Captured the unsubscription for p->selected */
GPR_ASSERT(p->selected != NULL); GPR_ASSERT(p->selected != NULL);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
"pf_update_connectivity"); "pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
(void *)p, (void *)p->selected);
}
p->updating_selected = false; p->updating_selected = false;
if (p->num_new_subchannels == 0) { if (p->num_new_subchannels == 0) {
p->selected = NULL; p->selected = NULL;
@ -464,12 +489,16 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
} }
restart = true; restart = true;
} }
if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) { if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
/* Captured the unsubscription for the checking subchannel */ /* Captured the unsubscription for the checking subchannel */
GPR_ASSERT(p->selected == NULL); GPR_ASSERT(p->selected == NULL);
for (size_t i = 0; i < p->num_subchannels; i++) { for (size_t i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
"pf_update_connectivity"); "pf_update_connectivity");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
(void *)p->subchannels[i]);
}
} }
gpr_free(p->subchannels); gpr_free(p->subchannels);
p->subchannels = NULL; p->subchannels = NULL;
@ -481,14 +510,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (restart) { if (restart) {
p->selected = NULL; p->selected = NULL;
p->selected_key = NULL; p->selected_key = NULL;
GPR_ASSERT(p->new_subchannels != NULL); GPR_ASSERT(p->new_subchannels != NULL);
GPR_ASSERT(p->num_new_subchannels > 0); GPR_ASSERT(p->num_new_subchannels > 0);
p->num_subchannels = p->num_new_subchannels; p->num_subchannels = p->num_new_subchannels;
p->subchannels = p->new_subchannels; p->subchannels = p->new_subchannels;
p->num_new_subchannels = 0; p->num_new_subchannels = 0;
p->new_subchannels = NULL; p->new_subchannels = NULL;
if (p->started_picking) { if (p->started_picking) {
/* If we were picking, continue to do so over the new subchannels, /* If we were picking, continue to do so over the new subchannels,
* starting from the 0th index. */ * starting from the 0th index. */
@ -542,7 +569,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
"picked_first"); "picked_first");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected); gpr_log(GPR_INFO,
"Pick First %p selected subchannel %p (connected %p)",
(void *)p, (void *)selected_subchannel, (void *)p->selected);
} }
p->selected_key = grpc_subchannel_get_key(selected_subchannel); p->selected_key = grpc_subchannel_get_key(selected_subchannel);
/* drop the pick list: we are connected now */ /* drop the pick list: we are connected now */
@ -568,7 +597,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel = p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels; (p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) { if (p->checking_subchannel == 0) {
/* only trigger transient failure when we've tried all alternatives */ /* only trigger transient failure when we've tried all alternatives
*/
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure"); GRPC_ERROR_REF(error), "connecting_transient_failure");
@ -652,6 +682,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) { grpc_lb_policy_args *args) {
GPR_ASSERT(args->client_channel_factory != NULL); GPR_ASSERT(args->client_channel_factory != NULL);
pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
}
pf_update_locked(exec_ctx, &p->base, args); pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,

@ -478,29 +478,30 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
if (subchannel_list->num_ready > 0) { /* 1) READY */ if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready"); GRPC_ERROR_NONE, "rr_ready");
new_state = GRPC_CHANNEL_READY; new_state = GRPC_CHANNEL_READY;
} else if (sd->curr_connectivity_state == } else if (sd->curr_connectivity_state ==
GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting"); "rr_connecting");
new_state = GRPC_CHANNEL_CONNECTING; new_state = GRPC_CHANNEL_CONNECTING;
} else if (p->subchannel_list->num_shutdown == } else if (p->subchannel_list->num_shutdown ==
p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */ p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
new_state = GRPC_CHANNEL_SHUTDOWN; "rr_shutdown");
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures == } else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), GRPC_CHANNEL_TRANSIENT_FAILURE,
"rr_transient_failure"); GRPC_ERROR_REF(error), "rr_transient_failure");
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE; new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
} else if (subchannel_list->num_idle == } else if (subchannel_list->num_idle ==
p->subchannel_list->num_subchannels) { /* 5) IDLE */ p->subchannel_list->num_subchannels) { /* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle"); GRPC_ERROR_NONE, "rr_idle");
new_state = GRPC_CHANNEL_IDLE; new_state = GRPC_CHANNEL_IDLE;
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return new_state; return new_state;
@ -581,14 +582,14 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(!sd->subchannel_list->shutting_down); GPR_ASSERT(!sd->subchannel_list->shutting_down);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
const unsigned long num_subchannels = const unsigned long num_subchannels =
p->subchannel_list != NULL ? p->subchannel_list->num_subchannels p->subchannel_list != NULL
: 0; ? (unsigned long)p->subchannel_list->num_subchannels
: 0;
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor " "[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)", "of %p (size %lu)",
(void *)p, (void *)p->subchannel_list, num_subchannels, (void *)p, (void *)p->subchannel_list, num_subchannels,
(void *)sd->subchannel_list, (void *)sd->subchannel_list, num_subchannels);
(unsigned long)sd->subchannel_list->num_subchannels);
} }
if (p->subchannel_list != NULL) { if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list // dispose of the current subchannel_list
@ -666,9 +667,8 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure); grpc_connected_subchannel_ping(exec_ctx, target, closure);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else { } else {
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
exec_ctx, closure, "Round Robin not connected"));
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected"));
} }
} }

@ -101,6 +101,9 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
grpc_resolver* resolver) { grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver; fake_resolver* r = (fake_resolver*)resolver;
gpr_log(
GPR_INFO,
"FOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO");
if (r->next_results == NULL && r->results_upon_error != NULL) { if (r->next_results == NULL && r->results_upon_error != NULL) {
// Pretend we re-resolved. // Pretend we re-resolved.
r->next_results = grpc_channel_args_copy(r->results_upon_error); r->next_results = grpc_channel_args_copy(r->results_upon_error);

Loading…
Cancel
Save