|
|
|
@ -238,14 +238,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_connectivity_state state, |
|
|
|
|
grpc_error *error, |
|
|
|
|
const char *reason) { |
|
|
|
|
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE || |
|
|
|
|
state == GRPC_CHANNEL_SHUTDOWN) && |
|
|
|
|
chand->lb_policy != NULL) { |
|
|
|
|
/* cancel picks with wait_for_ready=false */ |
|
|
|
|
grpc_lb_policy_cancel_picks_locked( |
|
|
|
|
exec_ctx, chand->lb_policy, |
|
|
|
|
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, |
|
|
|
|
/* check= */ 0, GRPC_ERROR_REF(error)); |
|
|
|
|
/* TODO: Improve failure handling:
|
|
|
|
|
* - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. |
|
|
|
|
* - Hand over pending picks from old policies during the switch that happens |
|
|
|
|
* when resolver provides an update. */ |
|
|
|
|
if (chand->lb_policy != NULL) { |
|
|
|
|
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
|
|
|
|
/* cancel picks with wait_for_ready=false */ |
|
|
|
|
grpc_lb_policy_cancel_picks_locked( |
|
|
|
|
exec_ctx, chand->lb_policy, |
|
|
|
|
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, |
|
|
|
|
/* check= */ 0, GRPC_ERROR_REF(error)); |
|
|
|
|
} else if (state == GRPC_CHANNEL_SHUTDOWN) { |
|
|
|
|
/* cancel all picks */ |
|
|
|
|
grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy, |
|
|
|
|
/* mask= */ 0, /* check= */ 0, |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, |
|
|
|
|
reason); |
|
|
|
@ -348,6 +357,33 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Wrap a closure associated with \a lb_policy. The associated callback (\a
|
|
|
|
|
// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
|
|
|
|
|
// scheduling \a wrapped_closure.
|
|
|
|
|
typedef struct wrapped_on_pick_closure_arg { |
|
|
|
|
/* the closure instance using this struct as argument */ |
|
|
|
|
grpc_closure wrapper_closure; |
|
|
|
|
|
|
|
|
|
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
|
|
|
|
|
* calls against the internal RR instance, respectively. */ |
|
|
|
|
grpc_closure *wrapped_closure; |
|
|
|
|
|
|
|
|
|
/* The policy instance related to the closure */ |
|
|
|
|
grpc_lb_policy *lb_policy; |
|
|
|
|
} wrapped_on_pick_closure_arg; |
|
|
|
|
|
|
|
|
|
// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
|
|
|
|
|
static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
wrapped_on_pick_closure_arg *wc_arg = arg; |
|
|
|
|
GPR_ASSERT(wc_arg != NULL); |
|
|
|
|
GPR_ASSERT(wc_arg->wrapped_closure != NULL); |
|
|
|
|
GPR_ASSERT(wc_arg->lb_policy != NULL); |
|
|
|
|
grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping"); |
|
|
|
|
gpr_free(wc_arg); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, grpc_error *error) { |
|
|
|
|
channel_data *chand = arg; |
|
|
|
@ -1037,11 +1073,29 @@ static bool pick_subchannel_locked( |
|
|
|
|
const grpc_lb_policy_pick_args inputs = { |
|
|
|
|
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, |
|
|
|
|
gpr_inf_future(GPR_CLOCK_MONOTONIC)}; |
|
|
|
|
const bool result = grpc_lb_policy_pick_locked( |
|
|
|
|
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); |
|
|
|
|
|
|
|
|
|
// Wrap the user-provided callback in order to hold a strong reference to
|
|
|
|
|
// the LB policy for the duration of the pick.
|
|
|
|
|
wrapped_on_pick_closure_arg *w_on_pick_arg = |
|
|
|
|
gpr_zalloc(sizeof(*w_on_pick_arg)); |
|
|
|
|
grpc_closure_init(&w_on_pick_arg->wrapper_closure, |
|
|
|
|
wrapped_on_pick_closure_cb, w_on_pick_arg, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
w_on_pick_arg->wrapped_closure = on_ready; |
|
|
|
|
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); |
|
|
|
|
w_on_pick_arg->lb_policy = lb_policy; |
|
|
|
|
const bool pick_done = grpc_lb_policy_pick_locked( |
|
|
|
|
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, |
|
|
|
|
&w_on_pick_arg->wrapper_closure); |
|
|
|
|
if (pick_done) { |
|
|
|
|
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, |
|
|
|
|
"pick_subchannel_wrapping"); |
|
|
|
|
gpr_free(w_on_pick_arg); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); |
|
|
|
|
GPR_TIMER_END("pick_subchannel", 0); |
|
|
|
|
return result; |
|
|
|
|
return pick_done; |
|
|
|
|
} |
|
|
|
|
if (chand->resolver != NULL && !chand->started_resolving) { |
|
|
|
|
chand->started_resolving = true; |
|
|
|
|