Merge pull request #9664 from ctiller/c3+r+l

Make load balancers use combiner locks
pull/9864/head
Craig Tiller 8 years ago committed by GitHub
commit 8a6db33e7b
  1. 42
      src/core/ext/client_channel/client_channel.c
  2. 89
      src/core/ext/client_channel/lb_policy.c
  3. 85
      src/core/ext/client_channel/lb_policy.h
  4. 1
      src/core/ext/client_channel/lb_policy_factory.h
  5. 2
      src/core/ext/client_channel/subchannel.c
  6. 170
      src/core/ext/lb_policy/grpclb/grpclb.c
  7. 181
      src/core/ext/lb_policy/pick_first/pick_first.c
  8. 100
      src/core/ext/lb_policy/round_robin/round_robin.c

@ -183,7 +183,7 @@ typedef struct client_channel_channel_data {
grpc_pollset_set *interested_parties; grpc_pollset_set *interested_parties;
/* the following properties are guarded by a mutex since API's require them /* the following properties are guarded by a mutex since API's require them
to be instantaniously available */ to be instantaneously available */
gpr_mu info_mu; gpr_mu info_mu;
char *info_lb_policy_name; char *info_lb_policy_name;
/** service config in JSON form */ /** service config in JSON form */
@ -200,9 +200,9 @@ typedef struct {
grpc_lb_policy *lb_policy; grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher; } lb_policy_connectivity_watcher;
static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state); grpc_connectivity_state current_state);
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand, channel_data *chand,
@ -213,7 +213,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
state == GRPC_CHANNEL_SHUTDOWN) && state == GRPC_CHANNEL_SHUTDOWN) &&
chand->lb_policy != NULL) { chand->lb_policy != NULL) {
/* cancel picks with wait_for_ready=false */ /* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks( grpc_lb_policy_cancel_picks_locked(
exec_ctx, chand->lb_policy, exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error)); /* check= */ 0, GRPC_ERROR_REF(error));
@ -237,7 +237,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
GRPC_ERROR_REF(error), "lb_changed"); GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) { if (w->state != GRPC_CHANNEL_SHUTDOWN) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
} }
} }
@ -245,9 +245,9 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
gpr_free(w); gpr_free(w);
} }
static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state) { grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
@ -256,8 +256,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_combiner_scheduler(chand->combiner, false)); grpc_combiner_scheduler(chand->combiner, false));
w->state = current_state; w->state = current_state;
w->lb_policy = lb_policy; w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
&w->on_changed); &w->on_changed);
} }
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
@ -313,13 +313,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args lb_policy_args; grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result; lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
lb_policy = lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) { if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error); GRPC_ERROR_UNREF(state_error);
state = state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); &state_error);
} }
// Find service config. // Find service config.
channel_arg = channel_arg =
@ -383,7 +384,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
set_channel_connectivity_state_locked( set_channel_connectivity_state_locked(
exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
if (lb_policy != NULL) { if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state); watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
} }
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver, grpc_resolver_next_locked(exec_ctx, chand->resolver,
@ -404,7 +405,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
} }
if (exit_idle) { if (exit_idle) {
grpc_lb_policy_exit_idle(exec_ctx, lb_policy); grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
} }
@ -441,7 +442,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_closure_sched(exec_ctx, op->send_ping, grpc_closure_sched(exec_ctx, op->send_ping,
GRPC_ERROR_CREATE("Ping with no load balancing")); GRPC_ERROR_CREATE("Ping with no load balancing"));
} else { } else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL; op->bind_pollset = NULL;
} }
op->send_ping = NULL; op->send_ping = NULL;
@ -808,8 +809,9 @@ static bool pick_subchannel_locked(
if (initial_metadata == NULL) { if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) { if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
connected_subchannel, GRPC_ERROR_REF(error)); connected_subchannel,
GRPC_ERROR_REF(error));
} }
for (closure = chand->waiting_for_config_closures.head; closure != NULL; for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) { closure = closure->next_data.next) {
@ -848,7 +850,7 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = { const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)}; gpr_inf_future(GPR_CLOCK_MONOTONIC)};
const bool result = grpc_lb_policy_pick( const bool result = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0); GPR_TIMER_END("pick_subchannel", 0);
@ -1216,7 +1218,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) { grpc_error *error_ignored) {
channel_data *chand = arg; channel_data *chand = arg;
if (chand->lb_policy != NULL) { if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
} else { } else {
chand->exit_idle_when_lb_policy_arrives = true; chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != NULL) { if (!chand->started_resolving && chand->resolver != NULL) {

@ -32,14 +32,17 @@
*/ */
#include "src/core/ext/client_channel/lb_policy.h" #include "src/core/ext/client_channel/lb_policy.h"
#include "src/core/lib/iomgr/combiner.h"
#define WEAK_REF_BITS 16 #define WEAK_REF_BITS 16
void grpc_lb_policy_init(grpc_lb_policy *policy, void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) { const grpc_lb_policy_vtable *vtable,
grpc_combiner *combiner) {
policy->vtable = vtable; policy->vtable = vtable;
gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
policy->interested_parties = grpc_pollset_set_create(); policy->interested_parties = grpc_pollset_set_create();
policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
} }
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@ -71,6 +74,13 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
} }
static void shutdown_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_lb_policy *policy = arg;
policy->vtable->shutdown_locked(exec_ctx, policy);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, policy, "strong-unref");
}
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val = gpr_atm old_val =
@ -79,10 +89,15 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS; gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) { if ((old_val & mask) == check) {
policy->vtable->shutdown(exec_ctx, policy); grpc_closure_sched(
exec_ctx,
grpc_closure_create(shutdown_locked, policy,
grpc_combiner_scheduler(policy->combiner, false)),
GRPC_ERROR_NONE);
} else {
grpc_lb_policy_weak_unref(exec_ctx,
policy REF_FUNC_PASS_ARGS("strong-unref"));
} }
grpc_lb_policy_weak_unref(exec_ctx,
policy REF_FUNC_PASS_ARGS("strong-unref"));
} }
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
@ -95,52 +110,58 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) { if (old_val == 1) {
grpc_pollset_set_destroy(exec_ctx, policy->interested_parties); grpc_pollset_set_destroy(exec_ctx, policy->interested_parties);
grpc_combiner *combiner = policy->combiner;
policy->vtable->destroy(exec_ctx, policy); policy->vtable->destroy(exec_ctx, policy);
GRPC_COMBINER_UNREF(exec_ctx, combiner, "lb_policy");
} }
} }
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target,
grpc_closure *on_complete) { void **user_data, grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data, return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target,
on_complete); user_data, on_complete);
} }
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel **target, grpc_lb_policy *policy,
grpc_error *error) { grpc_connected_subchannel **target,
policy->vtable->cancel_pick(exec_ctx, policy, target, error); grpc_error *error) {
policy->vtable->cancel_pick_locked(exec_ctx, policy, target, error);
} }
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error *error) { grpc_error *error) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, policy->vtable->cancel_picks_locked(exec_ctx, policy,
initial_metadata_flags_eq, error); initial_metadata_flags_mask,
initial_metadata_flags_eq, error);
} }
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx,
policy->vtable->exit_idle(exec_ctx, policy); grpc_lb_policy *policy) {
policy->vtable->exit_idle_locked(exec_ctx, policy);
} }
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx,
grpc_closure *closure) { grpc_lb_policy *policy,
policy->vtable->ping_one(exec_ctx, policy, closure); grpc_closure *closure) {
policy->vtable->ping_one_locked(exec_ctx, policy, closure);
} }
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_notify_on_state_change_locked(
grpc_lb_policy *policy, grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connectivity_state *state, grpc_connectivity_state *state, grpc_closure *closure) {
grpc_closure *closure) { policy->vtable->notify_on_state_change_locked(exec_ctx, policy, state,
policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure); closure);
} }
grpc_connectivity_state grpc_lb_policy_check_connectivity( grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error) { grpc_error **connectivity_error) {
return policy->vtable->check_connectivity(exec_ctx, policy, return policy->vtable->check_connectivity_locked(exec_ctx, policy,
connectivity_error); connectivity_error);
} }

