From 90712d5e5dfc40ec694a031c70379cd3c9c94431 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 13 Oct 2016 19:33:04 -0700 Subject: [PATCH 1/5] gRPCLB and RR race fixes --- src/core/ext/lb_policy/grpclb/grpclb.c | 112 +++++++++++------- .../ext/lb_policy/round_robin/round_robin.c | 2 +- 2 files changed, 71 insertions(+), 43 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 626f285b90a..5f0574e5df7 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -69,8 +69,8 @@ * possible scenarios: * * 1. This is the first server list received. There was no previous instance of - * the Round Robin policy. \a rr_handover() will instantiate the RR policy - * and perform all the pending operations over it. + * the Round Robin policy. \a rr_handover_locked() will instantiate the RR + * policy and perform all the pending operations over it. * 2. There's already a RR policy instance active. We need to introduce the new * one build from the new serverlist, but taking care not to disrupt the * operations in progress over the old RR instance. This is done by @@ -78,7 +78,7 @@ * references are held on the old RR policy, it'll be destroyed and \a * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * state. At this point we can transition to a new RR instance safely, which - * is done once again via \a rr_handover(). + * is done once again via \a rr_handover_locked(). * * * Once a RR policy instance is in place (and getting updated as described), @@ -86,8 +86,8 @@ * forwarding them to the RR instance. Any time there's no RR policy available * (ie, right after the creation of the gRPCLB policy, if an empty serverlist * is received, etc), pick/ping requests are added to a list of pending - * picks/pings to be flushed and serviced as part of \a rr_handover() the moment - * the RR policy instance becomes available. + * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the + * moment the RR policy instance becomes available. * * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the * high level design and details. */ @@ -158,6 +158,12 @@ typedef struct wrapped_rr_closure_arg { /* when not NULL, represents a pending_{pick,ping} node to be freed upon * closure execution */ void *owning_pending_node; /* to be freed if not NULL */ + + /* Pointer ot heap memory if the closure and its argument were allocated + * dynamically outside of a pending pick. It'll be NULL otherwise. + * + * TODO(dgq): This is by no means pretty. */ + void *closure_mem_or_null; } wrapped_rr_closure_arg; /* The \a on_complete closure passed as part of the pick requires keeping a @@ -186,7 +192,17 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), NULL); - gpr_free(wc_arg->owning_pending_node); + + /* Make sure this closure and its arg are EITHER on the heap on their oen OR + * part of a pending pick (thus part of the pending pick's memory) */ + GPR_ASSERT((wc_arg->closure_mem_or_null != NULL) + + (wc_arg->owning_pending_node != NULL) == + 1); + if (wc_arg->closure_mem_or_null) { + gpr_free(wc_arg->closure_mem_or_null); + } else { + gpr_free(wc_arg->owning_pending_node); + } } /* Linked list of pending pick requests. It stores all information needed to @@ -307,13 +323,6 @@ typedef struct glb_lb_policy { /** for tracking of the RR connectivity */ rr_connectivity_data *rr_connectivity; - - /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily - * available RR picks */ - grpc_closure wrapped_on_complete; - - /* arguments for the wrapped_on_complete closure */ - wrapped_rr_closure_arg wc_arg; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -424,9 +433,9 @@ static void lb_token_destroy(void *token) { if (token != NULL) GRPC_MDELEM_UNREF(token); } -static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, - const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { +static grpc_lb_policy *create_rr_locked( + grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *glb_policy) { GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); grpc_lb_policy_args args; @@ -446,12 +455,12 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, return rr; } -static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, - grpc_error *error) { +static void rr_handover_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy, grpc_error *error) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); glb_policy->rr_policy = - create_rr(exec_ctx, glb_policy->serverlist, glb_policy); + create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", @@ -474,6 +483,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; + pp->wrapped_on_complete_arg.owning_pending_node = pp; if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); @@ -482,7 +492,6 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, pp->target, (void **)&pp->wrapped_on_complete_arg.lb_token, &pp->wrapped_on_complete); - pp->wrapped_on_complete_arg.owning_pending_node = pp; } pending_ping *pping; @@ -490,13 +499,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, glb_policy->pending_pings = pping->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; + pping->wrapped_notify_arg.owning_pending_node = pping; if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, &pping->wrapped_notify); - pping->wrapped_notify_arg.owning_pending_node = pping; } } @@ -509,13 +518,16 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (glb_policy->serverlist != NULL) { /* a RR policy is shutting down but there's a serverlist available -> * perform a handover */ - rr_handover(exec_ctx, glb_policy, error); + gpr_mu_lock(&glb_policy->mu); + rr_handover_locked(exec_ctx, glb_policy, error); + gpr_mu_unlock(&glb_policy->mu); } else { /* shutting down and no new serverlist available. Bail out. */ gpr_free(rr_conn_data); } } else { if (error == GRPC_ERROR_NONE) { + gpr_mu_lock(&glb_policy->mu); /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_conn_data->state, GRPC_ERROR_REF(error), @@ -524,6 +536,7 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, &rr_conn_data->on_change); + gpr_mu_unlock(&glb_policy->mu); } else { /* error */ gpr_free(rr_conn_data); } @@ -775,37 +788,50 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, (intptr_t)glb_policy->rr_policy); } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); - glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; - glb_policy->wc_arg.target = target; - glb_policy->wc_arg.wrapped_closure = on_complete; - glb_policy->wc_arg.lb_token_mdelem_storage = - pick_args->lb_token_mdelem_storage; - glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; - glb_policy->wc_arg.owning_pending_node = NULL; - grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, - &glb_policy->wc_arg); + + /* we need to allocate the closure on the stack because we may be serving + * concurrent picks: a single field in glb_policy isn't good enough */ + void *closure_mem = + gpr_malloc(sizeof(grpc_closure) + sizeof(wrapped_rr_closure_arg)); + grpc_closure *wrapped_on_complete = closure_mem; + memset(wrapped_on_complete, 0, sizeof(grpc_closure)); + + wrapped_rr_closure_arg *wc_arg = closure_mem + sizeof(grpc_closure); + memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); + + grpc_closure_init(wrapped_on_complete, wrapped_rr_closure, wc_arg); + + wc_arg->rr_policy = glb_policy->rr_policy; + wc_arg->target = target; + wc_arg->wrapped_closure = on_complete; + wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + wc_arg->initial_metadata = pick_args->initial_metadata; + wc_arg->owning_pending_node = NULL; + wc_arg->closure_mem_or_null = closure_mem; pick_done = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, - (void **)&glb_policy->wc_arg.lb_token, - &glb_policy->wrapped_on_complete); + (void **)&wc_arg->lb_token, wrapped_on_complete); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)glb_policy->wc_arg.rr_policy); + (intptr_t)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); /* add the load reporting initial metadata */ - initial_metadata_add_lb_token( - pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, - GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + + gpr_free(closure_mem); } + /* else, !pick_done, the pending pick will be registered and taken care of + * by the pending pick list inside the RR policy (glb_policy->rr_policy). + * Eventually, wrapped_on_complete will be called, which will -among other + * things- add the LB token to the call's initial metadata */ } else { - /* else, the pending pick will be registered and taken care of by the - * pending pick list inside the RR policy (glb_policy->rr_policy) */ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, glb_policy->base.interested_parties); add_pending_pick(&glb_policy->pending_picks, pick_args, target, @@ -1076,6 +1102,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* update serverlist */ if (serverlist->num_servers > 0) { + gpr_mu_lock(&lb_client->glb_policy->mu); if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { @@ -1093,7 +1120,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { if (lb_client->glb_policy->rr_policy == NULL) { /* initial "handover", in this case from a null RR policy, meaning * it'll just create the first RR policy instance */ - rr_handover(exec_ctx, lb_client->glb_policy, error); + rr_handover_locked(exec_ctx, lb_client->glb_policy, error); } else { /* unref the RR policy, eventually leading to its substitution with a * new one constructed from the received serverlist (see @@ -1101,6 +1128,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, "serverlist_received"); } + gpr_mu_unlock(&lb_client->glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 930fa86aca1..325202fc6d0 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -403,7 +403,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ - gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (user_data != NULL) { @@ -416,6 +415,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); + gpr_mu_unlock(&p->mu); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ From 97ba64244a779d3e047106b60335a1d7192977a2 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 14 Oct 2016 13:06:45 -0700 Subject: [PATCH 2/5] pr comments --- src/core/ext/lb_policy/grpclb/grpclb.c | 86 ++++++++------------------ 1 file changed, 27 insertions(+), 59 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 5f0574e5df7..7363910dfb2 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -134,6 +134,9 @@ static void initial_metadata_add_lb_token( } typedef struct wrapped_rr_closure_arg { + /* the closure instance using this struct as argument */ + grpc_closure wrapper_closure; + /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; @@ -155,15 +158,8 @@ typedef struct wrapped_rr_closure_arg { /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; - /* when not NULL, represents a pending_{pick,ping} node to be freed upon - * closure execution */ - void *owning_pending_node; /* to be freed if not NULL */ - - /* Pointer ot heap memory if the closure and its argument were allocated - * dynamically outside of a pending pick. It'll be NULL otherwise. - * - * TODO(dgq): This is by no means pretty. */ - void *closure_mem_or_null; + /* heap memory to be freed upon closure execution. */ + void *free_when_done; } wrapped_rr_closure_arg; /* The \a on_complete closure passed as part of the pick requires keeping a @@ -189,20 +185,9 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, } } GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), NULL); - - /* Make sure this closure and its arg are EITHER on the heap on their oen OR - * part of a pending pick (thus part of the pending pick's memory) */ - GPR_ASSERT((wc_arg->closure_mem_or_null != NULL) + - (wc_arg->owning_pending_node != NULL) == - 1); - if (wc_arg->closure_mem_or_null) { - gpr_free(wc_arg->closure_mem_or_null); - } else { - gpr_free(wc_arg->owning_pending_node); - } + gpr_free(wc_arg->free_when_done); } /* Linked list of pending pick requests. It stores all information needed to @@ -223,10 +208,6 @@ typedef struct pending_pick { * upon error. */ grpc_connected_subchannel **target; - /* a closure wrapping the original on_complete one to be invoked once the - * pick() has completed (regardless of success) */ - grpc_closure wrapped_on_complete; - /* args for wrapped_on_complete */ wrapped_rr_closure_arg wrapped_on_complete_arg; } pending_pick; @@ -246,8 +227,8 @@ static void add_pending_pick(pending_pick **root, pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; - grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, - &pp->wrapped_on_complete_arg); + grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure, + wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; } @@ -255,10 +236,6 @@ static void add_pending_pick(pending_pick **root, typedef struct pending_ping { struct pending_ping *next; - /* a closure wrapping the original on_complete one to be invoked once the - * ping() has completed (regardless of success) */ - grpc_closure wrapped_notify; - /* args for wrapped_notify */ wrapped_rr_closure_arg wrapped_notify_arg; } pending_ping; @@ -268,8 +245,8 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { memset(pping, 0, sizeof(pending_ping)); memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); pping->next = *root; - grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure, - &pping->wrapped_notify_arg); + grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure, + wrapped_rr_closure, &pping->wrapped_notify_arg); pping->wrapped_notify_arg.wrapped_closure = notify; *root = pping; } @@ -483,7 +460,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; - pp->wrapped_on_complete_arg.owning_pending_node = pp; + pp->wrapped_on_complete_arg.free_when_done = pp; if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); @@ -491,7 +468,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args, pp->target, (void **)&pp->wrapped_on_complete_arg.lb_token, - &pp->wrapped_on_complete); + &pp->wrapped_on_complete_arg.wrapper_closure); } pending_ping *pping; @@ -499,13 +476,13 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_pings = pping->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; - pping->wrapped_notify_arg.owning_pending_node = pping; + pping->wrapped_notify_arg.free_when_done = pping; if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, - &pping->wrapped_notify); + &pping->wrapped_notify_arg.wrapper_closure); } } @@ -661,15 +638,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_NONE, NULL); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_NONE, NULL); pping = next; } @@ -703,7 +680,7 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); *target = NULL; grpc_exec_ctx_sched( - exec_ctx, &pp->wrapped_on_complete, + exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); } else { pp->next = glb_policy->pending_picks; @@ -735,7 +712,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set( exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); grpc_exec_ctx_sched( - exec_ctx, &pp->wrapped_on_complete, + exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); } else { pp->next = glb_policy->pending_picks; @@ -789,29 +766,20 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - /* we need to allocate the closure on the stack because we may be serving - * concurrent picks: a single field in glb_policy isn't good enough */ - void *closure_mem = - gpr_malloc(sizeof(grpc_closure) + sizeof(wrapped_rr_closure_arg)); - grpc_closure *wrapped_on_complete = closure_mem; - memset(wrapped_on_complete, 0, sizeof(grpc_closure)); - - wrapped_rr_closure_arg *wc_arg = closure_mem + sizeof(grpc_closure); + wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); - grpc_closure_init(wrapped_on_complete, wrapped_rr_closure, wc_arg); - + grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; wc_arg->wrapped_closure = on_complete; wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; - wc_arg->owning_pending_node = NULL; - wc_arg->closure_mem_or_null = closure_mem; + wc_arg->free_when_done = wc_arg; - pick_done = - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, - (void **)&wc_arg->lb_token, wrapped_on_complete); + pick_done = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, + target, (void **)&wc_arg->lb_token, + &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { @@ -825,7 +793,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - gpr_free(closure_mem); + gpr_free(wc_arg); } /* else, !pick_done, the pending pick will be registered and taken care of * by the pending pick list inside the RR policy (glb_policy->rr_policy). From b39330d9957243cfbe2d6c2cf509469be72ccdd9 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 14 Oct 2016 13:35:56 -0700 Subject: [PATCH 3/5] Tiny refactoring --- src/core/ext/lb_policy/grpclb/grpclb.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 7363910dfb2..c18cc0eb9ee 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -187,6 +187,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(wc_arg->wrapped_closure != NULL); grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), NULL); + GPR_ASSERT(wc_arg->free_when_done != NULL); gpr_free(wc_arg->free_when_done); } @@ -227,6 +228,7 @@ static void add_pending_pick(pending_pick **root, pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; + pp->wrapped_on_complete_arg.free_when_done = pp; grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure, wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; @@ -244,10 +246,11 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { pending_ping *pping = gpr_malloc(sizeof(*pping)); memset(pping, 0, sizeof(pending_ping)); memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); + pping->wrapped_notify_arg.wrapped_closure = notify; + pping->wrapped_notify_arg.free_when_done = pping; pping->next = *root; grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure, wrapped_rr_closure, &pping->wrapped_notify_arg); - pping->wrapped_notify_arg.wrapped_closure = notify; *root = pping; } @@ -460,7 +463,6 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; - pp->wrapped_on_complete_arg.free_when_done = pp; if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); @@ -476,7 +478,6 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_pings = pping->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; - pping->wrapped_notify_arg.free_when_done = pping; if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); From 58c18e74430022e01e9b49e280dec820ba29875b Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 14 Oct 2016 15:23:45 -0700 Subject: [PATCH 4/5] Factored out common code into pick_from_internal_rr. This also fixed a potential bug whereby callbacks wouldn't be called if the pick happened immediately during the flushing of pending picks. --- src/core/ext/lb_policy/grpclb/grpclb.c | 66 +++++++++++++++----------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 140dc9b7828..4386e6ec4d0 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -413,6 +413,40 @@ static void lb_token_destroy(void *token) { if (token != NULL) GRPC_MDELEM_UNREF(token); } +/* perform a pick over \a rr_policy. Given that a pick can return immediately + * (ignoring its completion callback) we need to perform the cleanups this + * callback would be otherwise resposible for */ +static bool pick_from_internal_rr_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy* rr_policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, + wrapped_rr_closure_arg *wc_arg) { + GPR_ASSERT(rr_policy != NULL); + const bool pick_done = grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, + target, (void **)&wc_arg->lb_token, + &wc_arg->wrapper_closure); + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + + gpr_free(wc_arg); + } + /* else, the pending pick will be registered and taken care of by the + * pending pick list inside the RR policy (glb_policy->rr_policy). + * Eventually, wrapped_on_complete will be called, which will -among other + * things- add the LB token to the call's initial metadata */ + + return pick_done; +} + static grpc_lb_policy *create_rr_locked( grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, glb_lb_policy *glb_policy) { @@ -470,10 +504,9 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args, - pp->target, - (void **)&pp->wrapped_on_complete_arg.lb_token, - &pp->wrapped_on_complete_arg.wrapper_closure); + pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, + &pp->pick_args, pp->target, + &pp->wrapped_on_complete_arg); } pending_ping *pping; @@ -776,29 +809,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - - pick_done = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, - target, (void **)&wc_arg->lb_token, - &wc_arg->wrapper_closure); - if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); - - /* add the load reporting initial metadata */ - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); - - gpr_free(wc_arg); - } - /* else, !pick_done, the pending pick will be registered and taken care of - * by the pending pick list inside the RR policy (glb_policy->rr_policy). - * Eventually, wrapped_on_complete will be called, which will -among other - * things- add the LB token to the call's initial metadata */ + pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, + pick_args, target, wc_arg); } else { add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete); From 2035906c283bf575a7325e90cef5b87dd8a24195 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Sat, 15 Oct 2016 15:22:51 -0700 Subject: [PATCH 5/5] clang-format --- src/core/ext/lb_policy/grpclb/grpclb.c | 54 +++++++++++++------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 4386e6ec4d0..9af92b787df 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -416,35 +416,35 @@ static void lb_token_destroy(void *token) { /* perform a pick over \a rr_policy. Given that a pick can return immediately * (ignoring its completion callback) we need to perform the cleanups this * callback would be otherwise resposible for */ -static bool pick_from_internal_rr_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy* rr_policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, - wrapped_rr_closure_arg *wc_arg) { - GPR_ASSERT(rr_policy != NULL); - const bool pick_done = grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, - target, (void **)&wc_arg->lb_token, - &wc_arg->wrapper_closure); - if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); +static bool pick_from_internal_rr_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { + GPR_ASSERT(rr_policy != NULL); + const bool pick_done = + grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target, + (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); - /* add the load reporting initial metadata */ - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token(pick_args->initial_metadata, + pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); - gpr_free(wc_arg); - } - /* else, the pending pick will be registered and taken care of by the - * pending pick list inside the RR policy (glb_policy->rr_policy). - * Eventually, wrapped_on_complete will be called, which will -among other - * things- add the LB token to the call's initial metadata */ + gpr_free(wc_arg); + } + /* else, the pending pick will be registered and taken care of by the + * pending pick list inside the RR policy (glb_policy->rr_policy). + * Eventually, wrapped_on_complete will be called, which will -among other + * things- add the LB token to the call's initial metadata */ - return pick_done; + return pick_done; } static grpc_lb_policy *create_rr_locked( @@ -810,7 +810,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - pick_args, target, wc_arg); + pick_args, target, wc_arg); } else { add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete);