|
|
|
@ -46,12 +46,12 @@ grpc_tracer_flag grpc_lb_round_robin_trace = |
|
|
|
|
* |
|
|
|
|
* Once a pick is available, \a target is updated and \a on_complete called. */ |
|
|
|
|
typedef struct pending_pick { |
|
|
|
|
struct pending_pick *next; |
|
|
|
|
struct pending_pick* next; |
|
|
|
|
|
|
|
|
|
/* output argument where to store the pick()ed user_data. It'll be NULL if no
|
|
|
|
|
* such data is present or there's an error (the definite test for errors is |
|
|
|
|
* \a target being NULL). */ |
|
|
|
|
void **user_data; |
|
|
|
|
void** user_data; |
|
|
|
|
|
|
|
|
|
/* bitmask passed to pick() and used for selective cancelling. See
|
|
|
|
|
* grpc_lb_policy_cancel_picks() */ |
|
|
|
@ -59,24 +59,24 @@ typedef struct pending_pick { |
|
|
|
|
|
|
|
|
|
/* output argument where to store the pick()ed connected subchannel, or NULL
|
|
|
|
|
* upon error. */ |
|
|
|
|
grpc_connected_subchannel **target; |
|
|
|
|
grpc_connected_subchannel** target; |
|
|
|
|
|
|
|
|
|
/* to be invoked once the pick() has completed (regardless of success) */ |
|
|
|
|
grpc_closure *on_complete; |
|
|
|
|
grpc_closure* on_complete; |
|
|
|
|
} pending_pick; |
|
|
|
|
|
|
|
|
|
typedef struct round_robin_lb_policy { |
|
|
|
|
/** base policy: must be first */ |
|
|
|
|
grpc_lb_policy base; |
|
|
|
|
|
|
|
|
|
grpc_lb_subchannel_list *subchannel_list; |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list; |
|
|
|
|
|
|
|
|
|
/** have we started picking? */ |
|
|
|
|
bool started_picking; |
|
|
|
|
/** are we shutting down? */ |
|
|
|
|
bool shutdown; |
|
|
|
|
/** List of picks that are waiting on connectivity */ |
|
|
|
|
pending_pick *pending_picks; |
|
|
|
|
pending_pick* pending_picks; |
|
|
|
|
|
|
|
|
|
/** our connectivity state tracker */ |
|
|
|
|
grpc_connectivity_state_tracker state_tracker; |
|
|
|
@ -89,7 +89,7 @@ typedef struct round_robin_lb_policy { |
|
|
|
|
* lists if they equal \a latest_pending_subchannel_list. In other words, |
|
|
|
|
* racing callbacks that reference outdated subchannel lists won't perform any |
|
|
|
|
* update. */ |
|
|
|
|
grpc_lb_subchannel_list *latest_pending_subchannel_list; |
|
|
|
|
grpc_lb_subchannel_list* latest_pending_subchannel_list; |
|
|
|
|
} round_robin_lb_policy; |
|
|
|
|
|
|
|
|
|
/** Returns the index into p->subchannel_list->subchannels of the next
|
|
|
|
@ -99,13 +99,13 @@ typedef struct round_robin_lb_policy { |
|
|
|
|
* Note that this function does *not* update p->last_ready_subchannel_index. |
|
|
|
|
* The caller must do that if it returns a pick. */ |
|
|
|
|
static size_t get_next_ready_subchannel_index_locked( |
|
|
|
|
const round_robin_lb_policy *p) { |
|
|
|
|
const round_robin_lb_policy* p) { |
|
|
|
|
GPR_ASSERT(p->subchannel_list != NULL); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[RR %p] getting next ready subchannel (out of %lu), " |
|
|
|
|
"last_ready_subchannel_index=%lu", |
|
|
|
|
(void *)p, (unsigned long)p->subchannel_list->num_subchannels, |
|
|
|
|
(void*)p, (unsigned long)p->subchannel_list->num_subchannels, |
|
|
|
|
(unsigned long)p->last_ready_subchannel_index); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { |
|
|
|
@ -116,8 +116,8 @@ static size_t get_next_ready_subchannel_index_locked( |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " |
|
|
|
|
"state=%s", |
|
|
|
|
(void *)p, (void *)p->subchannel_list->subchannels[index].subchannel, |
|
|
|
|
(void *)p->subchannel_list, (unsigned long)index, |
|
|
|
|
(void*)p, (void*)p->subchannel_list->subchannels[index].subchannel, |
|
|
|
|
(void*)p->subchannel_list, (unsigned long)index, |
|
|
|
|
grpc_connectivity_state_name( |
|
|
|
|
p->subchannel_list->subchannels[index].curr_connectivity_state)); |
|
|
|
|
} |
|
|
|
@ -127,40 +127,39 @@ static size_t get_next_ready_subchannel_index_locked( |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] found next ready subchannel (%p) at index %lu of " |
|
|
|
|
"subchannel_list %p", |
|
|
|
|
(void *)p, |
|
|
|
|
(void *)p->subchannel_list->subchannels[index].subchannel, |
|
|
|
|
(unsigned long)index, (void *)p->subchannel_list); |
|
|
|
|
(void*)p, |
|
|
|
|
(void*)p->subchannel_list->subchannels[index].subchannel, |
|
|
|
|
(unsigned long)index, (void*)p->subchannel_list); |
|
|
|
|
} |
|
|
|
|
return index; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void *)p); |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p); |
|
|
|
|
} |
|
|
|
|
return p->subchannel_list->num_subchannels; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Sets p->last_ready_subchannel_index to last_ready_index.
|
|
|
|
|
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, |
|
|
|
|
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, |
|
|
|
|
size_t last_ready_index) { |
|
|
|
|
GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); |
|
|
|
|
p->last_ready_subchannel_index = last_ready_index; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", |
|
|
|
|
(void *)p, (unsigned long)last_ready_index, |
|
|
|
|
(void *)p->subchannel_list->subchannels[last_ready_index].subchannel, |
|
|
|
|
(void *)p->subchannel_list->subchannels[last_ready_index] |
|
|
|
|
.connected_subchannel); |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", |
|
|
|
|
(void*)p, (unsigned long)last_ready_index, |
|
|
|
|
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel, |
|
|
|
|
(void*)p->subchannel_list->subchannels[last_ready_index] |
|
|
|
|
.connected_subchannel); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", |
|
|
|
|
(void *)pol, (void *)pol); |
|
|
|
|
(void*)pol, (void*)pol); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(p->subchannel_list == NULL); |
|
|
|
|
GPR_ASSERT(p->latest_pending_subchannel_list == NULL); |
|
|
|
@ -169,13 +168,13 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
gpr_free(p); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void shutdown_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); |
|
|
|
|
} |
|
|
|
|
p->shutdown = true; |
|
|
|
|
pending_pick *pp; |
|
|
|
|
pending_pick* pp; |
|
|
|
|
while ((pp = p->pending_picks) != NULL) { |
|
|
|
|
p->pending_picks = pp->next; |
|
|
|
|
*pp->target = NULL; |
|
|
|
@ -199,20 +198,20 @@ static void shutdown_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
shutdown_locked(exec_ctx, p, |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
grpc_connected_subchannel **target, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
pending_pick *pp = p->pending_picks; |
|
|
|
|
static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, |
|
|
|
|
grpc_connected_subchannel** target, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
pending_pick* pp = p->pending_picks; |
|
|
|
|
p->pending_picks = NULL; |
|
|
|
|
while (pp != NULL) { |
|
|
|
|
pending_pick *next = pp->next; |
|
|
|
|
pending_pick* next = pp->next; |
|
|
|
|
if (pp->target == target) { |
|
|
|
|
*target = NULL; |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, |
|
|
|
@ -228,15 +227,15 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
static void rr_cancel_picks_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, |
|
|
|
|
uint32_t initial_metadata_flags_mask, |
|
|
|
|
uint32_t initial_metadata_flags_eq, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
pending_pick *pp = p->pending_picks; |
|
|
|
|
grpc_error* error) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
pending_pick* pp = p->pending_picks; |
|
|
|
|
p->pending_picks = NULL; |
|
|
|
|
while (pp != NULL) { |
|
|
|
|
pending_pick *next = pp->next; |
|
|
|
|
pending_pick* next = pp->next; |
|
|
|
|
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == |
|
|
|
|
initial_metadata_flags_eq) { |
|
|
|
|
*pp->target = NULL; |
|
|
|
@ -253,8 +252,8 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void start_picking_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
round_robin_lb_policy *p) { |
|
|
|
|
static void start_picking_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
round_robin_lb_policy* p) { |
|
|
|
|
p->started_picking = true; |
|
|
|
|
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { |
|
|
|
|
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, |
|
|
|
@ -264,28 +263,29 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
static void rr_exit_idle_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
if (!p->started_picking) { |
|
|
|
|
start_picking_locked(exec_ctx, p); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
const grpc_lb_policy_pick_args *pick_args, |
|
|
|
|
grpc_connected_subchannel **target, |
|
|
|
|
grpc_call_context_element *context, void **user_data, |
|
|
|
|
grpc_closure *on_complete) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
GPR_ASSERT(!p->shutdown); |
|
|
|
|
static int rr_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, |
|
|
|
|
const grpc_lb_policy_pick_args* pick_args, |
|
|
|
|
grpc_connected_subchannel** target, |
|
|
|
|
grpc_call_context_element* context, void** user_data, |
|
|
|
|
grpc_closure* on_complete) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol); |
|
|
|
|
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol, |
|
|
|
|
p->shutdown); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(!p->shutdown); |
|
|
|
|
if (p->subchannel_list != NULL) { |
|
|
|
|
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); |
|
|
|
|
if (next_ready_index < p->subchannel_list->num_subchannels) { |
|
|
|
|
/* readily available, report right away */ |
|
|
|
|
grpc_lb_subchannel_data *sd = |
|
|
|
|
grpc_lb_subchannel_data* sd = |
|
|
|
|
&p->subchannel_list->subchannels[next_ready_index]; |
|
|
|
|
*target = |
|
|
|
|
GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); |
|
|
|
@ -297,8 +297,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " |
|
|
|
|
"index %lu)", |
|
|
|
|
(void *)p, (void *)sd->subchannel, (void *)*target, |
|
|
|
|
(void *)sd->subchannel_list, (unsigned long)next_ready_index); |
|
|
|
|
(void*)p, (void*)sd->subchannel, (void*)*target, |
|
|
|
|
(void*)sd->subchannel_list, (unsigned long)next_ready_index); |
|
|
|
|
} |
|
|
|
|
/* only advance the last picked pointer if the selection was used */ |
|
|
|
|
update_last_ready_subchannel_index_locked(p, next_ready_index); |
|
|
|
@ -309,7 +309,7 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
if (!p->started_picking) { |
|
|
|
|
start_picking_locked(exec_ctx, p); |
|
|
|
|
} |
|
|
|
|
pending_pick *pp = (pending_pick *)gpr_malloc(sizeof(*pp)); |
|
|
|
|
pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); |
|
|
|
|
pp->next = p->pending_picks; |
|
|
|
|
pp->target = target; |
|
|
|
|
pp->on_complete = on_complete; |
|
|
|
@ -319,8 +319,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void update_state_counters_locked(grpc_lb_subchannel_data *sd) { |
|
|
|
|
grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list; |
|
|
|
|
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; |
|
|
|
|
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { |
|
|
|
|
GPR_ASSERT(subchannel_list->num_ready > 0); |
|
|
|
|
--subchannel_list->num_ready; |
|
|
|
@ -352,7 +352,7 @@ static void update_state_counters_locked(grpc_lb_subchannel_data *sd) { |
|
|
|
|
* used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the |
|
|
|
|
* connectivity status set. */ |
|
|
|
|
static grpc_connectivity_state update_lb_connectivity_status_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, grpc_error *error) { |
|
|
|
|
grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) { |
|
|
|
|
/* In priority order. The first rule to match terminates the search (ie, if we
|
|
|
|
|
* are on rule n, all previous rules were unfulfilled). |
|
|
|
|
* |
|
|
|
@ -374,8 +374,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( |
|
|
|
|
* CHECK: p->num_idle == p->subchannel_list->num_subchannels. |
|
|
|
|
*/ |
|
|
|
|
grpc_connectivity_state new_state = sd->curr_connectivity_state; |
|
|
|
|
grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list; |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)subchannel_list->policy; |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; |
|
|
|
|
if (subchannel_list->num_ready > 0) { /* 1) READY */ |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, |
|
|
|
|
GRPC_ERROR_NONE, "rr_ready"); |
|
|
|
@ -393,6 +393,11 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( |
|
|
|
|
"rr_shutdown"); |
|
|
|
|
p->shutdown = true; |
|
|
|
|
new_state = GRPC_CHANNEL_SHUTDOWN; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[RR %p] Shutting down: all subchannels have gone into shutdown", |
|
|
|
|
(void*)p); |
|
|
|
|
} |
|
|
|
|
} else if (subchannel_list->num_transient_failures == |
|
|
|
|
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
@ -409,18 +414,18 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( |
|
|
|
|
return new_state; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; |
|
|
|
|
round_robin_lb_policy *p = |
|
|
|
|
(round_robin_lb_policy *)sd->subchannel_list->policy; |
|
|
|
|
static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
grpc_lb_subchannel_data* sd = (grpc_lb_subchannel_data*)arg; |
|
|
|
|
round_robin_lb_policy* p = |
|
|
|
|
(round_robin_lb_policy*)sd->subchannel_list->policy; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " |
|
|
|
|
"prev_state=%s new_state=%s p->shutdown=%d " |
|
|
|
|
"sd->subchannel_list->shutting_down=%d error=%s", |
|
|
|
|
(void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list, |
|
|
|
|
(void*)p, (void*)sd->subchannel, (void*)sd->subchannel_list, |
|
|
|
|
grpc_connectivity_state_name(sd->prev_connectivity_state), |
|
|
|
|
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), |
|
|
|
|
p->shutdown, sd->subchannel_list->shutting_down, |
|
|
|
@ -487,8 +492,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] phasing out subchannel list %p (size %lu) in favor " |
|
|
|
|
"of %p (size %lu)", |
|
|
|
|
(void *)p, (void *)p->subchannel_list, num_subchannels, |
|
|
|
|
(void *)sd->subchannel_list, num_subchannels); |
|
|
|
|
(void*)p, (void*)p->subchannel_list, num_subchannels, |
|
|
|
|
(void*)sd->subchannel_list, num_subchannels); |
|
|
|
|
} |
|
|
|
|
if (p->subchannel_list != NULL) { |
|
|
|
|
// dispose of the current subchannel_list
|
|
|
|
@ -503,14 +508,14 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */ |
|
|
|
|
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); |
|
|
|
|
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); |
|
|
|
|
grpc_lb_subchannel_data *selected = |
|
|
|
|
grpc_lb_subchannel_data* selected = |
|
|
|
|
&p->subchannel_list->subchannels[next_ready_index]; |
|
|
|
|
if (p->pending_picks != NULL) { |
|
|
|
|
// if the selected subchannel is going to be used for the pending
|
|
|
|
|
// picks, update the last picked pointer
|
|
|
|
|
update_last_ready_subchannel_index_locked(p, next_ready_index); |
|
|
|
|
} |
|
|
|
|
pending_pick *pp; |
|
|
|
|
pending_pick* pp; |
|
|
|
|
while ((pp = p->pending_picks)) { |
|
|
|
|
p->pending_picks = pp->next; |
|
|
|
|
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( |
|
|
|
@ -522,8 +527,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p " |
|
|
|
|
"(subchannel_list %p, index %lu)", |
|
|
|
|
(void *)p, (void *)selected->subchannel, |
|
|
|
|
(void *)p->subchannel_list, (unsigned long)next_ready_index); |
|
|
|
|
(void*)p, (void*)selected->subchannel, |
|
|
|
|
(void*)p->subchannel_list, (unsigned long)next_ready_index); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); |
|
|
|
|
gpr_free(pp); |
|
|
|
@ -535,41 +540,42 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_connectivity_state rr_check_connectivity_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, grpc_error** error) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
return grpc_connectivity_state_get(&p->state_tracker, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_lb_policy *pol, |
|
|
|
|
grpc_connectivity_state *current, |
|
|
|
|
grpc_closure *notify) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_lb_policy* pol, |
|
|
|
|
grpc_connectivity_state* current, |
|
|
|
|
grpc_closure* notify) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, |
|
|
|
|
current, notify); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, |
|
|
|
|
grpc_closure* closure) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; |
|
|
|
|
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); |
|
|
|
|
if (next_ready_index < p->subchannel_list->num_subchannels) { |
|
|
|
|
grpc_lb_subchannel_data *selected = |
|
|
|
|
grpc_lb_subchannel_data* selected = |
|
|
|
|
&p->subchannel_list->subchannels[next_ready_index]; |
|
|
|
|
grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( |
|
|
|
|
grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( |
|
|
|
|
selected->connected_subchannel, "rr_ping"); |
|
|
|
|
grpc_connected_subchannel_ping(exec_ctx, target, closure); |
|
|
|
|
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); |
|
|
|
|
} else { |
|
|
|
|
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"Round Robin not connected")); |
|
|
|
|
GRPC_CLOSURE_SCHED( |
|
|
|
|
exec_ctx, closure, |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, |
|
|
|
|
const grpc_lb_policy_args *args) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)policy; |
|
|
|
|
const grpc_arg *arg = |
|
|
|
|
static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, |
|
|
|
|
const grpc_lb_policy_args* args) { |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)policy; |
|
|
|
|
const grpc_arg* arg = |
|
|
|
|
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); |
|
|
|
|
if (arg == NULL || arg->type != GRPC_ARG_POINTER) { |
|
|
|
|
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p); |
|
|
|
@ -583,12 +589,12 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; |
|
|
|
|
grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, |
|
|
|
|
addresses->num_addresses); |
|
|
|
|
} |
|
|
|
|
grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( |
|
|
|
|
exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args, |
|
|
|
|
rr_connectivity_changed_locked); |
|
|
|
|
if (subchannel_list->num_subchannels == 0) { |
|
|
|
@ -609,8 +615,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] Shutting down latest pending subchannel list %p, " |
|
|
|
|
"about to be replaced by newer latest %p", |
|
|
|
|
(void *)p, (void *)p->latest_pending_subchannel_list, |
|
|
|
|
(void *)subchannel_list); |
|
|
|
|
(void*)p, (void*)p->latest_pending_subchannel_list, |
|
|
|
|
(void*)subchannel_list); |
|
|
|
|
} |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref( |
|
|
|
|
exec_ctx, p->latest_pending_subchannel_list, "sl_outdated"); |
|
|
|
@ -649,22 +655,22 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { |
|
|
|
|
rr_notify_on_state_change_locked, |
|
|
|
|
rr_update_locked}; |
|
|
|
|
|
|
|
|
|
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} |
|
|
|
|
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} |
|
|
|
|
|
|
|
|
|
static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} |
|
|
|
|
static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {} |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_lb_policy_factory *factory, |
|
|
|
|
grpc_lb_policy_args *args) { |
|
|
|
|
static grpc_lb_policy* round_robin_create(grpc_exec_ctx* exec_ctx, |
|
|
|
|
grpc_lb_policy_factory* factory, |
|
|
|
|
grpc_lb_policy_args* args) { |
|
|
|
|
GPR_ASSERT(args->client_channel_factory != NULL); |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p)); |
|
|
|
|
round_robin_lb_policy* p = (round_robin_lb_policy*)gpr_zalloc(sizeof(*p)); |
|
|
|
|
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); |
|
|
|
|
grpc_subchannel_index_ref(); |
|
|
|
|
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, |
|
|
|
|
"round_robin"); |
|
|
|
|
rr_update_locked(exec_ctx, &p->base, args); |
|
|
|
|
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void *)p, |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p, |
|
|
|
|
(unsigned long)p->subchannel_list->num_subchannels); |
|
|
|
|
} |
|
|
|
|
return &p->base; |
|
|
|
@ -677,7 +683,7 @@ static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { |
|
|
|
|
static grpc_lb_policy_factory round_robin_lb_policy_factory = { |
|
|
|
|
&round_robin_factory_vtable}; |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy_factory *round_robin_lb_factory_create() { |
|
|
|
|
static grpc_lb_policy_factory* round_robin_lb_factory_create() { |
|
|
|
|
return &round_robin_lb_policy_factory; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|