@ -51,6 +51,8 @@ struct grpc_lb_policy {
gpr_atm ref_pair; gpr_atm ref_pair;
/* owned pointer to interested parties in load balancing decisions */ /* owned pointer to interested parties in load balancing decisions */
grpc_pollset_set *interested_parties; grpc_pollset_set *interested_parties;
/* combiner under which lb_policy actions take place */
grpc_combiner *combiner;
}; };
/** Extra arguments for an LB pick */ /** Extra arguments for an LB pick */
@ -69,42 +71,44 @@ typedef struct grpc_lb_policy_pick_args {
struct grpc_lb_policy_vtable { struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** \see grpc_lb_policy_pick */ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete); grpc_closure *on_complete);
/** \see grpc_lb_policy_cancel_pick */ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*cancel_pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target, grpc_error *error); grpc_connected_subchannel **target,
grpc_error *error);
/** \see grpc_lb_policy_cancel_picks */ /** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*cancel_picks_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, grpc_error *error); uint32_t initial_metadata_flags_eq,
grpc_error *error);
/** \see grpc_lb_policy_ping_one */ /** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void (*ping_one_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure); grpc_closure *closure);
/** Try to enter a READY connectivity state */ /** Try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void (*exit_idle_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** check the current connectivity of the lb_policy */ /** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)( grpc_connectivity_state (*check_connectivity_locked)(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error); grpc_error **connectivity_error);
/** call notify when the connectivity state of a channel changes from *state. /** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a Updates *state with the new state of the policy. Calling with a NULL \a
state cancels the subscription. */ state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, void (*notify_on_state_change_locked)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
grpc_connectivity_state *state, grpc_connectivity_state *state,
grpc_closure *closure); grpc_closure *closure);
}; };
/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
@ -144,7 +148,8 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** called by concrete implementations to initialize the base struct */ /** called by concrete implementations to initialize the base struct */
void grpc_lb_policy_init(grpc_lb_policy *policy, void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable); const grpc_lb_policy_vtable *vtable,
grpc_combiner *combiner);
/** Finds an appropriate subchannel for a call, based on \a pick_args. /** Finds an appropriate subchannel for a call, based on \a pick_args.
@ -159,43 +164,45 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
Any IO should be done under the \a interested_parties \a grpc_pollset_set Any IO should be done under the \a interested_parties \a grpc_pollset_set
in the \a grpc_lb_policy struct. */ in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target,
grpc_closure *on_complete); void **user_data, grpc_closure *on_complete);
/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
against one of the connected subchannels managed by \a policy. */ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx,
grpc_closure *closure); grpc_lb_policy *policy,
grpc_closure *closure);
/** Cancel picks for \a target. /** Cancel picks for \a target.
The \a on_complete callback of the pending picks will be invoked with \a The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */ *target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel **target, grpc_lb_policy *policy,
grpc_error *error); grpc_connected_subchannel **target,
grpc_error *error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given /** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
when AND'd with \a initial_metadata_flags_mask */ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error *error); grpc_error *error);
/** Try to enter a READY connectivity state */ /** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy);
/* Call notify when the connectivity state of a channel changes from \a *state. /* Call notify when the connectivity state of a channel changes from \a *state.
* Updates \a *state with the new state of the policy */ * Updates \a *state with the new state of the policy */
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, void grpc_lb_policy_notify_on_state_change_locked(
grpc_lb_policy *policy, grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connectivity_state *state, grpc_connectivity_state *state, grpc_closure *closure);
grpc_closure *closure);
grpc_connectivity_state grpc_lb_policy_check_connectivity( grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error); grpc_error **connectivity_error);

