Merge pull request #8372 from dgquintas/grpclb_races

gRPCLB and RR race fixes
pull/8086/head^2
David G. Quintas 8 years ago committed by GitHub
commit f04942592a
  1. 169
      src/core/ext/lb_policy/grpclb/grpclb.c
  2. 2
      src/core/ext/lb_policy/round_robin/round_robin.c

@ -69,8 +69,8 @@
* possible scenarios: * possible scenarios:
* *
* 1. This is the first server list received. There was no previous instance of * 1. This is the first server list received. There was no previous instance of
* the Round Robin policy. \a rr_handover() will instantiate the RR policy * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
* and perform all the pending operations over it. * policy and perform all the pending operations over it.
* 2. There's already a RR policy instance active. We need to introduce the new * 2. There's already a RR policy instance active. We need to introduce the new
* one build from the new serverlist, but taking care not to disrupt the * one build from the new serverlist, but taking care not to disrupt the
* operations in progress over the old RR instance. This is done by * operations in progress over the old RR instance. This is done by
@ -78,7 +78,7 @@
* references are held on the old RR policy, it'll be destroyed and \a * references are held on the old RR policy, it'll be destroyed and \a
* glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which * state. At this point we can transition to a new RR instance safely, which
* is done once again via \a rr_handover(). * is done once again via \a rr_handover_locked().
* *
* *
* Once a RR policy instance is in place (and getting updated as described), * Once a RR policy instance is in place (and getting updated as described),
@ -86,8 +86,8 @@
* forwarding them to the RR instance. Any time there's no RR policy available * forwarding them to the RR instance. Any time there's no RR policy available
* (ie, right after the creation of the gRPCLB policy, if an empty serverlist * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
* is received, etc), pick/ping requests are added to a list of pending * is received, etc), pick/ping requests are added to a list of pending
* picks/pings to be flushed and serviced as part of \a rr_handover() the moment * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the
* the RR policy instance becomes available. * moment the RR policy instance becomes available.
* *
* \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
* high level design and details. */ * high level design and details. */
@ -134,6 +134,9 @@ static void initial_metadata_add_lb_token(
} }
typedef struct wrapped_rr_closure_arg { typedef struct wrapped_rr_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure;
/* the original closure. Usually a on_complete/notify cb for pick() and ping() /* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */ * calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure; grpc_closure *wrapped_closure;
@ -155,9 +158,8 @@ typedef struct wrapped_rr_closure_arg {
/* The RR instance related to the closure */ /* The RR instance related to the closure */
grpc_lb_policy *rr_policy; grpc_lb_policy *rr_policy;
/* when not NULL, represents a pending_{pick,ping} node to be freed upon /* heap memory to be freed upon closure execution. */
* closure execution */ void *free_when_done;
void *owning_pending_node; /* to be freed if not NULL */
} wrapped_rr_closure_arg; } wrapped_rr_closure_arg;
/* The \a on_complete closure passed as part of the pick requires keeping a /* The \a on_complete closure passed as part of the pick requires keeping a
@ -183,10 +185,10 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
GPR_ASSERT(wc_arg->wrapped_closure != NULL); GPR_ASSERT(wc_arg->wrapped_closure != NULL);
grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
NULL); NULL);
gpr_free(wc_arg->owning_pending_node); GPR_ASSERT(wc_arg->free_when_done != NULL);
gpr_free(wc_arg->free_when_done);
} }
/* Linked list of pending pick requests. It stores all information needed to /* Linked list of pending pick requests. It stores all information needed to
@ -207,10 +209,6 @@ typedef struct pending_pick {
* upon error. */ * upon error. */
grpc_connected_subchannel **target; grpc_connected_subchannel **target;
/* a closure wrapping the original on_complete one to be invoked once the
* pick() has completed (regardless of success) */
grpc_closure wrapped_on_complete;
/* args for wrapped_on_complete */ /* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg; wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick; } pending_pick;
@ -230,8 +228,9 @@ static void add_pending_pick(pending_pick **root,
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage; pick_args->lb_token_mdelem_storage;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, pp->wrapped_on_complete_arg.free_when_done = pp;
&pp->wrapped_on_complete_arg); grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
wrapped_rr_closure, &pp->wrapped_on_complete_arg);
*root = pp; *root = pp;
} }
@ -239,10 +238,6 @@ static void add_pending_pick(pending_pick **root,
typedef struct pending_ping { typedef struct pending_ping {
struct pending_ping *next; struct pending_ping *next;
/* a closure wrapping the original on_complete one to be invoked once the
* ping() has completed (regardless of success) */
grpc_closure wrapped_notify;
/* args for wrapped_notify */ /* args for wrapped_notify */
wrapped_rr_closure_arg wrapped_notify_arg; wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping; } pending_ping;
@ -251,10 +246,11 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
pending_ping *pping = gpr_malloc(sizeof(*pping)); pending_ping *pping = gpr_malloc(sizeof(*pping));
memset(pping, 0, sizeof(pending_ping)); memset(pping, 0, sizeof(pending_ping));
memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
pping->next = *root;
grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
&pping->wrapped_notify_arg);
pping->wrapped_notify_arg.wrapped_closure = notify; pping->wrapped_notify_arg.wrapped_closure = notify;
pping->wrapped_notify_arg.free_when_done = pping;
pping->next = *root;
grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
wrapped_rr_closure, &pping->wrapped_notify_arg);
*root = pping; *root = pping;
} }
@ -307,13 +303,6 @@ typedef struct glb_lb_policy {
/** for tracking of the RR connectivity */ /** for tracking of the RR connectivity */
rr_connectivity_data *rr_connectivity; rr_connectivity_data *rr_connectivity;
/* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
* available RR picks */
grpc_closure wrapped_on_complete;
/* arguments for the wrapped_on_complete closure */
wrapped_rr_closure_arg wc_arg;
} glb_lb_policy; } glb_lb_policy;
/* Keeps track and reacts to changes in connectivity of the RR instance */ /* Keeps track and reacts to changes in connectivity of the RR instance */
@ -424,9 +413,43 @@ static void lb_token_destroy(void *token) {
if (token != NULL) GRPC_MDELEM_UNREF(token); if (token != NULL) GRPC_MDELEM_UNREF(token);
} }
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, /* perform a pick over \a rr_policy. Given that a pick can return immediately
const grpc_grpclb_serverlist *serverlist, * (ignoring its completion callback) we need to perform the cleanups this
glb_lb_policy *glb_policy) { * callback would be otherwise resposible for */
static bool pick_from_internal_rr_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
GPR_ASSERT(rr_policy != NULL);
const bool pick_done =
grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
(void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick");
/* add the load reporting initial metadata */
initial_metadata_add_lb_token(pick_args->initial_metadata,
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
gpr_free(wc_arg);
}
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy).
* Eventually, wrapped_on_complete will be called, which will -among other
* things- add the LB token to the call's initial metadata */
return pick_done;
}
static grpc_lb_policy *create_rr_locked(
grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
glb_lb_policy *glb_policy) {
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
grpc_lb_policy_args args; grpc_lb_policy_args args;
@ -446,12 +469,12 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
return rr; return rr;
} }
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) { glb_lb_policy *glb_policy, grpc_error *error) {
GPR_ASSERT(glb_policy->serverlist != NULL && GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0); glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy = glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy); create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
if (grpc_lb_glb_trace) { if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
@ -481,11 +504,9 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy); (intptr_t)glb_policy->rr_policy);
} }
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args, pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
pp->target, &pp->pick_args, pp->target,
(void **)&pp->wrapped_on_complete_arg.lb_token, &pp->wrapped_on_complete_arg);
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
} }
pending_ping *pping; pending_ping *pping;
@ -498,8 +519,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
(intptr_t)glb_policy->rr_policy); (intptr_t)glb_policy->rr_policy);
} }
grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
&pping->wrapped_notify); &pping->wrapped_notify_arg.wrapper_closure);
pping->wrapped_notify_arg.owning_pending_node = pping;
} }
} }
@ -512,13 +532,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (glb_policy->serverlist != NULL) { if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available -> /* a RR policy is shutting down but there's a serverlist available ->
* perform a handover */ * perform a handover */
rr_handover(exec_ctx, glb_policy, error); gpr_mu_lock(&glb_policy->mu);
rr_handover_locked(exec_ctx, glb_policy, error);
gpr_mu_unlock(&glb_policy->mu);
} else { } else {
/* shutting down and no new serverlist available. Bail out. */ /* shutting down and no new serverlist available. Bail out. */
gpr_free(rr_conn_data); gpr_free(rr_conn_data);
} }
} else { } else {
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
gpr_mu_lock(&glb_policy->mu);
/* RR not shutting down. Mimic the RR's policy state */ /* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error), rr_conn_data->state, GRPC_ERROR_REF(error),
@ -527,6 +550,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state, &rr_conn_data->state,
&rr_conn_data->on_change); &rr_conn_data->on_change);
gpr_mu_unlock(&glb_policy->mu);
} else { /* error */ } else { /* error */
gpr_free(rr_conn_data); gpr_free(rr_conn_data);
} }
@ -651,15 +675,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
*pp->target = NULL; *pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE, grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
NULL); GRPC_ERROR_NONE, NULL);
pp = next; pp = next;
} }
while (pping != NULL) { while (pping != NULL) {
pending_ping *next = pping->next; pending_ping *next = pping->next;
grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE, grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
NULL); GRPC_ERROR_NONE, NULL);
pping = next; pping = next;
} }
@ -691,7 +715,7 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (pp->target == target) { if (pp->target == target) {
*target = NULL; *target = NULL;
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete, exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
} else { } else {
pp->next = glb_policy->pending_picks; pp->next = glb_policy->pending_picks;
@ -721,7 +745,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
grpc_exec_ctx_sched( grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete, exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
} else { } else {
pp->next = glb_policy->pending_picks; pp->next = glb_policy->pending_picks;
@ -774,37 +798,20 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
(intptr_t)glb_policy->rr_policy); (intptr_t)glb_policy->rr_policy);
} }
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
glb_policy->wc_arg.target = target;
glb_policy->wc_arg.wrapped_closure = on_complete;
glb_policy->wc_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
glb_policy->wc_arg.owning_pending_node = NULL;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
pick_done =
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
(void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)glb_policy->wc_arg.rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
/* add the load reporting initial metadata */ wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
initial_metadata_add_lb_token( memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
} wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->wrapped_closure = on_complete;
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg;
pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
pick_args, target, wc_arg);
} else { } else {
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy) */
add_pending_pick(&glb_policy->pending_picks, pick_args, target, add_pending_pick(&glb_policy->pending_picks, pick_args, target,
on_complete); on_complete);
@ -1073,6 +1080,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* update serverlist */ /* update serverlist */
if (serverlist->num_servers > 0) { if (serverlist->num_servers > 0) {
gpr_mu_lock(&lb_client->glb_policy->mu);
if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
serverlist)) { serverlist)) {
if (grpc_lb_glb_trace) { if (grpc_lb_glb_trace) {
@ -1090,7 +1098,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
if (lb_client->glb_policy->rr_policy == NULL) { if (lb_client->glb_policy->rr_policy == NULL) {
/* initial "handover", in this case from a null RR policy, meaning /* initial "handover", in this case from a null RR policy, meaning
* it'll just create the first RR policy instance */ * it'll just create the first RR policy instance */
rr_handover(exec_ctx, lb_client->glb_policy, error); rr_handover_locked(exec_ctx, lb_client->glb_policy, error);
} else { } else {
/* unref the RR policy, eventually leading to its substitution with a /* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see * new one constructed from the received serverlist (see
@ -1098,6 +1106,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received"); "serverlist_received");
} }
gpr_mu_unlock(&lb_client->glb_policy->mu);
} else { } else {
if (grpc_lb_glb_trace) { if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,

@ -396,7 +396,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) { if ((selected = peek_next_connected_locked(p))) {
/* readily available, report right away */ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (user_data != NULL) { if (user_data != NULL) {
@ -409,6 +408,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
/* only advance the last picked pointer if the selection was used */ /* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p); advance_last_picked_locked(p);
gpr_mu_unlock(&p->mu);
return 1; return 1;
} else { } else {
/* no pick currently available. Save for later in list of pending picks */ /* no pick currently available. Save for later in list of pending picks */

Loading…
Cancel
Save