Removed call to grpc_subchannel_check_connectivity and loop in pf_conn cb

reviewable/pr13343/r2
David Garcia Quintas 7 years ago
parent 94eae043c7
commit b07d7a8051
  1. 217
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc

@ -445,131 +445,112 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
}
bool updated_error = false;
while (true) {
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != NULL);
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->subchannel_list, "finish_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
}
// Cases 1 and 2.
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready");
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
"connected");
p->selected = sd;
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != NULL);
grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
"finish_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
}
// Cases 1 and 2.
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready");
sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(sd->subchannel),
"connected");
p->selected = sd;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
(void*)sd->subchannel);
}
// Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(exec_ctx, p);
// Update any calls that were waiting for a pick.
pending_pick* pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
(void*)sd->subchannel);
}
// Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(exec_ctx, p);
// Update any calls that were waiting for a pick.
pending_pick* pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
}
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
if (updated_error) GRPC_ERROR_UNREF(error);
return;
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
do {
sd->subchannel_list->checking_subchannel =
(sd->subchannel_list->checking_subchannel + 1) %
sd->subchannel_list->num_subchannels;
sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == NULL);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->subchannel_list->checking_subchannel == 0 &&
sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
sd->curr_connectivity_state =
grpc_subchannel_check_connectivity(sd->subchannel, &error);
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GRPC_ERROR_UNREF(error);
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
return;
} else {
updated_error = true;
}
break; // Go back to top of loop.
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
if (updated_error) GRPC_ERROR_UNREF(error);
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
do {
sd->subchannel_list->checking_subchannel =
(sd->subchannel_list->checking_subchannel + 1) %
sd->subchannel_list->num_subchannels;
sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == NULL);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->subchannel_list->checking_subchannel == 0 &&
sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error), "connecting_changed");
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error), "connecting_changed");
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
if (updated_error) GRPC_ERROR_UNREF(error);
break;
}
case GRPC_CHANNEL_SHUTDOWN: {
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"pf_candidate_shutdown");
// Advance to next subchannel and check its state.
grpc_lb_subchannel_data* original_sd = sd;
do {
sd->subchannel_list->checking_subchannel =
(sd->subchannel_list->checking_subchannel + 1) %
sd->subchannel_list->num_subchannels;
sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == NULL && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
shutdown_locked(exec_ctx, p,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick first exhausted channels", &error, 1));
if (updated_error) GRPC_ERROR_UNREF(error);
return;
break;
}
case GRPC_CHANNEL_SHUTDOWN: {
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"pf_candidate_shutdown");
// Advance to next subchannel and check its state.
grpc_lb_subchannel_data* original_sd = sd;
do {
sd->subchannel_list->checking_subchannel =
(sd->subchannel_list->checking_subchannel + 1) %
sd->subchannel_list->num_subchannels;
sd = &sd->subchannel_list
->subchannels[sd->subchannel_list->checking_subchannel];
} while (sd->subchannel == NULL && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
shutdown_locked(exec_ctx, p,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick first exhausted channels", &error, 1));
if (updated_error) GRPC_ERROR_UNREF(error);
return;
}
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
}
sd->curr_connectivity_state =
grpc_subchannel_check_connectivity(sd->subchannel, &error);
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GRPC_ERROR_UNREF(error);
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
return;
} else {
updated_error = true;
}
// For any other state, go back to top of loop.
// We will reuse the connectivity refs from the previous watch.
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
}
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
break;
}
}
}

Loading…
Cancel
Save