|
|
|
@ -38,6 +38,8 @@ |
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include "src/core/transport/connectivity_state.h" |
|
|
|
|
|
|
|
|
|
typedef struct round_robin_lb_policy round_robin_lb_policy; |
|
|
|
|
|
|
|
|
|
int grpc_lb_round_robin_trace = 0; |
|
|
|
|
|
|
|
|
|
/** List of entities waiting for a pick.
|
|
|
|
@ -46,7 +48,7 @@ int grpc_lb_round_robin_trace = 0; |
|
|
|
|
typedef struct pending_pick { |
|
|
|
|
struct pending_pick *next; |
|
|
|
|
grpc_pollset *pollset; |
|
|
|
|
grpc_subchannel **target; |
|
|
|
|
grpc_connected_subchannel **target; |
|
|
|
|
grpc_closure *on_complete; |
|
|
|
|
} pending_pick; |
|
|
|
|
|
|
|
|
@ -58,22 +60,27 @@ typedef struct ready_list { |
|
|
|
|
} ready_list; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
size_t subchannel_idx; /**< Index over p->subchannels */ |
|
|
|
|
void *p; /**< round_robin_lb_policy instance */ |
|
|
|
|
} connectivity_changed_cb_arg; |
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
|
/** index within policy->subchannels */ |
|
|
|
|
size_t index; |
|
|
|
|
/** backpointer to owning policy */ |
|
|
|
|
round_robin_lb_policy *policy; |
|
|
|
|
/** subchannel itself */ |
|
|
|
|
grpc_subchannel *subchannel; |
|
|
|
|
/** notification that connectivity has changed on subchannel */ |
|
|
|
|
grpc_closure connectivity_changed_closure; |
|
|
|
|
/** this subchannels current position in subchannel->ready_list */ |
|
|
|
|
ready_list *ready_list_node; |
|
|
|
|
/** last observed connectivity */ |
|
|
|
|
grpc_connectivity_state connectivity_state; |
|
|
|
|
} subchannel_data; |
|
|
|
|
|
|
|
|
|
struct round_robin_lb_policy { |
|
|
|
|
/** base policy: must be first */ |
|
|
|
|
grpc_lb_policy base; |
|
|
|
|
|
|
|
|
|
/** all our subchannels */ |
|
|
|
|
grpc_subchannel **subchannels; |
|
|
|
|
size_t num_subchannels; |
|
|
|
|
|
|
|
|
|
/** Callbacks, one per subchannel being watched, to be called when their
|
|
|
|
|
* respective connectivity changes */ |
|
|
|
|
grpc_closure *connectivity_changed_cbs; |
|
|
|
|
connectivity_changed_cb_arg *cb_args; |
|
|
|
|
subchannel_data **subchannels; |
|
|
|
|
|
|
|
|
|
/** mutex protecting remaining members */ |
|
|
|
|
gpr_mu mu; |
|
|
|
@ -81,8 +88,6 @@ typedef struct { |
|
|
|
|
int started_picking; |
|
|
|
|
/** are we shutting down? */ |
|
|
|
|
int shutdown; |
|
|
|
|
/** Connectivity state of the subchannels being watched */ |
|
|
|
|
grpc_connectivity_state *subchannel_connectivity; |
|
|
|
|
/** List of picks that are waiting on connectivity */ |
|
|
|
|
pending_pick *pending_picks; |
|
|
|
|
|
|
|
|
@ -93,13 +98,7 @@ typedef struct { |
|
|
|
|
ready_list ready_list; |
|
|
|
|
/** Last pick from the ready list. */ |
|
|
|
|
ready_list *ready_list_last_pick; |
|
|
|
|
|
|
|
|
|
/** Subchannel index to ready_list node.
|
|
|
|
|
* |
|
|
|
|
* Kept in order to remove nodes from the ready list associated with a |
|
|
|
|
* subchannel */ |
|
|
|
|
ready_list **subchannel_index_to_readylist_node; |
|
|
|
|
} round_robin_lb_policy; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** Returns the next subchannel from the connected list or NULL if the list is
|
|
|
|
|
* empty. |
|
|
|
@ -144,9 +143,9 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { |
|
|
|
|
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
|
|
|
|
|
* csc to the list of ready subchannels. */ |
|
|
|
|
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, |
|
|
|
|
grpc_subchannel *csc) { |
|
|
|
|
grpc_subchannel *sc) { |
|
|
|
|
ready_list *new_elem = gpr_malloc(sizeof(ready_list)); |
|
|
|
|
new_elem->subchannel = csc; |
|
|
|
|
new_elem->subchannel = sc; |
|
|
|
|
if (p->ready_list.prev == NULL) { |
|
|
|
|
/* first element */ |
|
|
|
|
new_elem->next = &p->ready_list; |
|
|
|
@ -160,7 +159,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, |
|
|
|
|
p->ready_list.prev = new_elem; |
|
|
|
|
} |
|
|
|
|
if (grpc_lb_round_robin_trace) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); |
|
|
|
|
gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); |
|
|
|
|
} |
|
|
|
|
return new_elem; |
|
|
|
|
} |
|
|
|
@ -200,28 +199,15 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, |
|
|
|
|
gpr_free(node); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
round_robin_lb_policy *p, |
|
|
|
|
const size_t subchannel_idx) { |
|
|
|
|
pending_pick *pp; |
|
|
|
|
for (pp = p->pending_picks; pp; pp = pp->next) { |
|
|
|
|
grpc_subchannel_del_interested_party( |
|
|
|
|
exec_ctx, p->subchannels[subchannel_idx], pp->pollset); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
size_t i; |
|
|
|
|
ready_list *elem; |
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
del_interested_parties_locked(exec_ctx, p, i); |
|
|
|
|
} |
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); |
|
|
|
|
subchannel_data *sd = p->subchannels[i]; |
|
|
|
|
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); |
|
|
|
|
gpr_free(sd); |
|
|
|
|
} |
|
|
|
|
gpr_free(p->connectivity_changed_cbs); |
|
|
|
|
gpr_free(p->subchannel_connectivity); |
|
|
|
|
|
|
|
|
|
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); |
|
|
|
|
gpr_free(p->subchannels); |
|
|
|
@ -237,20 +223,15 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
gpr_free(elem); |
|
|
|
|
elem = tmp; |
|
|
|
|
} |
|
|
|
|
gpr_free(p->subchannel_index_to_readylist_node); |
|
|
|
|
gpr_free(p->cb_args); |
|
|
|
|
gpr_free(p); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
size_t i; |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
pending_pick *pp; |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
size_t i; |
|
|
|
|
|
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
del_interested_parties_locked(exec_ctx, p, i); |
|
|
|
|
} |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
|
|
|
|
|
p->shutdown = 1; |
|
|
|
|
while ((pp = p->pending_picks)) { |
|
|
|
@ -261,24 +242,26 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
} |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
|
GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); |
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
subchannel_data *sd = p->subchannels[i]; |
|
|
|
|
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, |
|
|
|
|
&sd->connectivity_changed_closure); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
grpc_subchannel **target) { |
|
|
|
|
grpc_connected_subchannel **target) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
pending_pick *pp; |
|
|
|
|
size_t i; |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
pp = p->pending_picks; |
|
|
|
|
p->pending_picks = NULL; |
|
|
|
|
while (pp != NULL) { |
|
|
|
|
pending_pick *next = pp->next; |
|
|
|
|
if (pp->target == target) { |
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], |
|
|
|
|
pp->pollset); |
|
|
|
|
} |
|
|
|
|
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, |
|
|
|
|
pp->pollset); |
|
|
|
|
*target = NULL; |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); |
|
|
|
|
gpr_free(pp); |
|
|
|
@ -295,12 +278,16 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { |
|
|
|
|
size_t i; |
|
|
|
|
p->started_picking = 1; |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, |
|
|
|
|
p->num_subchannels); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; |
|
|
|
|
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], |
|
|
|
|
&p->subchannel_connectivity[i], |
|
|
|
|
&p->connectivity_changed_cbs[i]); |
|
|
|
|
GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); |
|
|
|
|
subchannel_data *sd = p->subchannels[i]; |
|
|
|
|
sd->connectivity_state = GRPC_CHANNEL_IDLE; |
|
|
|
|
grpc_subchannel_notify_on_state_change( |
|
|
|
|
exec_ctx, sd->subchannel, &p->base.interested_parties, |
|
|
|
|
&sd->connectivity_state, &sd->connectivity_changed_closure); |
|
|
|
|
GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -314,18 +301,18 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, |
|
|
|
|
grpc_metadata_batch *initial_metadata, grpc_subchannel **target, |
|
|
|
|
grpc_closure *on_complete) { |
|
|
|
|
size_t i; |
|
|
|
|
grpc_metadata_batch *initial_metadata, |
|
|
|
|
grpc_connected_subchannel **target, grpc_closure *on_complete) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
pending_pick *pp; |
|
|
|
|
ready_list *selected; |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
if ((selected = peek_next_connected_locked(p))) { |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
*target = selected->subchannel; |
|
|
|
|
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel); |
|
|
|
|
if (grpc_lb_round_robin_trace) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", |
|
|
|
|
selected->subchannel, selected); |
|
|
|
|
} |
|
|
|
|
/* only advance the last picked pointer if the selection was used */ |
|
|
|
@ -335,10 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, |
|
|
|
|
if (!p->started_picking) { |
|
|
|
|
start_picking(exec_ctx, p); |
|
|
|
|
} |
|
|
|
|
for (i = 0; i < p->num_subchannels; i++) { |
|
|
|
|
grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], |
|
|
|
|
pollset); |
|
|
|
|
} |
|
|
|
|
grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, |
|
|
|
|
pollset); |
|
|
|
|
pp = gpr_malloc(sizeof(*pp)); |
|
|
|
|
pp->next = p->pending_picks; |
|
|
|
|
pp->pollset = pollset; |
|
|
|
@ -352,33 +337,25 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, |
|
|
|
|
|
|
|
|
|
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
int iomgr_success) { |
|
|
|
|
connectivity_changed_cb_arg *cb_arg = arg; |
|
|
|
|
round_robin_lb_policy *p = cb_arg->p; |
|
|
|
|
/* index over p->subchannels of this cb's subchannel */ |
|
|
|
|
const size_t this_idx = cb_arg->subchannel_idx; |
|
|
|
|
subchannel_data *sd = arg; |
|
|
|
|
round_robin_lb_policy *p = sd->policy; |
|
|
|
|
pending_pick *pp; |
|
|
|
|
ready_list *selected; |
|
|
|
|
|
|
|
|
|
int unref = 0; |
|
|
|
|
|
|
|
|
|
/* connectivity state of this cb's subchannel */ |
|
|
|
|
grpc_connectivity_state *this_connectivity; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
|
|
|
|
|
this_connectivity = &p->subchannel_connectivity[this_idx]; |
|
|
|
|
|
|
|
|
|
if (p->shutdown) { |
|
|
|
|
unref = 1; |
|
|
|
|
} else { |
|
|
|
|
switch (*this_connectivity) { |
|
|
|
|
switch (sd->connectivity_state) { |
|
|
|
|
case GRPC_CHANNEL_READY: |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
|
GRPC_CHANNEL_READY, "connecting_ready"); |
|
|
|
|
/* add the newly connected subchannel to the list of connected ones.
|
|
|
|
|
* Note that it goes to the "end of the line". */ |
|
|
|
|
p->subchannel_index_to_readylist_node[this_idx] = |
|
|
|
|
add_connected_sc_locked(p, p->subchannels[this_idx]); |
|
|
|
|
sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); |
|
|
|
|
/* at this point we know there's at least one suitable subchannel. Go
|
|
|
|
|
* ahead and pick one and notify the pending suitors in |
|
|
|
|
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */ |
|
|
|
@ -390,60 +367,60 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
} |
|
|
|
|
while ((pp = p->pending_picks)) { |
|
|
|
|
p->pending_picks = pp->next; |
|
|
|
|
*pp->target = selected->subchannel; |
|
|
|
|
*pp->target = |
|
|
|
|
grpc_subchannel_get_connected_subchannel(selected->subchannel); |
|
|
|
|
if (grpc_lb_round_robin_trace) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", |
|
|
|
|
selected->subchannel, selected); |
|
|
|
|
} |
|
|
|
|
grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, |
|
|
|
|
pp->pollset); |
|
|
|
|
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, |
|
|
|
|
pp->pollset); |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); |
|
|
|
|
gpr_free(pp); |
|
|
|
|
} |
|
|
|
|
grpc_subchannel_notify_on_state_change( |
|
|
|
|
exec_ctx, p->subchannels[this_idx], this_connectivity, |
|
|
|
|
&p->connectivity_changed_cbs[this_idx]); |
|
|
|
|
exec_ctx, sd->subchannel, &p->base.interested_parties, |
|
|
|
|
&sd->connectivity_state, &sd->connectivity_changed_closure); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHANNEL_CONNECTING: |
|
|
|
|
case GRPC_CHANNEL_IDLE: |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
|
*this_connectivity, "connecting_changed"); |
|
|
|
|
sd->connectivity_state, |
|
|
|
|
"connecting_changed"); |
|
|
|
|
grpc_subchannel_notify_on_state_change( |
|
|
|
|
exec_ctx, p->subchannels[this_idx], this_connectivity, |
|
|
|
|
&p->connectivity_changed_cbs[this_idx]); |
|
|
|
|
exec_ctx, sd->subchannel, &p->base.interested_parties, |
|
|
|
|
&sd->connectivity_state, &sd->connectivity_changed_closure); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: |
|
|
|
|
del_interested_parties_locked(exec_ctx, p, this_idx); |
|
|
|
|
/* renew state notification */ |
|
|
|
|
grpc_subchannel_notify_on_state_change( |
|
|
|
|
exec_ctx, p->subchannels[this_idx], this_connectivity, |
|
|
|
|
&p->connectivity_changed_cbs[this_idx]); |
|
|
|
|
exec_ctx, sd->subchannel, &p->base.interested_parties, |
|
|
|
|
&sd->connectivity_state, &sd->connectivity_changed_closure); |
|
|
|
|
|
|
|
|
|
/* remove from ready list if still present */ |
|
|
|
|
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { |
|
|
|
|
remove_disconnected_sc_locked( |
|
|
|
|
p, p->subchannel_index_to_readylist_node[this_idx]); |
|
|
|
|
p->subchannel_index_to_readylist_node[this_idx] = NULL; |
|
|
|
|
if (sd->ready_list_node != NULL) { |
|
|
|
|
remove_disconnected_sc_locked(p, sd->ready_list_node); |
|
|
|
|
sd->ready_list_node = NULL; |
|
|
|
|
} |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
"connecting_transient_failure"); |
|
|
|
|
break; |
|
|
|
|
case GRPC_CHANNEL_FATAL_FAILURE: |
|
|
|
|
del_interested_parties_locked(exec_ctx, p, this_idx); |
|
|
|
|
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { |
|
|
|
|
remove_disconnected_sc_locked( |
|
|
|
|
p, p->subchannel_index_to_readylist_node[this_idx]); |
|
|
|
|
p->subchannel_index_to_readylist_node[this_idx] = NULL; |
|
|
|
|
if (sd->ready_list_node != NULL) { |
|
|
|
|
remove_disconnected_sc_locked(p, sd->ready_list_node); |
|
|
|
|
sd->ready_list_node = NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], |
|
|
|
|
p->subchannels[p->num_subchannels - 1]); |
|
|
|
|
p->num_subchannels--; |
|
|
|
|
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], |
|
|
|
|
"round_robin"); |
|
|
|
|
GPR_SWAP(subchannel_data *, p->subchannels[sd->index], |
|
|
|
|
p->subchannels[p->num_subchannels]); |
|
|
|
|
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); |
|
|
|
|
p->subchannels[sd->index]->index = sd->index; |
|
|
|
|
gpr_free(sd); |
|
|
|
|
|
|
|
|
|
unref = 1; |
|
|
|
|
if (p->num_subchannels == 0) { |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
|
GRPC_CHANNEL_FATAL_FAILURE, |
|
|
|
@ -454,7 +431,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); |
|
|
|
|
gpr_free(pp); |
|
|
|
|
} |
|
|
|
|
unref = 1; |
|
|
|
|
} else { |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
@ -466,31 +442,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
|
|
|
|
|
if (unref) { |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
grpc_transport_op *op) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
size_t i; |
|
|
|
|
size_t n; |
|
|
|
|
grpc_subchannel **subchannels; |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
n = p->num_subchannels; |
|
|
|
|
subchannels = gpr_malloc(n * sizeof(*subchannels)); |
|
|
|
|
for (i = 0; i < n; i++) { |
|
|
|
|
subchannels[i] = p->subchannels[i]; |
|
|
|
|
GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < n; i++) { |
|
|
|
|
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); |
|
|
|
|
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast"); |
|
|
|
|
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); |
|
|
|
|
} |
|
|
|
|
gpr_free(subchannels); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, |
|
|
|
@ -514,9 +467,25 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
grpc_closure *closure) { |
|
|
|
|
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
|
|
|
|
ready_list *selected; |
|
|
|
|
grpc_connected_subchannel *target; |
|
|
|
|
gpr_mu_lock(&p->mu); |
|
|
|
|
if ((selected = peek_next_connected_locked(p))) { |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
target = grpc_subchannel_get_connected_subchannel(selected->subchannel); |
|
|
|
|
grpc_connected_subchannel_ping(exec_ctx, target, closure); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_unlock(&p->mu); |
|
|
|
|
grpc_exec_ctx_enqueue(exec_ctx, closure, 0); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { |
|
|
|
|
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, |
|
|
|
|
rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; |
|
|
|
|
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle, |
|
|
|
|
rr_check_connectivity, rr_notify_on_state_change}; |
|
|
|
|
|
|
|
|
|
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} |
|
|
|
|
|
|
|
|
@ -529,27 +498,22 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, |
|
|
|
|
GPR_ASSERT(args->num_subchannels > 0); |
|
|
|
|
memset(p, 0, sizeof(*p)); |
|
|
|
|
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); |
|
|
|
|
p->subchannels = |
|
|
|
|
gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); |
|
|
|
|
p->num_subchannels = args->num_subchannels; |
|
|
|
|
p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); |
|
|
|
|
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); |
|
|
|
|
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, |
|
|
|
|
"round_robin"); |
|
|
|
|
memcpy(p->subchannels, args->subchannels, |
|
|
|
|
sizeof(grpc_subchannel *) * args->num_subchannels); |
|
|
|
|
|
|
|
|
|
gpr_mu_init(&p->mu); |
|
|
|
|
p->connectivity_changed_cbs = |
|
|
|
|
gpr_malloc(sizeof(grpc_closure) * args->num_subchannels); |
|
|
|
|
p->subchannel_connectivity = |
|
|
|
|
gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); |
|
|
|
|
|
|
|
|
|
p->cb_args = |
|
|
|
|
gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); |
|
|
|
|
for (i = 0; i < args->num_subchannels; i++) { |
|
|
|
|
p->cb_args[i].subchannel_idx = i; |
|
|
|
|
p->cb_args[i].p = p; |
|
|
|
|
grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, |
|
|
|
|
&p->cb_args[i]); |
|
|
|
|
subchannel_data *sd = gpr_malloc(sizeof(*sd)); |
|
|
|
|
memset(sd, 0, sizeof(*sd)); |
|
|
|
|
p->subchannels[i] = sd; |
|
|
|
|
sd->policy = p; |
|
|
|
|
sd->index = i; |
|
|
|
|
sd->subchannel = args->subchannels[i]; |
|
|
|
|
grpc_closure_init(&sd->connectivity_changed_closure, |
|
|
|
|
rr_connectivity_changed, sd); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* The (dummy node) root of the ready list */ |
|
|
|
@ -558,10 +522,6 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, |
|
|
|
|
p->ready_list.next = NULL; |
|
|
|
|
p->ready_list_last_pick = &p->ready_list; |
|
|
|
|
|
|
|
|
|
p->subchannel_index_to_readylist_node = |
|
|
|
|
gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); |
|
|
|
|
memset(p->subchannel_index_to_readylist_node, 0, |
|
|
|
|
sizeof(grpc_subchannel *) * args->num_subchannels); |
|
|
|
|
return &p->base; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|