|
|
|
@ -69,8 +69,8 @@ |
|
|
|
|
* possible scenarios: |
|
|
|
|
* |
|
|
|
|
* 1. This is the first server list received. There was no previous instance of |
|
|
|
|
* the Round Robin policy. \a rr_handover() will instantiate the RR policy |
|
|
|
|
* and perform all the pending operations over it. |
|
|
|
|
* the Round Robin policy. \a rr_handover_locked() will instantiate the RR |
|
|
|
|
* policy and perform all the pending operations over it. |
|
|
|
|
* 2. There's already a RR policy instance active. We need to introduce the new |
|
|
|
|
* one build from the new serverlist, but taking care not to disrupt the |
|
|
|
|
* operations in progress over the old RR instance. This is done by |
|
|
|
@ -78,7 +78,7 @@ |
|
|
|
|
* references are held on the old RR policy, it'll be destroyed and \a |
|
|
|
|
* glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN |
|
|
|
|
* state. At this point we can transition to a new RR instance safely, which |
|
|
|
|
* is done once again via \a rr_handover(). |
|
|
|
|
* is done once again via \a rr_handover_locked(). |
|
|
|
|
* |
|
|
|
|
* |
|
|
|
|
* Once a RR policy instance is in place (and getting updated as described), |
|
|
|
@ -86,8 +86,8 @@ |
|
|
|
|
* forwarding them to the RR instance. Any time there's no RR policy available |
|
|
|
|
* (ie, right after the creation of the gRPCLB policy, if an empty serverlist |
|
|
|
|
* is received, etc), pick/ping requests are added to a list of pending |
|
|
|
|
* picks/pings to be flushed and serviced as part of \a rr_handover() the moment |
|
|
|
|
* the RR policy instance becomes available. |
|
|
|
|
* picks/pings to be flushed and serviced as part of \a rr_handover_locked() the |
|
|
|
|
* moment the RR policy instance becomes available. |
|
|
|
|
* |
|
|
|
|
* \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
|
|
|
|
|
* high level design and details. */ |
|
|
|
@ -158,6 +158,12 @@ typedef struct wrapped_rr_closure_arg { |
|
|
|
|
/* when not NULL, represents a pending_{pick,ping} node to be freed upon
|
|
|
|
|
* closure execution */ |
|
|
|
|
void *owning_pending_node; /* to be freed if not NULL */ |
|
|
|
|
|
|
|
|
|
/* Pointer ot heap memory if the closure and its argument were allocated
|
|
|
|
|
* dynamically outside of a pending pick. It'll be NULL otherwise. |
|
|
|
|
* |
|
|
|
|
* TODO(dgq): This is by no means pretty. */ |
|
|
|
|
void *closure_mem_or_null; |
|
|
|
|
} wrapped_rr_closure_arg; |
|
|
|
|
|
|
|
|
|
/* The \a on_complete closure passed as part of the pick requires keeping a
|
|
|
|
@ -186,7 +192,17 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
|
|
|
|
|
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), |
|
|
|
|
NULL); |
|
|
|
|
gpr_free(wc_arg->owning_pending_node); |
|
|
|
|
|
|
|
|
|
/* Make sure this closure and its arg are EITHER on the heap on their oen OR
|
|
|
|
|
* part of a pending pick (thus part of the pending pick's memory) */ |
|
|
|
|
GPR_ASSERT((wc_arg->closure_mem_or_null != NULL) + |
|
|
|
|
(wc_arg->owning_pending_node != NULL) == |
|
|
|
|
1); |
|
|
|
|
if (wc_arg->closure_mem_or_null) { |
|
|
|
|
gpr_free(wc_arg->closure_mem_or_null); |
|
|
|
|
} else { |
|
|
|
|
gpr_free(wc_arg->owning_pending_node); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Linked list of pending pick requests. It stores all information needed to
|
|
|
|
@ -307,13 +323,6 @@ typedef struct glb_lb_policy { |
|
|
|
|
|
|
|
|
|
/** for tracking of the RR connectivity */ |
|
|
|
|
rr_connectivity_data *rr_connectivity; |
|
|
|
|
|
|
|
|
|
/* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
|
|
|
|
|
* available RR picks */ |
|
|
|
|
grpc_closure wrapped_on_complete; |
|
|
|
|
|
|
|
|
|
/* arguments for the wrapped_on_complete closure */ |
|
|
|
|
wrapped_rr_closure_arg wc_arg; |
|
|
|
|
} glb_lb_policy; |
|
|
|
|
|
|
|
|
|
/* Keeps track and reacts to changes in connectivity of the RR instance */ |
|
|
|
@ -424,9 +433,9 @@ static void lb_token_destroy(void *token) { |
|
|
|
|
if (token != NULL) GRPC_MDELEM_UNREF(token); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, |
|
|
|
|
const grpc_grpclb_serverlist *serverlist, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
static grpc_lb_policy *create_rr_locked( |
|
|
|
|
grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, |
|
|
|
|
glb_lb_policy *glb_policy) { |
|
|
|
|
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); |
|
|
|
|
|
|
|
|
|
grpc_lb_policy_args args; |
|
|
|
@ -446,12 +455,12 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, |
|
|
|
|
return rr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
grpc_error *error) { |
|
|
|
|
static void rr_handover_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
glb_lb_policy *glb_policy, grpc_error *error) { |
|
|
|
|
GPR_ASSERT(glb_policy->serverlist != NULL && |
|
|
|
|
glb_policy->serverlist->num_servers > 0); |
|
|
|
|
glb_policy->rr_policy = |
|
|
|
|
create_rr(exec_ctx, glb_policy->serverlist, glb_policy); |
|
|
|
|
create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); |
|
|
|
|
|
|
|
|
|
if (grpc_lb_glb_trace) { |
|
|
|
|
gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", |
|
|
|
@ -474,6 +483,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
glb_policy->pending_picks = pp->next; |
|
|
|
|
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); |
|
|
|
|
pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; |
|
|
|
|
pp->wrapped_on_complete_arg.owning_pending_node = pp; |
|
|
|
|
if (grpc_lb_glb_trace) { |
|
|
|
|
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", |
|
|
|
|
(intptr_t)glb_policy->rr_policy); |
|
|
|
@ -482,7 +492,6 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
pp->target, |
|
|
|
|
(void **)&pp->wrapped_on_complete_arg.lb_token, |
|
|
|
|
&pp->wrapped_on_complete); |
|
|
|
|
pp->wrapped_on_complete_arg.owning_pending_node = pp; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pending_ping *pping; |
|
|
|
@ -490,13 +499,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, |
|
|
|
|
glb_policy->pending_pings = pping->next; |
|
|
|
|
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); |
|
|
|
|
pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; |
|
|
|
|
pping->wrapped_notify_arg.owning_pending_node = pping; |
|
|
|
|
if (grpc_lb_glb_trace) { |
|
|
|
|
gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", |
|
|
|
|
(intptr_t)glb_policy->rr_policy); |
|
|
|
|
} |
|
|
|
|
grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, |
|
|
|
|
&pping->wrapped_notify); |
|
|
|
|
pping->wrapped_notify_arg.owning_pending_node = pping; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -509,13 +518,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
if (glb_policy->serverlist != NULL) { |
|
|
|
|
/* a RR policy is shutting down but there's a serverlist available ->
|
|
|
|
|
* perform a handover */ |
|
|
|
|
rr_handover(exec_ctx, glb_policy, error); |
|
|
|
|
gpr_mu_lock(&glb_policy->mu); |
|
|
|
|
rr_handover_locked(exec_ctx, glb_policy, error); |
|
|
|
|
gpr_mu_unlock(&glb_policy->mu); |
|
|
|
|
} else { |
|
|
|
|
/* shutting down and no new serverlist available. Bail out. */ |
|
|
|
|
gpr_free(rr_conn_data); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (error == GRPC_ERROR_NONE) { |
|
|
|
|
gpr_mu_lock(&glb_policy->mu); |
|
|
|
|
/* RR not shutting down. Mimic the RR's policy state */ |
|
|
|
|
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, |
|
|
|
|
rr_conn_data->state, GRPC_ERROR_REF(error), |
|
|
|
@ -524,6 +536,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, |
|
|
|
|
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, |
|
|
|
|
&rr_conn_data->state, |
|
|
|
|
&rr_conn_data->on_change); |
|
|
|
|
gpr_mu_unlock(&glb_policy->mu); |
|
|
|
|
} else { /* error */ |
|
|
|
|
gpr_free(rr_conn_data); |
|
|
|
|
} |
|
|
|
@ -775,37 +788,50 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, |
|
|
|
|
(intptr_t)glb_policy->rr_policy); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); |
|
|
|
|
memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); |
|
|
|
|
glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; |
|
|
|
|
glb_policy->wc_arg.target = target; |
|
|
|
|
glb_policy->wc_arg.wrapped_closure = on_complete; |
|
|
|
|
glb_policy->wc_arg.lb_token_mdelem_storage = |
|
|
|
|
pick_args->lb_token_mdelem_storage; |
|
|
|
|
glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; |
|
|
|
|
glb_policy->wc_arg.owning_pending_node = NULL; |
|
|
|
|
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, |
|
|
|
|
&glb_policy->wc_arg); |
|
|
|
|
|
|
|
|
|
/* we need to allocate the closure on the stack because we may be serving
|
|
|
|
|
* concurrent picks: a single field in glb_policy isn't good enough */ |
|
|
|
|
void *closure_mem = |
|
|
|
|
gpr_malloc(sizeof(grpc_closure) + sizeof(wrapped_rr_closure_arg)); |
|
|
|
|
grpc_closure *wrapped_on_complete = closure_mem; |
|
|
|
|
memset(wrapped_on_complete, 0, sizeof(grpc_closure)); |
|
|
|
|
|
|
|
|
|
wrapped_rr_closure_arg *wc_arg = closure_mem + sizeof(grpc_closure); |
|
|
|
|
memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); |
|
|
|
|
|
|
|
|
|
grpc_closure_init(wrapped_on_complete, wrapped_rr_closure, wc_arg); |
|
|
|
|
|
|
|
|
|
wc_arg->rr_policy = glb_policy->rr_policy; |
|
|
|
|
wc_arg->target = target; |
|
|
|
|
wc_arg->wrapped_closure = on_complete; |
|
|
|
|
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; |
|
|
|
|
wc_arg->initial_metadata = pick_args->initial_metadata; |
|
|
|
|
wc_arg->owning_pending_node = NULL; |
|
|
|
|
wc_arg->closure_mem_or_null = closure_mem; |
|
|
|
|
|
|
|
|
|
pick_done = |
|
|
|
|
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, |
|
|
|
|
(void **)&glb_policy->wc_arg.lb_token, |
|
|
|
|
&glb_policy->wrapped_on_complete); |
|
|
|
|
(void **)&wc_arg->lb_token, wrapped_on_complete); |
|
|
|
|
if (pick_done) { |
|
|
|
|
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ |
|
|
|
|
if (grpc_lb_glb_trace) { |
|
|
|
|
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", |
|
|
|
|
(intptr_t)glb_policy->wc_arg.rr_policy); |
|
|
|
|
(intptr_t)wc_arg->rr_policy); |
|
|
|
|
} |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); |
|
|
|
|
|
|
|
|
|
/* add the load reporting initial metadata */ |
|
|
|
|
initial_metadata_add_lb_token( |
|
|
|
|
pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, |
|
|
|
|
GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); |
|
|
|
|
initial_metadata_add_lb_token(pick_args->initial_metadata, |
|
|
|
|
pick_args->lb_token_mdelem_storage, |
|
|
|
|
GRPC_MDELEM_REF(wc_arg->lb_token)); |
|
|
|
|
|
|
|
|
|
gpr_free(closure_mem); |
|
|
|
|
} |
|
|
|
|
/* else, !pick_done, the pending pick will be registered and taken care of
|
|
|
|
|
* by the pending pick list inside the RR policy (glb_policy->rr_policy). |
|
|
|
|
* Eventually, wrapped_on_complete will be called, which will -among other |
|
|
|
|
* things- add the LB token to the call's initial metadata */ |
|
|
|
|
} else { |
|
|
|
|
/* else, the pending pick will be registered and taken care of by the
|
|
|
|
|
* pending pick list inside the RR policy (glb_policy->rr_policy) */ |
|
|
|
|
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, |
|
|
|
|
glb_policy->base.interested_parties); |
|
|
|
|
add_pending_pick(&glb_policy->pending_picks, pick_args, target, |
|
|
|
@ -1076,6 +1102,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
|
|
|
|
|
/* update serverlist */ |
|
|
|
|
if (serverlist->num_servers > 0) { |
|
|
|
|
gpr_mu_lock(&lb_client->glb_policy->mu); |
|
|
|
|
if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, |
|
|
|
|
serverlist)) { |
|
|
|
|
if (grpc_lb_glb_trace) { |
|
|
|
@ -1093,7 +1120,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
if (lb_client->glb_policy->rr_policy == NULL) { |
|
|
|
|
/* initial "handover", in this case from a null RR policy, meaning
|
|
|
|
|
* it'll just create the first RR policy instance */ |
|
|
|
|
rr_handover(exec_ctx, lb_client->glb_policy, error); |
|
|
|
|
rr_handover_locked(exec_ctx, lb_client->glb_policy, error); |
|
|
|
|
} else { |
|
|
|
|
/* unref the RR policy, eventually leading to its substitution with a
|
|
|
|
|
* new one constructed from the received serverlist (see |
|
|
|
@ -1101,6 +1128,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { |
|
|
|
|
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, |
|
|
|
|
"serverlist_received"); |
|
|
|
|
} |
|
|
|
|
gpr_mu_unlock(&lb_client->glb_policy->mu); |
|
|
|
|
} else { |
|
|
|
|
if (grpc_lb_glb_trace) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|