diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 6e7f4106358..2acde70f4c9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -99,26 +99,13 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; -/** List of subchannels in a connectivity READY state */ -typedef struct ready_list { - grpc_subchannel *subchannel; - /* references namesake entry in subchannel_data */ - void *user_data; - struct ready_list *next; - struct ready_list *prev; -} ready_list; - typedef struct { - /** index within policy->subchannels */ - size_t index; /** backpointer to owning policy */ round_robin_lb_policy *policy; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ grpc_closure connectivity_changed_closure; - /** this subchannels current position in subchannel->ready_list */ - ready_list *ready_list_node; /** last observed connectivity. Not updated by * \a grpc_subchannel_notify_on_state_change. Used to determine the previous * state while processing the new state in \a rr_connectivity_changed */ @@ -126,6 +113,10 @@ typedef struct { /** current connectivity state. Updated by \a * grpc_subchannel_notify_on_state_change */ grpc_connectivity_state curr_connectivity_state; + /** connectivity state to be updated by the watcher, not guarded by + * the combiner. Will be moved to curr_connectivity_state inside of + * the combiner by rr_connectivity_changed_locked(). */ + grpc_connectivity_state pending_connectivity_state_unsafe; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -141,182 +132,105 @@ struct round_robin_lb_policy { /** all our subchannels */ size_t num_subchannels; - subchannel_data **subchannels; + subchannel_data *subchannels; - /** how many subchannels are in TRANSIENT_FAILURE */ + /** how many subchannels are in state READY */ + size_t num_ready; + /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are IDLE */ + /** how many subchannels are in state IDLE */ size_t num_idle; /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shutting down? */ - int shutdown; + bool shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; - /** (Dummy) root of the doubly linked list containing READY subchannels */ - ready_list ready_list; - /** Last pick from the ready list. */ - ready_list *ready_list_last_pick; + // Index into subchannels for last pick. + size_t last_ready_subchannel_index; }; -/** Returns the next subchannel from the connected list or NULL if the list is - * empty. +/** Returns the index into p->subchannels of the next subchannel in + * READY state, or p->num_subchannels if no subchannel is READY. * - * Note that this function does *not* advance p->ready_list_last_pick. Use \a - * advance_last_picked_locked() for that. 
*/ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { - ready_list *selected; - selected = p->ready_list_last_pick->next; - - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; - } + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +static size_t get_next_ready_subchannel_index_locked( + const round_robin_lb_policy *p) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR: %p] getting next ready subchannel, " + "last_ready_subchannel_index=%zu", + p, p->last_ready_subchannel_index); } - return NULL; -} - -/** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ - p->ready_list_last_pick = p->ready_list_last_pick->next; + for (size_t i = 0; i < p->num_subchannels; ++i) { + const size_t index = + (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] checking index %zu: state=%d", p, index, + p->subchannels[index].curr_connectivity_state); + } + if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %zu", + p, index); + } + return index; } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " - "CSC %p)", - (void *)p, (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->ready_list_last_pick->subchannel)); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); } + return p->num_subchannels; } -/** Prepends (relative to the root at p->ready_list) the connected subchannel \a - * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - subchannel_data *sd) { - ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); - new_elem->subchannel = sd->subchannel; - new_elem->user_data = sd->user_data; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } +// Sets p->last_ready_subchannel_index to last_ready_index. +static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < p->num_subchannels); + p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. 
SC %p)", - (void *)new_elem, (void *)sd->subchannel); - } - return new_elem; -} - -/** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - p->ready_list_last_pick = p->ready_list_last_pick->prev; - } - - /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, - (void *)node->subchannel); + gpr_log(GPR_DEBUG, + "[RR: %p] setting last_ready_subchannel_index=%zu (SC %p, CSC %p)", + (void *)p, last_ready_index, + (void *)p->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannels[last_ready_index].subchannel)); } - - node->next = NULL; - node->prev = NULL; - node->subchannel = NULL; - - gpr_free(node); -} - -static bool is_ready_list_empty(round_robin_lb_policy *p) { - return p->ready_list.prev == NULL; } static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *elem; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } } - gpr_free(sd); } - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - - elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p); } static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - size_t i; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } - - p->shutdown = 1; + p->shutdown = true; + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; @@ -328,10 +242,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, - &sd->connectivity_changed_closure); + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != 
NULL) { + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } } } @@ -339,8 +256,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -364,8 +280,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -387,21 +302,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { - size_t i; - p->started_picking = 1; - - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + p->started_picking = true; + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); + } } } @@ -418,36 +328,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - ready_list *selected; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - - if ((selected = peek_next_connected_locked(p))) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { /* readily available, report right away */ + subchannel_data *sd = &p->subchannels[next_ready_index]; *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); - + grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); if (user_data != NULL) { - *user_data = selected->user_data; + *user_data = sd->user_data; } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", - (void *)*target, (void *)selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %zu)", + (void *)*target, next_ready_index); } /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); + 
update_last_ready_subchannel_index_locked(p, next_ready_index); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = gpr_malloc(sizeof(*pp)); + pending_pick *pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->on_complete = on_complete; @@ -458,25 +364,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static void update_state_counters(subchannel_data *sd) { +static void update_state_counters_locked(subchannel_data *sd) { round_robin_lb_policy *p = sd->policy; - - /* update p->num_transient_failures (resp. p->num_idle): if the previous - * state was TRANSIENT_FAILURE (resp. IDLE), decrement - * p->num_transient_failures (resp. p->num_idle). */ - if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(p->num_ready > 0); + --p->num_ready; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(p->num_transient_failures > 0); --p->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(p->num_idle > 0); --p->num_idle; } + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + ++p->num_ready; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++p->num_transient_failures; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { + ++p->num_idle; + } } /* sd is the subchannel_data associted with the updated subchannel. * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE * or SHUTDOWN */ -static grpc_connectivity_state update_lb_connectivity_status( +static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). @@ -498,7 +410,7 @@ static grpc_connectivity_state update_lb_connectivity_status( * CHECK: p->num_idle == p->num_subchannels. */ round_robin_lb_policy *p = sd->policy; - if (!is_ready_list_empty(p)) { /* 1) READY */ + if (p->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -532,32 +444,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; - pending_pick *pp; - - GRPC_ERROR_REF(error); - + // Now that we're inside the combiner, copy the pending connectivity + // state (which was set by the connectivity state watcher) to + // curr_connectivity_state, which is what we use inside of the combiner. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p: " + "prev_state=%d new_state=%d", + p, sd->subchannel, sd->prev_connectivity_state, + sd->curr_connectivity_state); + } + // If we're shutting down, unref and return. 
if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - GRPC_ERROR_UNREF(error); return; } - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); + // Update state counters and determine new overall state. + update_state_counters_locked(sd); + sd->prev_connectivity_state = sd->curr_connectivity_state; + grpc_connectivity_state new_connectivity_state = + update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + // If the new state is SHUTDOWN, unref the subchannel, and if the new + // overall state is SHUTDOWN, clean up. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ + pending_pick *pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); + } + } + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + } else { + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. 
*/ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + GPR_ASSERT(next_ready_index < p->num_subchannels); + subchannel_data *selected = &p->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); } + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -568,72 +510,19 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %zu)", + (void *)selected->subchannel, next_ready_index); } grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_IDLE: - ++p->num_idle; - /* fallthrough */ - case GRPC_CHANNEL_CONNECTING: - update_state_counters(sd); - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - ++p->num_transient_failures; - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_SHUTDOWN: - update_state_counters(sd); - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - --p->num_subchannels; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); - p->subchannels[sd->index]->index = sd->index; - if (update_lb_connectivity_status(exec_ctx, sd, error) == - GRPC_CHANNEL_SHUTDOWN) { - /* the policy is shutting down. Flush all the pending picks... 
*/ - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } - } - gpr_free(sd); - /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - break; + } + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } - GRPC_ERROR_UNREF(error); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -654,10 +543,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *selected; - grpc_connected_subchannel *target; - if ((selected = peek_next_connected_locked(p))) { - target = GRPC_CONNECTED_SUBCHANNEL_REF( + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { + subchannel_data *selected = &p->subchannels[next_ready_index]; + grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); @@ -708,7 +597,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; + size_t subchannel_index = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; @@ -727,42 +616,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s", - (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, "index %zu: Created subchannel %p for address uri %s", + subchannel_index, (void *)subchannel, address_uri); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { - subchannel_data *sd = gpr_zalloc(sizeof(*sd)); - p->subchannels[subchannel_idx] = sd; + subchannel_data *sd = &p->subchannels[subchannel_index]; sd->policy = p; - sd->index = subchannel_idx; sd->subchannel = subchannel; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = sd->user_data_vtable->copy(addresses->addresses[i].user_data); } - ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, grpc_combiner_scheduler(args->combiner, false)); + ++subchannel_index; } } - if (subchannel_idx == 0) { + if (subchannel_index == 0) { /* couldn't create any subchannel. 
Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; } - p->num_subchannels = subchannel_idx; + p->num_subchannels = subchannel_index; - /* The (dummy node) root of the ready list */ - p->ready_list.subchannel = NULL; - p->ready_list.prev = NULL; - p->ready_list.next = NULL; - p->ready_list_last_pick = &p->ready_list; + // Initialize the last pick index to the last subchannel, so that the + // first pick will start at the beginning of the list. + p->last_ready_subchannel_index = subchannel_index - 1; grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 4e1bcc7a60d..b0d4e2dadf5 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -568,9 +568,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { // only the first half of the backends will receive them. for (size_t i = 0; i < backends_.size(); ++i) { if (i < backends_.size() / 2) - EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()) + << "for backend #" << i; else - EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()) + << "for backend #" << i; } EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); for (const auto& status_and_response : statuses_and_responses) { diff --git a/test/cpp/end2end/round_robin_end2end_test.cc b/test/cpp/end2end/round_robin_end2end_test.cc index f8e3cc06c0a..ea7639bc8f0 100644 --- a/test/cpp/end2end/round_robin_end2end_test.cc +++ b/test/cpp/end2end/round_robin_end2end_test.cc @@ -42,7 +42,6 @@ #include #include #include -#include #include #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -131,22 +130,10 @@ class RoundRobinEnd2endTest : public ::testing::Test { int port_; std::unique_ptr<Server> server_; MyTestServiceImpl service_; - std::unique_ptr<std::thread> thread_; explicit ServerData(const grpc::string& server_host) { port_ = grpc_pick_unused_port_or_die(); gpr_log(GPR_INFO, "starting server on port %d", port_); - std::mutex mu; - std::condition_variable cond; - thread_.reset(new std::thread( - std::bind(&ServerData::Start, this, server_host, &mu, &cond))); - std::unique_lock<std::mutex> lock(mu); - cond.wait(lock); - gpr_log(GPR_INFO, "server startup complete"); - } - - void Start(const grpc::string& server_host, std::mutex* mu, - std::condition_variable* cond) { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; builder.AddListeningPort(server_address.str(), InsecureServerCredentials()); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - std::lock_guard<std::mutex> lock(*mu); - cond->notify_one(); + gpr_log(GPR_INFO, "server startup complete"); } - void Shutdown() { - server_->Shutdown(); - thread_->join(); - } + void Shutdown() { server_->Shutdown(); } }; const grpc::string server_host_; - CompletionQueue cli_cq_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::vector<std::unique_ptr<ServerData>> servers_; @@ -197,10 +179,13 @@ TEST_F(RoundRobinEnd2endTest, RoundRobin) { const int kNumServers = 3; StartServers(kNumServers); ResetStub(true /* round_robin */); - SendRpc(kNumServers); - // One request should have gone to each server. + // Send one RPC per backend and make sure they are used in order.
+ // Note: This relies on the fact that the subchannels are reported in + // state READY in the order in which the addresses are specified, + // which is only true because the backends are all local. for (size_t i = 0; i < servers_.size(); ++i) { - EXPECT_EQ(1, servers_[i]->service_.request_count()); + SendRpc(1); + EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; } // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
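
Reviewer note: the following is a minimal, standalone sketch (not part of the change itself) of the array-based selection scheme that replaces the ready_list. The names below (rr_picker, next_ready_index, ST_READY, ...) are simplified stand-ins for illustration only, not the gRPC APIs; in the patch the equivalent logic lives in get_next_ready_subchannel_index_locked() and update_last_ready_subchannel_index_locked().

/* Standalone sketch of the array-based round-robin pick introduced by this
 * patch, using simplified stand-in types (compile with any C99 compiler). */
#include <stddef.h>
#include <stdio.h>

typedef enum { ST_IDLE, ST_CONNECTING, ST_READY, ST_TRANSIENT_FAILURE } state_t;

typedef struct {
  state_t *states;         /* per-subchannel connectivity state */
  size_t num_subchannels;
  size_t last_ready_index; /* index of the last subchannel handed out */
} rr_picker;

/* Scan at most num_subchannels entries, starting just past the last pick and
 * wrapping around; return num_subchannels if nothing is READY. This mirrors
 * get_next_ready_subchannel_index_locked() in the patch. */
static size_t next_ready_index(const rr_picker *p) {
  for (size_t i = 0; i < p->num_subchannels; ++i) {
    const size_t index = (i + p->last_ready_index + 1) % p->num_subchannels;
    if (p->states[index] == ST_READY) return index;
  }
  return p->num_subchannels;
}

int main(void) {
  state_t states[] = {ST_READY, ST_TRANSIENT_FAILURE, ST_READY, ST_IDLE};
  /* Start at the last index so the first pick lands on index 0, matching the
   * initialization of last_ready_subchannel_index in round_robin_create(). */
  rr_picker p = {states, 4, 3};
  for (int pick = 0; pick < 4; ++pick) {
    const size_t idx = next_ready_index(&p);
    if (idx == p.num_subchannels) {
      printf("no subchannel READY\n");
    } else {
      printf("pick %d -> subchannel %zu\n", pick, idx);
      p.last_ready_index = idx; /* advance only when the pick is used */
    }
  }
  return 0; /* prints indices 0, 2, 0, 2 for the states above */
}

Starting the scan one slot past the last pick, and advancing the index only when a pick is actually handed out, is what preserves round-robin fairness here without the doubly linked list bookkeeping the old ready_list needed.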