Change round_robin LB policy to use the addresses in the order given.

pull/11227/head
Mark D. Roth 8 years ago
parent af3cc761ad
commit f91eb714c2
  1. 457
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
  2. 6
      test/cpp/end2end/grpclb_end2end_test.cc
  3. 31
      test/cpp/end2end/round_robin_end2end_test.cc

@ -99,26 +99,13 @@ typedef struct pending_pick {
grpc_closure *on_complete; grpc_closure *on_complete;
} pending_pick; } pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
/* references namesake entry in subchannel_data */
void *user_data;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
typedef struct { typedef struct {
/** index within policy->subchannels */
size_t index;
/** backpointer to owning policy */ /** backpointer to owning policy */
round_robin_lb_policy *policy; round_robin_lb_policy *policy;
/** subchannel itself */ /** subchannel itself */
grpc_subchannel *subchannel; grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */ /** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure; grpc_closure connectivity_changed_closure;
/** this subchannels current position in subchannel->ready_list */
ready_list *ready_list_node;
/** last observed connectivity. Not updated by /** last observed connectivity. Not updated by
* \a grpc_subchannel_notify_on_state_change. Used to determine the previous * \a grpc_subchannel_notify_on_state_change. Used to determine the previous
* state while processing the new state in \a rr_connectivity_changed */ * state while processing the new state in \a rr_connectivity_changed */
@ -126,6 +113,10 @@ typedef struct {
/** current connectivity state. Updated by \a /** current connectivity state. Updated by \a
* grpc_subchannel_notify_on_state_change */ * grpc_subchannel_notify_on_state_change */
grpc_connectivity_state curr_connectivity_state; grpc_connectivity_state curr_connectivity_state;
/** connectivity state to be updated by the watcher, not guarded by
* the combiner. Will be moved to curr_connectivity_state inside of
* the combiner by rr_connectivity_changed_locked(). */
grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */ /** the subchannel's target user data */
void *user_data; void *user_data;
/** vtable to operate over \a user_data */ /** vtable to operate over \a user_data */
@ -141,182 +132,105 @@ struct round_robin_lb_policy {
/** all our subchannels */ /** all our subchannels */
size_t num_subchannels; size_t num_subchannels;
subchannel_data **subchannels; subchannel_data *subchannels;
/** how many subchannels are in TRANSIENT_FAILURE */ /** how many subchannels are in state READY */
size_t num_ready;
/** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures; size_t num_transient_failures;
/** how many subchannels are IDLE */ /** how many subchannels are in state IDLE */
size_t num_idle; size_t num_idle;
/** have we started picking? */ /** have we started picking? */
int started_picking; bool started_picking;
/** are we shutting down? */ /** are we shutting down? */
int shutdown; bool shutdown;
/** List of picks that are waiting on connectivity */ /** List of picks that are waiting on connectivity */
pending_pick *pending_picks; pending_pick *pending_picks;
/** our connectivity state tracker */ /** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
/** (Dummy) root of the doubly linked list containing READY subchannels */ // Index into subchannels for last pick.
ready_list ready_list; size_t last_ready_subchannel_index;
/** Last pick from the ready list. */
ready_list *ready_list_last_pick;
}; };
/** Returns the next subchannel from the connected list or NULL if the list is /** Returns the index into p->subchannels of the next subchannel in
* empty. * READY state, or p->num_subchannels if no subchannel is READY.
* *
* Note that this function does *not* advance p->ready_list_last_pick. Use \a * Note that this function does *not* update p->last_ready_subchannel_index.
* advance_last_picked_locked() for that. */ * The caller must do that if it returns a pick. */
static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { static size_t get_next_ready_subchannel_index_locked(
ready_list *selected; const round_robin_lb_policy *p) {
selected = p->ready_list_last_pick->next; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
while (selected != NULL) { "[RR: %p] getting next ready subchannel, "
if (selected == &p->ready_list) { "last_ready_subchannel_index=%zu",
GPR_ASSERT(selected->subchannel == NULL); p, p->last_ready_subchannel_index);
/* skip dummy root */
selected = selected->next;
} else {
GPR_ASSERT(selected->subchannel != NULL);
return selected;
}
} }
return NULL; for (size_t i = 0; i < p->num_subchannels; ++i) {
} const size_t index =
(i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
/** Advance the \a ready_list picking head. */ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
static void advance_last_picked_locked(round_robin_lb_policy *p) { gpr_log(GPR_DEBUG, "[RR %p] checking index %zu: state=%d", p, index,
if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ p->subchannels[index].curr_connectivity_state);
p->ready_list_last_pick = p->ready_list_last_pick->next; }
if (p->ready_list_last_pick == &p->ready_list) { if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
/* skip dummy root */ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
p->ready_list_last_pick = p->ready_list_last_pick->next; gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %zu",
p, index);
}
return index;
} }
} else { /* should be an empty list */
GPR_ASSERT(p->ready_list_last_pick == &p->ready_list);
} }
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
"[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, "
"CSC %p)",
(void *)p, (void *)p->ready_list_last_pick,
(void *)p->ready_list_last_pick->subchannel,
(void *)grpc_subchannel_get_connected_subchannel(
p->ready_list_last_pick->subchannel));
} }
return p->num_subchannels;
} }
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a // Sets p->last_ready_subchannel_index to last_ready_index.
* csc to the list of ready subchannels. */ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, size_t last_ready_index) {
subchannel_data *sd) { GPR_ASSERT(last_ready_index < p->num_subchannels);
ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); p->last_ready_subchannel_index = last_ready_index;
new_elem->subchannel = sd->subchannel;
new_elem->user_data = sd->user_data;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
new_elem->prev = &p->ready_list;
p->ready_list.next = new_elem;
p->ready_list.prev = new_elem;
} else {
new_elem->next = &p->ready_list;
new_elem->prev = p->ready_list.prev;
p->ready_list.prev->next = new_elem;
p->ready_list.prev = new_elem;
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", gpr_log(GPR_DEBUG,
(void *)new_elem, (void *)sd->subchannel); "[RR: %p] setting last_ready_subchannel_index=%zu (SC %p, CSC %p)",
} (void *)p, last_ready_index,
return new_elem; (void *)p->subchannels[last_ready_index].subchannel,
} (void *)grpc_subchannel_get_connected_subchannel(
p->subchannels[last_ready_index].subchannel));
/** Removes \a node from the list of connected subchannels */
static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
ready_list *node) {
if (node == NULL) {
return;
}
if (node == p->ready_list_last_pick) {
p->ready_list_last_pick = p->ready_list_last_pick->prev;
}
/* removing last item */
if (node->next == &p->ready_list && node->prev == &p->ready_list) {
GPR_ASSERT(p->ready_list.next == node);
GPR_ASSERT(p->ready_list.prev == node);
p->ready_list.next = NULL;
p->ready_list.prev = NULL;
} else {
node->prev->next = node->next;
node->next->prev = node->prev;
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
(void *)node->subchannel);
} }
node->next = NULL;
node->prev = NULL;
node->subchannel = NULL;
gpr_free(node);
}
static bool is_ready_list_empty(round_robin_lb_policy *p) {
return p->ready_list.prev == NULL;
} }
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
ready_list *elem;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
} }
for (size_t i = 0; i < p->num_subchannels; i++) { for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i]; subchannel_data *sd = &p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); if (sd->subchannel != NULL) {
if (sd->user_data != NULL) { GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
GPR_ASSERT(sd->user_data_vtable != NULL); if (sd->user_data != NULL) {
sd->user_data_vtable->destroy(exec_ctx, sd->user_data); GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
} }
gpr_free(sd);
} }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels); gpr_free(p->subchannels);
elem = p->ready_list.next;
while (elem != NULL && elem != &p->ready_list) {
ready_list *tmp;
tmp = elem->next;
elem->next = NULL;
elem->prev = NULL;
elem->subchannel = NULL;
gpr_free(elem);
elem = tmp;
}
gpr_free(p); gpr_free(p);
} }
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
size_t i;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
} }
p->shutdown = true;
p->shutdown = 1; pending_pick *pp;
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = NULL; *pp->target = NULL;
@ -328,10 +242,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set( grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
for (i = 0; i < p->num_subchannels; i++) { for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i]; subchannel_data *sd = &p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, if (sd->subchannel != NULL) {
&sd->connectivity_changed_closure); grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
&sd->connectivity_changed_closure);
}
} }
} }
@ -339,8 +256,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_error *error) { grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp = p->pending_picks;
pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
@ -364,8 +280,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error *error) { grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp = p->pending_picks;
pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
@ -387,21 +302,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx, static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) { round_robin_lb_policy *p) {
size_t i; p->started_picking = true;
p->started_picking = 1; for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = &p->subchannels[i];
for (i = 0; i < p->num_subchannels; i++) { if (sd->subchannel != NULL) {
subchannel_data *sd = p->subchannels[i]; GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
/* use some sentinel value outside of the range of grpc_connectivity_state grpc_subchannel_notify_on_state_change(
* to signal an undefined previous state. We won't be referring to this exec_ctx, sd->subchannel, p->base.interested_parties,
* value again and it'll be overwritten after the first call to &sd->pending_connectivity_state_unsafe,
* rr_connectivity_changed */ &sd->connectivity_changed_closure);
sd->prev_connectivity_state = GRPC_CHANNEL_INIT; }
sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->curr_connectivity_state, &sd->connectivity_changed_closure);
} }
} }
@ -418,36 +328,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data, grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) { grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
} }
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if ((selected = peek_next_connected_locked(p))) { if (next_ready_index < p->num_subchannels) {
/* readily available, report right away */ /* readily available, report right away */
subchannel_data *sd = &p->subchannels[next_ready_index];
*target = GRPC_CONNECTED_SUBCHANNEL_REF( *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel), grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
"rr_picked");
if (user_data != NULL) { if (user_data != NULL) {
*user_data = selected->user_data; *user_data = sd->user_data;
} }
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %zu)",
(void *)*target, (void *)selected); (void *)*target, next_ready_index);
} }
/* only advance the last picked pointer if the selection was used */ /* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p); update_last_ready_subchannel_index_locked(p, next_ready_index);
return 1; return 1;
} else { } else {
/* no pick currently available. Save for later in list of pending picks */ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) { if (!p->started_picking) {
start_picking_locked(exec_ctx, p); start_picking_locked(exec_ctx, p);
} }
pp = gpr_malloc(sizeof(*pp)); pending_pick *pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->target = target; pp->target = target;
pp->on_complete = on_complete; pp->on_complete = on_complete;
@ -458,25 +364,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
} }
static void update_state_counters(subchannel_data *sd) { static void update_state_counters_locked(subchannel_data *sd) {
round_robin_lb_policy *p = sd->policy; round_robin_lb_policy *p = sd->policy;
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
/* update p->num_transient_failures (resp. p->num_idle): if the previous GPR_ASSERT(p->num_ready > 0);
* state was TRANSIENT_FAILURE (resp. IDLE), decrement --p->num_ready;
* p->num_transient_failures (resp. p->num_idle). */ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(p->num_transient_failures > 0); GPR_ASSERT(p->num_transient_failures > 0);
--p->num_transient_failures; --p->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(p->num_idle > 0); GPR_ASSERT(p->num_idle > 0);
--p->num_idle; --p->num_idle;
} }
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
++p->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++p->num_transient_failures;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
++p->num_idle;
}
} }
/* sd is the subchannel_data associted with the updated subchannel. /* sd is the subchannel_data associted with the updated subchannel.
* shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
* or SHUTDOWN */ * or SHUTDOWN */
static grpc_connectivity_state update_lb_connectivity_status( static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we /* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled). * are on rule n, all previous rules were unfulfilled).
@ -498,7 +410,7 @@ static grpc_connectivity_state update_lb_connectivity_status(
* CHECK: p->num_idle == p->num_subchannels. * CHECK: p->num_idle == p->num_subchannels.
*/ */
round_robin_lb_policy *p = sd->policy; round_robin_lb_policy *p = sd->policy;
if (!is_ready_list_empty(p)) { /* 1) READY */ if (p->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready"); GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY; return GRPC_CHANNEL_READY;
@ -532,32 +444,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
subchannel_data *sd = arg; subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy; round_robin_lb_policy *p = sd->policy;
pending_pick *pp; // Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
GRPC_ERROR_REF(error); // curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p: "
"prev_state=%d new_state=%d",
p, sd->subchannel, sd->prev_connectivity_state,
sd->curr_connectivity_state);
}
// If we're shutting down, unref and return.
if (p->shutdown) { if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
GRPC_ERROR_UNREF(error);
return; return;
} }
switch (sd->curr_connectivity_state) { // Update state counters and determine new overall state.
case GRPC_CHANNEL_INIT: update_state_counters_locked(sd);
GPR_UNREACHABLE_CODE(return ); sd->prev_connectivity_state = sd->curr_connectivity_state;
case GRPC_CHANNEL_READY: grpc_connectivity_state new_connectivity_state =
/* add the newly connected subchannel to the list of connected ones. update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
* Note that it goes to the "end of the line". */ // If the new state is SHUTDOWN, unref the subchannel, and if the new
sd->ready_list_node = add_connected_sc_locked(p, sd); // overall state is SHUTDOWN, clean up.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
sd->subchannel = NULL;
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
}
if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
/* unref the "rr_connectivity" weak ref from start_picking */
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
} else {
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
/* at this point we know there's at least one suitable subchannel. Go /* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in * ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */ * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
ready_list *selected = peek_next_connected_locked(p); const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(selected != NULL); GPR_ASSERT(next_ready_index < p->num_subchannels);
subchannel_data *selected = &p->subchannels[next_ready_index];
if (p->pending_picks != NULL) { if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending /* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */ * picks, update the last picked pointer */
advance_last_picked_locked(p); update_last_ready_subchannel_index_locked(p, next_ready_index);
} }
pending_pick *pp;
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
@ -568,72 +510,19 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
} }
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %zu)",
(void *)selected->subchannel, (void *)selected); (void *)selected->subchannel, next_ready_index);
} }
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp); gpr_free(pp);
} }
update_lb_connectivity_status(exec_ctx, sd, error); }
sd->prev_connectivity_state = sd->curr_connectivity_state; /* renew notification: reuses the "rr_connectivity" weak ref */
/* renew notification: reuses the "rr_connectivity" weak ref */ grpc_subchannel_notify_on_state_change(
grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties,
exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe,
&sd->curr_connectivity_state, &sd->connectivity_changed_closure); &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_IDLE:
++p->num_idle;
/* fallthrough */
case GRPC_CHANNEL_CONNECTING:
update_state_counters(sd);
update_lb_connectivity_status(exec_ctx, sd, error);
sd->prev_connectivity_state = sd->curr_connectivity_state;
/* renew notification: reuses the "rr_connectivity" weak ref */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->curr_connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
++p->num_transient_failures;
/* remove from ready list if still present */
if (sd->ready_list_node != NULL) {
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
}
update_lb_connectivity_status(exec_ctx, sd, error);
sd->prev_connectivity_state = sd->curr_connectivity_state;
/* renew notification: reuses the "rr_connectivity" weak ref */
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->curr_connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_SHUTDOWN:
update_state_counters(sd);
if (sd->ready_list_node != NULL) {
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
}
--p->num_subchannels;
GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
p->subchannels[sd->index]->index = sd->index;
if (update_lb_connectivity_status(exec_ctx, sd, error) ==
GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
}
gpr_free(sd);
/* unref the "rr_connectivity" weak ref from start_picking */
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
break;
} }
GRPC_ERROR_UNREF(error);
} }
static grpc_connectivity_state rr_check_connectivity_locked( static grpc_connectivity_state rr_check_connectivity_locked(
@ -654,10 +543,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) { grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
ready_list *selected; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
grpc_connected_subchannel *target; if (next_ready_index < p->num_subchannels) {
if ((selected = peek_next_connected_locked(p))) { subchannel_data *selected = &p->subchannels[next_ready_index];
target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel), grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked"); "rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure); grpc_connected_subchannel_ping(exec_ctx, target, closure);
@ -708,7 +597,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args; grpc_subchannel_args sc_args;
size_t subchannel_idx = 0; size_t subchannel_index = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) { for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */ /* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue; if (addresses->addresses[i].is_balancer) continue;
@ -727,42 +616,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri = char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address); grpc_sockaddr_to_uri(&addresses->addresses[i].address);
gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s", gpr_log(GPR_DEBUG, "index %zu: Created subchannel %p for address uri %s",
(void *)subchannel, address_uri); subchannel_index, (void *)subchannel, address_uri);
gpr_free(address_uri); gpr_free(address_uri);
} }
grpc_channel_args_destroy(exec_ctx, new_args); grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) { if (subchannel != NULL) {
subchannel_data *sd = gpr_zalloc(sizeof(*sd)); subchannel_data *sd = &p->subchannels[subchannel_index];
p->subchannels[subchannel_idx] = sd;
sd->policy = p; sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel; sd->subchannel = subchannel;
/* use some sentinel value outside of the range of grpc_connectivity_state
* to signal an undefined previous state. We won't be referring to this
* value again and it'll be overwritten after the first call to
* rr_connectivity_changed */
sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
sd->user_data_vtable = addresses->user_data_vtable; sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) { if (sd->user_data_vtable != NULL) {
sd->user_data = sd->user_data =
sd->user_data_vtable->copy(addresses->addresses[i].user_data); sd->user_data_vtable->copy(addresses->addresses[i].user_data);
} }
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure, grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd, rr_connectivity_changed_locked, sd,
grpc_combiner_scheduler(args->combiner, false)); grpc_combiner_scheduler(args->combiner, false));
++subchannel_index;
} }
} }
if (subchannel_idx == 0) { if (subchannel_index == 0) {
/* couldn't create any subchannel. Bail out */ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels); gpr_free(p->subchannels);
gpr_free(p); gpr_free(p);
return NULL; return NULL;
} }
p->num_subchannels = subchannel_idx; p->num_subchannels = subchannel_index;
/* The (dummy node) root of the ready list */ // Initialize the last pick index to the last subchannel, so that the
p->ready_list.subchannel = NULL; // first pick will start at the beginning of the list.
p->ready_list.prev = NULL; p->last_ready_subchannel_index = subchannel_index - 1;
p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list;
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,