@ -107,6 +107,7 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
typedef struct grpc_lb_policy_args { typedef struct grpc_lb_policy_args {
grpc_client_channel_factory *client_channel_factory; grpc_client_channel_factory *client_channel_factory;
grpc_channel_args *args; grpc_channel_args *args;
grpc_combiner *combiner;
} grpc_lb_policy_args; } grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable { struct grpc_lb_policy_factory_vtable {

@ -438,7 +438,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&w->subchannel->mu); gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w); gpr_free(w);
follow_up->cb(exec_ctx, follow_up->cb_arg, error); grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
} }
static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {

@ -115,6 +115,7 @@
#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h" #include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
@ -285,9 +286,6 @@ typedef struct glb_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
/** mutex protecting remaining members */
gpr_mu mu;
/** who the client is trying to communicate with */ /** who the client is trying to communicate with */
const char *server_name; const char *server_name;
grpc_client_channel_factory *cc_factory; grpc_client_channel_factory *cc_factory;
@ -557,9 +555,9 @@ static bool pick_from_internal_rr_locked(
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
GPR_ASSERT(rr_policy != NULL); GPR_ASSERT(rr_policy != NULL);
const bool pick_done = const bool pick_done = grpc_lb_policy_pick_locked(
grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target, exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token,
(void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); &wc_arg->wrapper_closure);
if (pick_done) { if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) { if (grpc_lb_glb_trace) {
@ -590,6 +588,7 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy_args args; grpc_lb_policy_args args;
memset(&args, 0, sizeof(args)); memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory; args.client_channel_factory = glb_policy->cc_factory;
args.combiner = glb_policy->base.combiner;
grpc_lb_addresses *addresses = grpc_lb_addresses *addresses =
process_serverlist_locked(exec_ctx, serverlist); process_serverlist_locked(exec_ctx, serverlist);
@ -608,8 +607,8 @@ static grpc_lb_policy *create_rr_locked(
return rr; return rr;
} }
static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error); void *arg, grpc_error *error);
/* glb_policy->rr_policy may be NULL (initial handover) */ /* glb_policy->rr_policy may be NULL (initial handover) */
static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) { glb_lb_policy *glb_policy) {
@ -633,8 +632,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
grpc_error *new_rr_state_error = NULL; grpc_error *new_rr_state_error = NULL;
const grpc_connectivity_state new_rr_state = const grpc_connectivity_state new_rr_state =
grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy, grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy,
&new_rr_state_error); &new_rr_state_error);
/* Connectivity state is a function of the new RR policy just created */ /* Connectivity state is a function of the new RR policy just created */
const bool replace_old_rr = update_lb_connectivity_status_locked( const bool replace_old_rr = update_lb_connectivity_status_locked(
exec_ctx, glb_policy, new_rr_state, new_rr_state_error); exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
@ -677,17 +676,18 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
rr_connectivity_data *rr_connectivity = rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data)); gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, grpc_closure_init(&rr_connectivity->on_change,
rr_connectivity, grpc_schedule_on_exec_ctx); glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner, false));
rr_connectivity->glb_policy = glb_policy; rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = new_rr_state; rr_connectivity->state = new_rr_state;
/* Subscribe to changes to the connectivity of the new RR */ /* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state, &rr_connectivity->state,
&rr_connectivity->on_change); &rr_connectivity->on_change);
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
/* Update picks and pings in wait */ /* Update picks and pings in wait */
pending_pick *pp; pending_pick *pp;
@ -713,17 +713,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy); (intptr_t)glb_policy->rr_policy);
} }
grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
&pping->wrapped_notify_arg.wrapper_closure); &pping->wrapped_notify_arg.wrapper_closure);
} }
} }
static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) { void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg; rr_connectivity_data *rr_connectivity = arg;
glb_lb_policy *glb_policy = rr_connectivity->glb_policy; glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
gpr_mu_lock(&glb_policy->mu);
const bool shutting_down = glb_policy->shutting_down; const bool shutting_down = glb_policy->shutting_down;
bool unref_needed = false; bool unref_needed = false;
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
@ -740,11 +739,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
update_lb_connectivity_status_locked(exec_ctx, glb_policy, update_lb_connectivity_status_locked(exec_ctx, glb_policy,
rr_connectivity->state, error); rr_connectivity->state, error);
/* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change_locked(
&rr_connectivity->state, exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
&rr_connectivity->on_change); &rr_connectivity->on_change);
} }
gpr_mu_unlock(&glb_policy->mu);
if (unref_needed) { if (unref_needed) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb"); "rr_connectivity_cb");
@ -899,8 +897,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
gpr_free(glb_policy); gpr_free(glb_policy);
return NULL; return NULL;
} }
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
gpr_mu_init(&glb_policy->mu);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb"); "grpclb");
return &glb_policy->base; return &glb_policy->base;
@ -918,13 +915,11 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->serverlist != NULL) { if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist); grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
} }
gpr_mu_destroy(&glb_policy->mu);
gpr_free(glb_policy); gpr_free(glb_policy);
} }
static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
glb_policy->shutting_down = true; glb_policy->shutting_down = true;
pending_pick *pp = glb_policy->pending_picks; pending_pick *pp = glb_policy->pending_picks;
@ -941,7 +936,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */ * the cancel, needs to acquire that same lock */
grpc_call *lb_call = glb_policy->lb_call; grpc_call *lb_call = glb_policy->lb_call;
gpr_mu_unlock(&glb_policy->mu);
/* glb_policy->lb_call and this local lb_call must be consistent at this point /* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
@ -967,11 +961,10 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
} }
} }
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_error *error) { grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks; pending_pick *pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL; glb_policy->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
@ -987,16 +980,15 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pp = next; pp = next;
} }
gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
uint32_t initial_metadata_flags_mask, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_mask,
grpc_error *error) { uint32_t initial_metadata_flags_eq,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks; pending_pick *pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL; glb_policy->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
@ -1012,7 +1004,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pp = next; pp = next;
} }
gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -1025,19 +1016,17 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
query_for_backends_locked(exec_ctx, glb_policy); query_for_backends_locked(exec_ctx, glb_policy);
} }
static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
if (!glb_policy->started_picking) { if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy); start_picking_locked(exec_ctx, glb_policy);
} }
gpr_mu_unlock(&glb_policy->mu);
} }
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) { grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) { if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL; *target = NULL;
grpc_closure_sched( grpc_closure_sched(
@ -1048,7 +1037,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
glb_policy->deadline = pick_args->deadline; glb_policy->deadline = pick_args->deadline;
bool pick_done; bool pick_done;
@ -1087,53 +1075,43 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pick_done = false; pick_done = false;
} }
gpr_mu_unlock(&glb_policy->mu);
return pick_done; return pick_done;
} }
static grpc_connectivity_state glb_check_connectivity( static grpc_connectivity_state glb_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_error **connectivity_error) { grpc_error **connectivity_error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
grpc_connectivity_state st; return grpc_connectivity_state_get(&glb_policy->state_tracker,
gpr_mu_lock(&glb_policy->mu); connectivity_error);
st = grpc_connectivity_state_get(&glb_policy->state_tracker,
connectivity_error);
gpr_mu_unlock(&glb_policy->mu);
return st;
} }
static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) { grpc_closure *closure) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
if (glb_policy->rr_policy) { if (glb_policy->rr_policy) {
grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure); grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
} else { } else {
add_pending_ping(&glb_policy->pending_pings, closure); add_pending_ping(&glb_policy->pending_pings, closure);
if (!glb_policy->started_picking) { if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy); start_picking_locked(exec_ctx, glb_policy);
} }
} }
gpr_mu_unlock(&glb_policy->mu);
} }
static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol, grpc_lb_policy *pol,
grpc_connectivity_state *current, grpc_connectivity_state *current,
grpc_closure *notify) { grpc_closure *notify) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state_notify_on_state_change(
exec_ctx, &glb_policy->state_tracker, current, notify); exec_ctx, &glb_policy->state_tracker, current, notify);
gpr_mu_unlock(&glb_policy->mu);
} }
static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error); void *arg, grpc_error *error);
static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error); grpc_error *error);
static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) { glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name != NULL);
@ -1162,11 +1140,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_grpclb_request_destroy(request); grpc_grpclb_request_destroy(request);
grpc_closure_init(&glb_policy->lb_on_server_status_received, grpc_closure_init(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received, glb_policy, lb_on_server_status_received_locked, glb_policy,
grpc_schedule_on_exec_ctx); grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_closure_init(&glb_policy->lb_on_response_received, grpc_closure_init(&glb_policy->lb_on_response_received,
lb_on_response_received, glb_policy, lb_on_response_received_locked, glb_policy,
grpc_schedule_on_exec_ctx); grpc_combiner_scheduler(glb_policy->base.combiner, false));
gpr_backoff_init(&glb_policy->lb_call_backoff_state, gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@ -1261,14 +1239,13 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} }
static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
glb_lb_policy *glb_policy = arg; glb_lb_policy *glb_policy = arg;
grpc_op ops[2]; grpc_op ops[2];
memset(ops, 0, sizeof(ops)); memset(ops, 0, sizeof(ops));
grpc_op *op = ops; grpc_op *op = ops;
gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_response_payload != NULL) { if (glb_policy->lb_response_payload != NULL) {
gpr_backoff_reset(&glb_policy->lb_call_backoff_state); gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
/* Received data from the LB server. Look inside /* Received data from the LB server. Look inside
@ -1342,20 +1319,17 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
&glb_policy->lb_on_response_received); /* loop */ &glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} }
gpr_mu_unlock(&glb_policy->mu);
} else { /* empty payload: call cancelled. */ } else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in /* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */ * query_for_backends_locked() and reused in every reception loop */
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload"); "lb_on_response_received_empty_payload");
} }
} }
static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
glb_lb_policy *glb_policy = arg; glb_lb_policy *glb_policy = arg;
gpr_mu_lock(&glb_policy->mu);
if (!glb_policy->shutting_down) { if (!glb_policy->shutting_down) {
if (grpc_lb_glb_trace) { if (grpc_lb_glb_trace) {
@ -1365,15 +1339,13 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(glb_policy->lb_call == NULL); GPR_ASSERT(glb_policy->lb_call == NULL);
query_for_backends_locked(exec_ctx, glb_policy); query_for_backends_locked(exec_ctx, glb_policy);
} }
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"grpclb_on_retry_timer"); "grpclb_on_retry_timer");
} }
static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) { void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = arg; glb_lb_policy *glb_policy = arg;
gpr_mu_lock(&glb_policy->mu);
GPR_ASSERT(glb_policy->lb_call != NULL); GPR_ASSERT(glb_policy->lb_call != NULL);
@ -1408,21 +1380,27 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer, grpc_closure_init(
glb_policy, grpc_schedule_on_exec_ctx); &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now); &glb_policy->lb_on_call_retry, now);
} }
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_server_status_received"); "lb_on_server_status_received");
} }
/* Code wiring the policy with the rest of the core */ /* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = { static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy, glb_shutdown, glb_pick, glb_destroy,
glb_cancel_pick, glb_cancel_picks, glb_ping_one, glb_shutdown_locked,
glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change}; glb_pick_locked,
glb_cancel_pick_locked,
glb_cancel_picks_locked,
glb_ping_one_locked,
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked};
static void glb_factory_ref(grpc_lb_policy_factory *factory) {} static void glb_factory_ref(grpc_lb_policy_factory *factory) {}

@ -38,6 +38,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h" #include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
@ -57,11 +58,11 @@ typedef struct {
grpc_closure connectivity_changed; grpc_closure connectivity_changed;
/** the selected channel (a grpc_connected_subchannel) */ /** remaining members are protected by the combiner */
gpr_atm selected;
/** the selected channel */
grpc_connected_subchannel *selected;
/** mutex protecting remaining members */
gpr_mu mu;
/** have we started picking? */ /** have we started picking? */
int started_picking; int started_picking;
/** are we shut down? */ /** are we shut down? */
@ -77,32 +78,24 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy; } pick_first_lb_policy;
#define GET_SELECTED(p) \
((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connected_subchannel *selected = GET_SELECTED(p);
size_t i; size_t i;
GPR_ASSERT(p->pending_picks == NULL); GPR_ASSERT(p->pending_picks == NULL);
for (i = 0; i < p->num_subchannels; i++) { for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
} }
if (selected != NULL) { if (p->selected != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first"); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
} }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels); gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
gpr_free(p); gpr_free(p);
} }
static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
selected = GET_SELECTED(p);
p->shutdown = 1; p->shutdown = 1;
pp = p->pending_picks; pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
@ -110,15 +103,14 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE("Channel shutdown"), "shutdown"); GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
/* cancel subscription */ /* cancel subscription */
if (selected != NULL) { if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change( grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, NULL, NULL, &p->connectivity_changed); exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
} else if (p->num_subchannels > 0) { } else if (p->num_subchannels > 0) {
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed); &p->connectivity_changed);
} }
gpr_mu_unlock(&p->mu);
while (pp != NULL) { while (pp != NULL) {
pending_pick *next = pp->next; pending_pick *next = pp->next;
*pp->target = NULL; *pp->target = NULL;
@ -128,12 +120,11 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
} }
} }
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_error *error) { grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks; pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
@ -150,17 +141,15 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pp = next; pp = next;
} }
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error *error) { grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks; pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
@ -177,7 +166,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pp = next; pp = next;
} }
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -192,63 +180,48 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
&p->connectivity_changed); &p->connectivity_changed);
} }
static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking(exec_ctx, p);
} }
gpr_mu_unlock(&p->mu);
} }
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) { grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
/* Check atomically for a selected channel */ /* Check atomically for a selected channel */
grpc_connected_subchannel *selected = GET_SELECTED(p); if (p->selected != NULL) {
if (selected != NULL) { *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
*target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
return 1; return 1;
} }
/* No subchannel selected yet, so acquire lock and then attempt again */ /* No subchannel selected yet, so try again */
gpr_mu_lock(&p->mu); if (!p->started_picking) {
selected = GET_SELECTED(p); start_picking(exec_ctx, p);
if (selected) {
gpr_mu_unlock(&p->mu);
*target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
return 1;
} else {
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
} }
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
return 0;
} }
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) { pick_first_lb_policy *p) {
pick_first_lb_policy *p = arg;
size_t i; size_t i;
size_t num_subchannels = p->num_subchannels; size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels; grpc_subchannel **subchannels;
gpr_mu_lock(&p->mu);
subchannels = p->subchannels; subchannels = p->subchannels;
p->num_subchannels = 0; p->num_subchannels = 0;
p->subchannels = NULL; p->subchannels = NULL;
gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) { for (i = 0; i < num_subchannels; i++) {
@ -258,25 +231,19 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(subchannels); gpr_free(subchannels);
} }
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
pick_first_lb_policy *p = arg; pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel; grpc_subchannel *selected_subchannel;
pending_pick *pp; pending_pick *pp;
grpc_connected_subchannel *selected;
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
gpr_mu_lock(&p->mu);
selected = GET_SELECTED(p);
if (p->shutdown) { if (p->shutdown) {
gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
} else if (selected != NULL) { } else if (p->selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */ /* if the selected channel goes bad, we're done */
p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
@ -286,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"selected_changed"); "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change( grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, p->base.interested_parties, exec_ctx, p->selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed); &p->checking_connectivity, &p->connectivity_changed);
} else { } else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@ -301,26 +268,21 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE, GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready"); "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel]; selected_subchannel = p->subchannels[p->checking_subchannel];
selected = p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected_subchannel); grpc_subchannel_get_connected_subchannel(selected_subchannel),
GPR_ASSERT(selected != NULL); "picked_first");
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */ /* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected); destroy_subchannels_locked(exec_ctx, p);
grpc_closure_sched(exec_ctx,
grpc_closure_create(destroy_subchannels, p,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
/* update any calls that were waiting for a pick */ /* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp); gpr_free(pp);
} }
grpc_connected_subchannel_notify_on_state_change( grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, p->base.interested_parties, exec_ctx, p->selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed); &p->checking_connectivity, &p->connectivity_changed);
break; break;
case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE:
@ -387,48 +349,44 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, static grpc_connectivity_state pf_check_connectivity_locked(
grpc_lb_policy *pol, grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st; return grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_lock(&p->mu);
st = grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
} }
static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol, grpc_lb_policy *pol,
grpc_connectivity_state *current, grpc_connectivity_state *current,
grpc_closure *notify) { grpc_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
current, notify); current, notify);
gpr_mu_unlock(&p->mu);
} }
static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) { grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connected_subchannel *selected = GET_SELECTED(p); if (p->selected) {
if (selected) { grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else { } else {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected")); grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
} }
} }
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_destroy,
pf_cancel_pick, pf_cancel_picks, pf_ping_one, pf_shutdown_locked,
pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change}; pf_pick_locked,
pf_cancel_pick_locked,
pf_cancel_picks_locked,
pf_ping_one_locked,
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@ -489,10 +447,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
} }
p->num_subchannels = subchannel_idx; p->num_subchannels = subchannel_idx;
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p, grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_schedule_on_exec_ctx); grpc_combiner_scheduler(args->combiner, false));
gpr_mu_init(&p->mu);
return &p->base; return &p->base;
} }

@ -67,6 +67,7 @@
#include "src/core/ext/client_channel/subchannel.h" #include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/static_metadata.h"
@ -134,7 +135,6 @@ typedef struct {
struct round_robin_lb_policy { struct round_robin_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
gpr_mu mu;
/** total number of addresses received at creation time */ /** total number of addresses received at creation time */
size_t num_addresses; size_t num_addresses;
@ -293,7 +293,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels); gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
elem = p->ready_list.next; elem = p->ready_list.next;
while (elem != NULL && elem != &p->ready_list) { while (elem != NULL && elem != &p->ready_list) {
@ -309,12 +308,11 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p); gpr_free(p);
} }
static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
size_t i; size_t i;
gpr_mu_lock(&p->mu);
if (grpc_lb_round_robin_trace) { if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
} }
@ -335,15 +333,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
&sd->connectivity_changed_closure); &sd->connectivity_changed_closure);
} }
gpr_mu_unlock(&p->mu);
} }
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target, grpc_connected_subchannel **target,
grpc_error *error) { grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks; pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
@ -360,17 +356,15 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pp = next; pp = next;
} }
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error *error) { grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
gpr_mu_lock(&p->mu);
pp = p->pending_picks; pp = p->pending_picks;
p->pending_picks = NULL; p->pending_picks = NULL;
while (pp != NULL) { while (pp != NULL) {
@ -388,11 +382,11 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
pp = next; pp = next;
} }
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
size_t i; size_t i;
p->started_picking = 1; p->started_picking = 1;
@ -411,23 +405,20 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
} }
} }
static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
gpr_mu_lock(&p->mu);
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking_locked(exec_ctx, p);
} }
gpr_mu_unlock(&p->mu);
} }
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args, const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data, grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) { grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp; pending_pick *pp;
ready_list *selected; ready_list *selected;
gpr_mu_lock(&p->mu);
if (grpc_lb_round_robin_trace) { if (grpc_lb_round_robin_trace) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
@ -449,12 +440,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
} }
/* only advance the last picked pointer if the selection was used */ /* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p); advance_last_picked_locked(p);
gpr_mu_unlock(&p->mu);
return 1; return 1;
} else { } else {
/* no pick currently available. Save for later in list of pending picks */ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) { if (!p->started_picking) {
start_picking(exec_ctx, p); start_picking_locked(exec_ctx, p);
} }
pp = gpr_malloc(sizeof(*pp)); pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks; pp->next = p->pending_picks;
@ -463,7 +453,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->user_data = user_data; pp->user_data = user_data;
p->pending_picks = pp; p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0; return 0;
} }
} }
@ -538,17 +527,15 @@ static grpc_connectivity_state update_lb_connectivity_status(
return sd->curr_connectivity_state; return sd->curr_connectivity_state;
} }
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
subchannel_data *sd = arg; subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy; round_robin_lb_policy *p = sd->policy;
pending_pick *pp; pending_pick *pp;
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
gpr_mu_lock(&p->mu);
if (p->shutdown) { if (p->shutdown) {
gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
return; return;
@ -645,56 +632,51 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
break; break;
} }
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, static grpc_connectivity_state rr_check_connectivity_locked(
grpc_lb_policy *pol, grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
grpc_error **error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
grpc_connectivity_state st; return grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_lock(&p->mu);
st = grpc_connectivity_state_get(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
} }
static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol, grpc_lb_policy *pol,
grpc_connectivity_state *current, grpc_connectivity_state *current,
grpc_closure *notify) { grpc_closure *notify) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
current, notify); current, notify);
gpr_mu_unlock(&p->mu);
} }
static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) { grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
ready_list *selected; ready_list *selected;
grpc_connected_subchannel *target; grpc_connected_subchannel *target;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) { if ((selected = peek_next_connected_locked(p))) {
gpr_mu_unlock(&p->mu);
target = GRPC_CONNECTED_SUBCHANNEL_REF( target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel), grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked"); "rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure); grpc_connected_subchannel_ping(exec_ctx, target, closure);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else { } else {
gpr_mu_unlock(&p->mu);
grpc_closure_sched(exec_ctx, closure, grpc_closure_sched(exec_ctx, closure,
GRPC_ERROR_CREATE("Round Robin not connected")); GRPC_ERROR_CREATE("Round Robin not connected"));
} }
} }
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_destroy,
rr_cancel_pick, rr_cancel_picks, rr_ping_one, rr_shutdown_locked,
rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change}; rr_pick_locked,
rr_cancel_pick_locked,
rr_cancel_picks_locked,
rr_ping_one_locked,
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@ -762,7 +744,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
} }
++subchannel_idx; ++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure, grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx); rr_connectivity_changed_locked, sd,
grpc_combiner_scheduler(args->combiner, false));
} }
} }
if (subchannel_idx == 0) { if (subchannel_idx == 0) {
@ -779,7 +762,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
p->ready_list.next = NULL; p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list; p->ready_list_last_pick = &p->ready_list;
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin"); "round_robin");
@ -787,7 +770,6 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
(void *)p, (unsigned long)p->num_subchannels); (void *)p, (unsigned long)p->num_subchannels);
} }
gpr_mu_init(&p->mu);
return &p->base; return &p->base;
} }

Loading…
Cancel
Save