Initial fail-fast support

pull/5726/head
Craig Tiller 9 years ago
parent f0ec923498
commit 8c0d96ff86
  1. include/grpc/impl/codegen/grpc_types.h (6)
  2. src/core/channel/client_channel.c (40)
  3. src/core/channel/client_uchannel.c (1)
  4. src/core/channel/http_client_filter.c (10)
  5. src/core/channel/subchannel_call_holder.c (5)
  6. src/core/channel/subchannel_call_holder.h (1)
  7. src/core/client_config/lb_policies/pick_first.c (36)
  8. src/core/client_config/lb_policies/round_robin.c (36)
  9. src/core/client_config/lb_policy.c (11)
  10. src/core/client_config/lb_policy.h (13)
  11. src/core/surface/call.c (3)
  12. src/core/transport/transport.h (6)
  13. test/core/end2end/dualstack_socket_test.c (4)
  14. test/core/end2end/tests/simple_delayed_request.c (2)

@ -202,8 +202,12 @@ typedef enum grpc_call_error {
/* Initial metadata flags */
/** Signal that the call is idempotent */
#define GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST (0x00000010u)
/** Signal that the call should not return UNAVAILABLE before it has started */
#define GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY (0x00000020u)
/** Mask of all valid flags */
#define GRPC_INITIAL_METADATA_USED_MASK GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
#define GRPC_INITIAL_METADATA_USED_MASK \
(GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \
GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY)
/** A single metadata element */
typedef struct grpc_metadata {
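
With this commit, a call issued while the channel is in TRANSIENT_FAILURE can fail fast with UNAVAILABLE unless the application sets the new flag on its initial-metadata op. A minimal application-side sketch of opting one call out of fail-fast, mirroring the simple_delayed_request test further down; the helper name is illustrative and error handling is omitted:

    #include <string.h>
    #include <grpc/grpc.h>

    /* Sketch: start the SEND_INITIAL_METADATA batch for a call that should
       wait for the channel to connect rather than fail fast with UNAVAILABLE. */
    static grpc_call_error start_call_wait_for_connectivity(grpc_call *call,
                                                            void *tag) {
      grpc_op op;
      memset(&op, 0, sizeof(op));
      op.op = GRPC_OP_SEND_INITIAL_METADATA;
      op.data.send_initial_metadata.count = 0;
      op.flags = GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY; /* opt out of fail-fast */
      op.reserved = NULL;
      return grpc_call_start_batch(call, &op, 1, tag, NULL);
    }

GRPC_INITIAL_METADATA_USED_MASK is widened above precisely so that this bit counts as a valid flag.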

@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand,
grpc_connectivity_state state,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_FATAL_FAILURE) &&
chand->lb_policy != NULL) {
/* cancel fail-fast picks */
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0);
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
}
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
"lb_changed");
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
"lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
"new_lb+resolver");
set_channel_connectivity_state_locked(exec_ctx, chand, state,
"new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
initial_metadata, connected_subchannel, on_ready);
initial_metadata, initial_metadata_flags,
connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;

@ -126,6 +126,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
channel_data *chand = arg;

@ -111,10 +111,12 @@ static void hc_mutate_op(grpc_call_element *elem,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
op->send_idempotent_request
? GRPC_MDELEM_METHOD_PUT
: GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(
op->send_initial_metadata, &calld->method,
op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
? GRPC_MDELEM_METHOD_PUT
: GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
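
http_client_filter now derives the :method value from the GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST bit in the flags word rather than the removed send_idempotent_request bool. For context, a hedged application-side sketch of what sets that bit (same includes and batch pattern as the earlier sketch; the helper name is illustrative):

    /* Sketch: mark a call idempotent so http_client_filter emits
       ":method: PUT" instead of POST. */
    static grpc_call_error start_call_idempotent(grpc_call *call, void *tag) {
      grpc_op op;
      memset(&op, 0, sizeof(op));
      op.op = GRPC_OP_SEND_INITIAL_METADATA;
      op.data.send_initial_metadata.count = 0;
      op.flags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
      op.reserved = NULL;
      return grpc_call_start_batch(call, &op, 1, tag, NULL);
    }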

@ -127,7 +127,7 @@ retry:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
&holder->connected_subchannel, NULL);
0, &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@ -145,7 +145,8 @@ retry:
GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
&holder->connected_subchannel, &holder->next_step)) {
op->send_initial_metadata_flags, &holder->connected_subchannel,
&holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}

@ -42,6 +42,7 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {

@ -42,6 +42,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@ -151,6 +152,32 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*pp->target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
}
pp = next;
}
gpr_mu_unlock(&p->mu);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@ -173,7 +200,8 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
uint32_t initial_metadata_flags, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@ -200,6 +228,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@ -378,8 +407,9 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle,
pf_check_connectivity, pf_notify_on_state_change};
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_cancel_picks,
pf_ping_one, pf_exit_idle, pf_check_connectivity,
pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
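
pf_cancel_picks walks the pending-pick list once, fails every pick whose flags satisfy (flags & mask) == eq, and pushes the survivors back onto the list (which incidentally reverses their order); rr_cancel_picks below is identical. A simplified, standalone model of that filter with the grpc types replaced by plain structs — the names here are illustrative, not from the tree:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* simplified stand-in for the policies' pending_pick */
    typedef struct pick {
      struct pick *next;
      uint32_t initial_metadata_flags;
      int id; /* only for printing */
    } pick;

    /* mirrors GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY (0x00000020u) */
    #define IGNORE_CONNECTIVITY 0x00000020u

    /* Cancel (here: report and free) every pick whose flags satisfy
       (flags & mask) == eq; keep the rest pending. */
    static pick *cancel_matching(pick *pending, uint32_t mask, uint32_t eq) {
      pick *kept = NULL;
      while (pending != NULL) {
        pick *next = pending->next;
        if ((pending->initial_metadata_flags & mask) == eq) {
          printf("cancelled pick %d\n", pending->id);
          free(pending);
        } else {
          pending->next = kept; /* survivors end up in reverse order */
          kept = pending;
        }
        pending = next;
      }
      return kept;
    }

    static pick *push(pick *head, int id, uint32_t flags) {
      pick *p = malloc(sizeof(*p));
      p->next = head;
      p->id = id;
      p->initial_metadata_flags = flags;
      return p;
    }

    int main(void) {
      pick *pending = NULL;
      pending = push(pending, 1, 0);                   /* fail-fast pick */
      pending = push(pending, 2, IGNORE_CONNECTIVITY); /* opted out of fail-fast */
      pending = push(pending, 3, 0);                   /* fail-fast pick */
      /* what client_channel.c asks for on TRANSIENT_FAILURE: mask = the
         IGNORE_CONNECTIVITY bit, eq = 0, i.e. picks that did NOT set it */
      pending = cancel_matching(pending, IGNORE_CONNECTIVITY, 0);
      for (pick *p = pending; p != NULL; p = p->next) {
        printf("still pending: pick %d\n", p->id);
      }
      while (pending != NULL) { /* cleanup */
        pick *next = pending->next;
        free(pending);
        pending = next;
      }
      return 0;
    }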

@ -48,6 +48,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@ -274,6 +275,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*pp->target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
}
pp = next;
}
gpr_mu_unlock(&p->mu);
}
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@ -302,7 +329,8 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
uint32_t initial_metadata_flags, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
@ -328,6 +356,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@ -483,8 +512,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle,
rr_check_connectivity, rr_notify_on_state_change};
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_cancel_picks,
rr_ping_one, rr_exit_idle, rr_check_connectivity,
rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}

@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
target, on_complete);
initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
initial_metadata_flags_eq);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}

@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
/** Cancel all pending picks which have:
(initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
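
In other words, mask selects which GRPC_INITIAL_METADATA_xxx bits to examine and eq is the value those bits must have for a pending pick to be cancelled. Two hedged call sites: the first is exactly what client_channel.c above passes to cancel the fail-fast picks; the second is an illustrative "cancel everything" use, not taken from this commit:

    /* cancel only fail-fast picks: inspect the IGNORE_CONNECTIVITY bit and
       require it to be clear (the caller did not opt out of fail-fast) */
    grpc_lb_policy_cancel_picks(exec_ctx, policy,
                                GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, 0);

    /* cancel every pending pick: with a zero mask, (flags & 0) == 0 holds
       for all picks */
    grpc_lb_policy_cancel_picks(exec_ctx, policy, 0, 0);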

@ -1229,8 +1229,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op.send_idempotent_request =
(op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {

@ -84,9 +84,9 @@ typedef struct grpc_transport_stream_op {
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
/** Iff send_initial_metadata != NULL, flags if this is an idempotent request
or not */
bool send_idempotent_request;
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
/** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;
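
Downstream filters can now recover any GRPC_INITIAL_METADATA_xxx bit from this field instead of relying on a dedicated bool, as http_client_filter does above for the idempotent bit. A minimal sketch of that pattern inside a hypothetical filter's op handler (my_filter_start_op and its wiring are illustrative, not part of this commit):

    #include "src/core/channel/channel_stack.h"
    #include "src/core/transport/transport.h"

    static void my_filter_start_op(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem,
                                   grpc_transport_stream_op *op) {
      if (op->send_initial_metadata != NULL &&
          (op->send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0) {
        /* the application marked this request idempotent */
      }
      grpc_call_next_op(exec_ctx, elem, op); /* hand the op to the next filter */
    }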

@ -39,9 +39,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/support/string.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/util/port.h"
@ -237,7 +237,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
}
grpc_call_destroy(c);

@ -120,7 +120,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->flags = GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
