Propagate error through grpc_lb_policy_cancel_picks(). Also fix error

reporting (and improve control flow) in on_writable() in tcp_client_posix.c.
pull/7970/head
Mark D. Roth 8 years ago
parent b3405f0a73
commit e65ff11161
  1. 3
      src/core/ext/client_config/client_channel.c
  2. 5
      src/core/ext/client_config/lb_policy.c
  3. 5
      src/core/ext/client_config/lb_policy.h
  4. 2
      src/core/ext/client_config/subchannel.c
  5. 7
      src/core/ext/lb_policy/grpclb/grpclb.c
  6. 7
      src/core/ext/lb_policy/pick_first/pick_first.c
  7. 9
      src/core/ext/lb_policy/round_robin/round_robin.c
  8. 44
      src/core/lib/iomgr/tcp_client_posix.c

@ -112,7 +112,8 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_cancel_picks( grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy, exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0); /* check= */ 0,
GRPC_ERROR_REF(error));
} }
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason); reason);

@ -118,9 +118,10 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) { uint32_t initial_metadata_flags_eq,
grpc_error *error) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
initial_metadata_flags_eq); initial_metadata_flags_eq, error);
} }
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {

@ -68,7 +68,7 @@ struct grpc_lb_policy_vtable {
grpc_connected_subchannel **target, grpc_error *error); grpc_connected_subchannel **target, grpc_error *error);
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq); uint32_t initial_metadata_flags_eq, grpc_error *error);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure); grpc_closure *closure);
@ -148,7 +148,8 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq); uint32_t initial_metadata_flags_eq,
grpc_error *error);
/** Try to enter a READY connectivity state */ /** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);

@ -635,7 +635,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
c->have_alarm = 1; c->have_alarm = 1;
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed"); "connect_failed");
gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
const char *errmsg = grpc_error_string(error); const char *errmsg = grpc_error_string(error);

@ -562,7 +562,8 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) { uint32_t initial_metadata_flags_eq,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu); gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_client != NULL) { if (glb_policy->lb_client != NULL) {
@ -578,7 +579,8 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set( grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties); exec_ctx, pp->pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL); GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled",
&error, 1), NULL);
gpr_free(pp); gpr_free(pp);
} else { } else {
pp->next = glb_policy->pending_picks; pp->next = glb_policy->pending_picks;
@ -587,6 +589,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next; pp = next;
} }
gpr_mu_unlock(&glb_policy->mu); gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error);
} }
static void query_for_backends(grpc_exec_ctx *exec_ctx, static void query_for_backends(grpc_exec_ctx *exec_ctx,

@ -157,7 +157,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) { uint32_t initial_metadata_flags_eq,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
@ -170,7 +171,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties); p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE("Pick Cancelled"), NULL); GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled",
&error, 1), NULL);
gpr_free(pp); gpr_free(pp);
} else { } else {
pp->next = p->pending_picks; pp->next = p->pending_picks;
@ -179,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next; pp = next;
} }
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
} }
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {

@ -310,7 +310,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) { uint32_t initial_metadata_flags_eq,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
@ -323,8 +324,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties); p->base.interested_parties);
*pp->target = NULL; *pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
NULL); GRPC_ERROR_CREATE_REFERENCING("Pick cancelled",
&error, 1), NULL);
gpr_free(pp); gpr_free(pp);
} else { } else {
pp->next = p->pending_picks; pp->next = p->pending_picks;
@ -333,6 +335,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next; pp = next;
} }
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
} }
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {

@ -146,7 +146,12 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
grpc_timer_cancel(exec_ctx, &ac->alarm); grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu); gpr_mu_lock(&ac->mu);
if (error == GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
do { do {
so_error_size = sizeof(so_error); so_error_size = sizeof(so_error);
err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error, err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
@ -155,8 +160,15 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (err < 0) { if (err < 0) {
error = GRPC_OS_ERROR(errno, "getsockopt"); error = GRPC_OS_ERROR(errno, "getsockopt");
goto finish; goto finish;
} else if (so_error != 0) { }
if (so_error == ENOBUFS) {
switch (so_error) {
case 0:
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
*ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
fd = NULL;
break;
case ENOBUFS:
/* We will get one of these errors if we have run out of /* We will get one of these errors if we have run out of
memory in the kernel for the data structures allocated memory in the kernel for the data structures allocated
when you connect a socket. If this happens it is very when you connect a socket. If this happens it is very
@ -175,32 +187,16 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
gpr_mu_unlock(&ac->mu); gpr_mu_unlock(&ac->mu);
grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure); grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
return; return;
} else {
switch (so_error) {
case ECONNREFUSED: case ECONNREFUSED:
error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno); /* This error shouldn't happen for anything other than connect(). */
error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, error = GRPC_OS_ERROR(so_error, "connect");
"Connection refused");
break; break;
default: default:
error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)"); /* We don't really know which syscall triggered the problem here,
so punt by reporting getsockopt(). */
error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
break; break;
} }
goto finish;
}
} else {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
*ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
fd = NULL;
goto finish;
}
} else {
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
GPR_UNREACHABLE_CODE(return );
finish: finish:
if (fd != NULL) { if (fd != NULL) {

Loading…
Cancel
Save