Merge pull request #12850 from markdroth/lb_pick_error_fix

Return LB picks with an error upon shutdown.
pull/9855/merge
Mark D. Roth 7 years ago committed by GitHub
commit 7e08c1a606
  1. 12
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 31
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  3. 31
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc

@ -1027,15 +1027,19 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(
exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pping);
pping = next;
}
}

@ -102,12 +102,23 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
}
static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
pending_pick *pp;
while ((pp = p->pending_picks) != NULL) {
p->pending_picks = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp);
}
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
p->shutdown = true;
pp = p->pending_picks;
p->pending_picks = NULL;
fail_pending_picks_for_shutdown(exec_ctx, p);
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown");
@ -120,13 +131,6 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
}
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
pp = next;
}
}
static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@ -637,12 +641,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick first exhausted channels", &error, 1),
"no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
fail_pending_picks_for_shutdown(exec_ctx, p);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"pick_first_connectivity");
} else {

@ -315,15 +315,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p",
(void *)pol, (void *)pol);
}
p->shutdown = true;
static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
pending_pick *pp;
while ((pp = p->pending_picks)) {
while ((pp = p->pending_picks) != NULL) {
p->pending_picks = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(
@ -331,6 +326,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
gpr_free(pp);
}
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p",
(void *)pol, (void *)pol);
}
p->shutdown = true;
fail_pending_picks_for_shutdown(exec_ctx, p);
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
@ -621,14 +626,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
sd->user_data = NULL;
}
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
// the policy is shutting down. Flush all the pending picks...
pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
// The policy is shutting down. Fail all of the pending picks.
fail_pending_picks_for_shutdown(exec_ctx, p);
}
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"sd_shutdown+started_picking");

Loading…
Cancel
Save