Introduced grpc_lb_policy_pick_args and added some LB docs

pull/7740/head
David Garcia Quintas 8 years ago
parent 5a22a598b3
commit 8aace513d0
  1. 7
      src/core/ext/client_config/client_channel.c
  2. 7
      src/core/ext/client_config/lb_policy.c
  3. 50
      src/core/ext/client_config/lb_policy.h
  4. 32
      src/core/ext/lb_policy/grpclb/grpclb.c
  5. 10
      src/core/ext/lb_policy/pick_first/pick_first.c
  6. 10
      src/core/ext/lb_policy/round_robin/round_robin.c

@ -401,9 +401,10 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
int r; int r;
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent, const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
initial_metadata, initial_metadata_flags, initial_metadata_flags};
connected_subchannel, on_ready); r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
GPR_TIMER_END("cc_pick_subchannel", 0); GPR_TIMER_END("cc_pick_subchannel", 0);
return r; return r;

@ -100,13 +100,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
} }
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent, const grpc_lb_policy_pick_args *pick_args,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_closure *on_complete) { grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata, return policy->vtable->pick(exec_ctx, policy, pick_args, target, on_complete);
initial_metadata_flags, target, on_complete);
} }
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,

@ -53,23 +53,35 @@ struct grpc_lb_policy {
grpc_pollset_set *interested_parties; grpc_pollset_set *interested_parties;
}; };
/** Extra arguments for an LB pick */
typedef struct grpc_lb_policy_pick_args {
/** Parties interested in the pick's progress */
grpc_polling_entity *pollent;
/** Initial metadata associated with the picking call. */
grpc_metadata_batch *initial_metadata;
/** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
uint32_t initial_metadata_flags;
} grpc_lb_policy_pick_args;
struct grpc_lb_policy_vtable { struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** implement grpc_lb_policy_pick */ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent, const grpc_lb_policy_pick_args *pick_args,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete); grpc_connected_subchannel **target, grpc_closure *on_complete);
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target); grpc_connected_subchannel **target);
/** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq); uint32_t initial_metadata_flags_eq);
/** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure); grpc_closure *closure);
@ -83,8 +95,7 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state. /** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a Updates *state with the new state of the policy. Calling with a NULL \a
state cancels the subscription. state cancels the subscription. */
*/
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
grpc_connectivity_state *state, grpc_connectivity_state *state,
@ -124,26 +135,31 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy, void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable); const grpc_lb_policy_vtable *vtable);
/** Given initial metadata in \a initial_metadata, find an appropriate /** Find an appropriate target for this call, based on \a pick_args.
target for this rpc, and 'return' it by calling \a on_complete after setting Upon completion \a on_complete will be called, with \a *target set to an
\a target. appropriate connected subchannel if the pick was successful or NULL
Picking can be asynchronous. Any IO should be done under \a pollent. */ otherwise.
Picking can be asynchronous. Any IO should be done under \a
pick_args->pollent. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_polling_entity *pollent, const grpc_lb_policy_pick_args *pick_args,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_closure *on_complete); grpc_closure *on_complete);
/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure); grpc_closure *closure);
/** Cancel picks for \a target.
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target); grpc_connected_subchannel **target);
/** Cancel all pending picks which have: /** Cancel all pending picks for which their \a initial_metadata_flags (as given
(initial_metadata_flags & initial_metadata_flags_mask) == in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
initial_metadata_flags_eq */ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,

@ -180,19 +180,18 @@ typedef struct pending_pick {
wrapped_rr_closure_arg wrapped_on_complete_arg; wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick; } pending_pick;
static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, static void add_pending_pick(pending_pick **root,
grpc_metadata_batch *initial_metadata, const grpc_lb_policy_pick_args *pick_args,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_closure *on_complete) { grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp)); pending_pick *pp = gpr_malloc(sizeof(*pp));
memset(pp, 0, sizeof(pending_pick)); memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root; pp->next = *root;
pp->pollent = pollent; pp->pollent = pick_args->pollent;
pp->target = target; pp->target = target;
pp->initial_metadata = initial_metadata; pp->initial_metadata = pick_args->initial_metadata;
pp->initial_metadata_flags = initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete; pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg); &pp->wrapped_on_complete_arg);
@ -359,9 +358,10 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy); (intptr_t)glb_policy->rr_policy);
} }
grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent, const grpc_lb_policy_pick_args pick_args = {
pp->initial_metadata, pp->initial_metadata_flags, pp->pollent, pp->initial_metadata, pp->initial_metadata_flags};
pp->target, &pp->wrapped_on_complete); grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp; pp->wrapped_on_complete_arg.owning_pending_node = pp;
} }
@ -603,9 +603,7 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
} }
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity *pollent, const grpc_lb_policy_pick_args *pick_args,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_closure *on_complete) { grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
@ -623,8 +621,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
glb_policy->wc_arg.wrapped_closure = on_complete; glb_policy->wc_arg.wrapped_closure = on_complete;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg); &glb_policy->wc_arg);
r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
initial_metadata, initial_metadata_flags, target, r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
&glb_policy->wrapped_on_complete); &glb_policy->wrapped_on_complete);
if (r != 0) { if (r != 0) {
/* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
@ -639,10 +637,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_NONE, NULL); GRPC_ERROR_NONE, NULL);
} }
} else { } else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties); glb_policy->base.interested_parties);
add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata, add_pending_pick(&glb_policy->pending_picks, pick_args, target,
initial_metadata_flags, target, on_complete); on_complete);
if (!glb_policy->started_picking) { if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy); start_picking(exec_ctx, glb_policy);

@ -199,9 +199,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
} }
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity *pollent, const grpc_lb_policy_pick_args *pick_args,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_closure *on_complete) { grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@ -225,13 +223,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking(exec_ctx, p);
} }
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties); p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp)); pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->pollent = pollent; pp->pollent = pick_args->pollent;
pp->target = target; pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete; pp->on_complete = on_complete;
p->pending_picks = pp; p->pending_picks = pp;
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);

@ -361,9 +361,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
} }
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity *pollent, const grpc_lb_policy_pick_args *pick_args,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_closure *on_complete) { grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@ -385,14 +383,14 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking(exec_ctx, p);
} }
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties); p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp)); pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
pp->pollent = pollent; pp->pollent = pick_args->pollent;
pp->target = target; pp->target = target;
pp->on_complete = on_complete; pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
p->pending_picks = pp; p->pending_picks = pp;
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
return 0; return 0;

Loading…
Cancel
Save