@ -568,9 +568,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
// only the first half of the backends will receive them. // only the first half of the backends will receive them.
for (size_t i = 0; i < backends_.size(); ++i) { for (size_t i = 0; i < backends_.size(); ++i) {
if (i < backends_.size() / 2) if (i < backends_.size() / 2)
EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); EXPECT_EQ(1U, backend_servers_[i].service_->request_count())
<< "for backend #" << i;
else else
EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); EXPECT_EQ(0U, backend_servers_[i].service_->request_count())
<< "for backend #" << i;
} }
EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
for (const auto& status_and_response : statuses_and_responses) { for (const auto& status_and_response : statuses_and_responses) {

@ -42,7 +42,6 @@
#include <grpc++/server_builder.h> #include <grpc++/server_builder.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h"
@ -131,22 +130,10 @@ class RoundRobinEnd2endTest : public ::testing::Test {
int port_; int port_;
std::unique_ptr<Server> server_; std::unique_ptr<Server> server_;
MyTestServiceImpl service_; MyTestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
explicit ServerData(const grpc::string& server_host) { explicit ServerData(const grpc::string& server_host) {
port_ = grpc_pick_unused_port_or_die(); port_ = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "starting server on port %d", port_); gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::condition_variable cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
std::unique_lock<std::mutex> lock(mu);
cond.wait(lock);
gpr_log(GPR_INFO, "server startup complete");
}
void Start(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
std::ostringstream server_address; std::ostringstream server_address;
server_address << server_host << ":" << port_; server_address << server_host << ":" << port_;
ServerBuilder builder; ServerBuilder builder;
@ -154,18 +141,13 @@ class RoundRobinEnd2endTest : public ::testing::Test {
InsecureServerCredentials()); InsecureServerCredentials());
builder.RegisterService(&service_); builder.RegisterService(&service_);
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu); gpr_log(GPR_INFO, "server startup complete");
cond->notify_one();
} }
void Shutdown() { void Shutdown() { server_->Shutdown(); }
server_->Shutdown();
thread_->join();
}
}; };
const grpc::string server_host_; const grpc::string server_host_;
CompletionQueue cli_cq_;
std::shared_ptr<Channel> channel_; std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_; std::vector<std::unique_ptr<ServerData>> servers_;
@ -197,10 +179,13 @@ TEST_F(RoundRobinEnd2endTest, RoundRobin) {
const int kNumServers = 3; const int kNumServers = 3;
StartServers(kNumServers); StartServers(kNumServers);
ResetStub(true /* round_robin */); ResetStub(true /* round_robin */);
SendRpc(kNumServers); // Send one RPC per backend and make sure they are used in order.
// One request should have gone to each server. // Note: This relies on the fact that the subchannels are reported in
// state READY in the order in which the addresses are specified,
// which is only true because the backends are all local.
for (size_t i = 0; i < servers_.size(); ++i) { for (size_t i = 0; i < servers_.size(); ++i) {
EXPECT_EQ(1, servers_[i]->service_.request_count()); SendRpc(1);
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
} }
// Check LB policy name for the channel. // Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());

Loading…
Cancel
Save