Propagate errors through grpc_lb_policy_cancel_pick().

pull/7970/head
Mark D. Roth 8 years ago
parent f7dd851c2a
commit 5f84400ba8
  1. 11
      src/core/ext/client_config/client_channel.c
  2. 5
      src/core/ext/client_config/lb_policy.c
  3. 5
      src/core/ext/client_config/lb_policy.h
  4. 7
      src/core/ext/lb_policy/grpclb/grpclb.c
  5. 7
      src/core/ext/lb_policy/pick_first/pick_first.c
  6. 9
      src/core/ext/lb_policy/round_robin/round_robin.c

@ -466,6 +466,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
const char* msg = grpc_error_string(error);
gpr_log(GPR_INFO, "==> %s(): error=%s", __func__, msg);
grpc_error_free_string(msg);
call_data *calld = arg;
gpr_mu_lock(&calld->mu);
GPR_ASSERT(calld->creation_phase ==
@ -544,6 +547,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready, grpc_error *error) {
gpr_log(GPR_INFO, "==> %s()", __func__);
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
@ -556,11 +561,13 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_mu_lock(&chand->mu);
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
gpr_log(GPR_INFO, "asking LB policy to cancel pick");
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
connected_subchannel);
connected_subchannel, GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
gpr_log(GPR_INFO, "top of closure loop");
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
@ -572,6 +579,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
gpr_mu_unlock(&chand->mu);
GPR_TIMER_END("pick_subchannel", 0);
GRPC_ERROR_UNREF(error);
gpr_log(GPR_INFO, "returning from pick_subchannel()");
return true;
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
@ -621,6 +629,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
gpr_log(GPR_INFO, "==> %s()", __func__);
call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);

@ -110,8 +110,9 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target) {
policy->vtable->cancel_pick(exec_ctx, policy, target);
grpc_connected_subchannel **target,
grpc_error *error) {
policy->vtable->cancel_pick(exec_ctx, policy, target, error);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,

@ -65,7 +65,7 @@ struct grpc_lb_policy_vtable {
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
grpc_connected_subchannel **target, grpc_error *error);
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
@ -139,7 +139,8 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
grpc_connected_subchannel **target,
grpc_error *error);
/** Cancel all pending picks which have:
(initial_metadata_flags & initial_metadata_flags_mask) ==

@ -533,7 +533,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
grpc_connected_subchannel **target,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks;
@ -545,7 +546,8 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
GRPC_ERROR_CREATE_REFERENCING(
"Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
@ -554,6 +556,7 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error);
}
static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);

@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
grpc_connected_subchannel **target,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -141,7 +142,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
GRPC_ERROR_CREATE_REFERENCING(
"Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@ -150,6 +152,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,

@ -281,7 +281,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
grpc_connected_subchannel **target,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -293,8 +294,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
NULL);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING(
"Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@ -303,6 +305,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,

Loading…
Cancel
Save