Merge pull request #13985 from markdroth/lb_policy_ref_simplification

Simply LB policy refcounting, again.
pull/13240/merge
Mark D. Roth 7 years ago committed by GitHub
commit 31e99d0788
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 59
      src/core/ext/filters/client_channel/client_channel.cc
  2. 95
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 90
      src/core/ext/filters/client_channel/lb_policy.h
  4. 601
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 91
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  6. 124
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  7. 4
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc

@ -553,6 +553,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
} }
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties); chand->interested_parties);
grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
} }
chand->lb_policy = new_lb_policy; chand->lb_policy = new_lb_policy;
@ -658,6 +659,7 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
if (chand->lb_policy != nullptr) { if (chand->lb_policy != nullptr) {
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties); chand->interested_parties);
grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = nullptr; chand->lb_policy = nullptr;
} }
@ -792,6 +794,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
if (chand->lb_policy != nullptr) { if (chand->lb_policy != nullptr) {
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
chand->interested_parties); chand->interested_parties);
grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
} }
gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_lb_policy_name);
@ -852,12 +855,10 @@ typedef struct client_channel_call_data {
grpc_subchannel_call* subchannel_call; grpc_subchannel_call* subchannel_call;
grpc_error* error; grpc_error* error;
grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending. grpc_lb_policy_pick_state pick;
grpc_closure lb_pick_closure; grpc_closure lb_pick_closure;
grpc_closure lb_pick_cancel_closure; grpc_closure lb_pick_cancel_closure;
grpc_connected_subchannel* connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity* pollent; grpc_polling_entity* pollent;
grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES];
@ -866,8 +867,6 @@ typedef struct client_channel_call_data {
grpc_transport_stream_op_batch* initial_metadata_batch; grpc_transport_stream_op_batch* initial_metadata_batch;
grpc_linked_mdelem lb_token_mdelem;
grpc_closure on_complete; grpc_closure on_complete;
grpc_closure* original_on_complete; grpc_closure* original_on_complete;
} call_data; } call_data;
@ -1010,11 +1009,11 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
calld->call_start_time, // start_time calld->call_start_time, // start_time
calld->deadline, // deadline calld->deadline, // deadline
calld->arena, // arena calld->arena, // arena
calld->subchannel_call_context, // context calld->pick.subchannel_call_context, // context
calld->call_combiner // call_combiner calld->call_combiner // call_combiner
}; };
grpc_error* new_error = grpc_connected_subchannel_create_call( grpc_error* new_error = grpc_connected_subchannel_create_call(
calld->connected_subchannel, &call_args, &calld->subchannel_call); calld->pick.connected_subchannel, &call_args, &calld->subchannel_call);
if (grpc_client_channel_trace.enabled()) { if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error)); chand, calld, calld->subchannel_call, grpc_error_string(new_error));
@ -1032,7 +1031,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem,
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
call_data* calld = (call_data*)elem->call_data; call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data; channel_data* chand = (channel_data*)elem->channel_data;
if (calld->connected_subchannel == nullptr) { if (calld->pick.connected_subchannel == nullptr) {
// Failed to create subchannel. // Failed to create subchannel.
GRPC_ERROR_UNREF(calld->error); GRPC_ERROR_UNREF(calld->error);
calld->error = error == GRPC_ERROR_NONE calld->error = error == GRPC_ERROR_NONE
@ -1071,13 +1070,16 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg; grpc_call_element* elem = (grpc_call_element*)arg;
channel_data* chand = (channel_data*)elem->channel_data; channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data; call_data* calld = (call_data*)elem->call_data;
if (calld->lb_policy != nullptr) { // Note: chand->lb_policy may have changed since we started our pick,
// in which case we will be cancelling the pick on a policy other than
// the one we started it on. However, this will just be a no-op.
if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) { if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
chand, calld, calld->lb_policy); chand, calld, chand->lb_policy);
} }
grpc_lb_policy_cancel_pick_locked( grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick,
calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
} }
@ -1092,9 +1094,6 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
chand, calld); chand, calld);
} }
GPR_ASSERT(calld->lb_policy != nullptr);
GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
calld->lb_policy = nullptr;
async_pick_done_locked(elem, GRPC_ERROR_REF(error)); async_pick_done_locked(elem, GRPC_ERROR_REF(error));
} }
@ -1128,26 +1127,21 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
} }
} }
const grpc_lb_policy_pick_args inputs = { calld->pick.initial_metadata =
calld->initial_metadata_batch->payload->send_initial_metadata calld->initial_metadata_batch->payload->send_initial_metadata
.send_initial_metadata, .send_initial_metadata;
initial_metadata_flags, &calld->lb_token_mdelem}; calld->pick.initial_metadata_flags = initial_metadata_flags;
// Keep a ref to the LB policy in calld while the pick is pending.
GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
calld->lb_policy = chand->lb_policy;
GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner)); grpc_combiner_scheduler(chand->combiner));
const bool pick_done = grpc_lb_policy_pick_locked( calld->pick.on_complete = &calld->lb_pick_closure;
chand->lb_policy, &inputs, &calld->connected_subchannel, const bool pick_done =
calld->subchannel_call_context, nullptr, &calld->lb_pick_closure); grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick);
if (pick_done) { if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
if (grpc_client_channel_trace.enabled()) { if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
chand, calld); chand, calld);
} }
GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel");
calld->lb_policy = nullptr;
} else { } else {
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
grpc_call_combiner_set_notify_on_cancel( grpc_call_combiner_set_notify_on_cancel(
@ -1289,7 +1283,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
grpc_call_element* elem = (grpc_call_element*)arg; grpc_call_element* elem = (grpc_call_element*)arg;
call_data* calld = (call_data*)elem->call_data; call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data; channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(calld->connected_subchannel == nullptr); GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
if (chand->lb_policy != nullptr) { if (chand->lb_policy != nullptr) {
// We already have an LB policy, so ask it for a pick. // We already have an LB policy, so ask it for a pick.
if (pick_callback_start_locked(elem)) { if (pick_callback_start_locked(elem)) {
@ -1467,15 +1461,14 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
"client_channel_destroy_call"); "client_channel_destroy_call");
} }
GPR_ASSERT(calld->lb_policy == nullptr);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != nullptr) { if (calld->pick.connected_subchannel != nullptr) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked"); GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->pick.connected_subchannel, "picked");
} }
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
if (calld->subchannel_call_context[i].value != nullptr) { if (calld->pick.subchannel_call_context[i].value != nullptr) {
calld->subchannel_call_context[i].destroy( calld->pick.subchannel_call_context[i].destroy(
calld->subchannel_call_context[i].value); calld->pick.subchannel_call_context[i].value);
} }
} }
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);

@ -19,8 +19,6 @@
#include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#define WEAK_REF_BITS 16
grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
false, "lb_policy_refcount"); false, "lb_policy_refcount");
@ -28,91 +26,60 @@ void grpc_lb_policy_init(grpc_lb_policy* policy,
const grpc_lb_policy_vtable* vtable, const grpc_lb_policy_vtable* vtable,
grpc_combiner* combiner) { grpc_combiner* combiner) {
policy->vtable = vtable; policy->vtable = vtable;
gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); gpr_ref_init(&policy->refs, 1);
policy->interested_parties = grpc_pollset_set_create(); policy->interested_parties = grpc_pollset_set_create();
policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy"); policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
} }
#ifndef NDEBUG #ifndef NDEBUG
#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line,
#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char* purpose const char* reason) {
#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason if (grpc_trace_lb_policy_refcount.enabled()) {
#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
old_refs, old_refs + 1, reason);
}
#else #else
#define REF_FUNC_EXTRA_ARGS void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) {
#define REF_MUTATE_EXTRA_ARGS
#define REF_FUNC_PASS_ARGS(new_reason)
#define REF_MUTATE_PASS_ARGS(x)
#endif #endif
gpr_ref(&lb_policy->refs);
}
static gpr_atm ref_mutate(grpc_lb_policy* c, gpr_atm delta,
int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifndef NDEBUG #ifndef NDEBUG
void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line,
const char* reason) {
if (grpc_trace_lb_policy_refcount.enabled()) { if (grpc_trace_lb_policy_refcount.enabled()) {
gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"LB_POLICY: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
purpose, old_val, old_val + delta, reason); old_refs, old_refs - 1, reason);
} }
#else
void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) {
#endif #endif
return old_val; if (gpr_unref(&lb_policy->refs)) {
} grpc_pollset_set_destroy(lb_policy->interested_parties);
grpc_combiner* combiner = lb_policy->combiner;
void grpc_lb_policy_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { lb_policy->vtable->destroy(lb_policy);
ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); GRPC_COMBINER_UNREF(combiner, "lb_policy");
}
static void shutdown_locked(void* arg, grpc_error* error) {
grpc_lb_policy* policy = (grpc_lb_policy*)arg;
policy->vtable->shutdown_locked(policy);
GRPC_LB_POLICY_WEAK_UNREF(policy, "strong-unref");
}
void grpc_lb_policy_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val =
ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS),
1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(shutdown_locked, policy,
grpc_combiner_scheduler(policy->combiner)),
GRPC_ERROR_NONE);
} else {
grpc_lb_policy_weak_unref(policy REF_FUNC_PASS_ARGS("strong-unref"));
} }
} }
void grpc_lb_policy_weak_ref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) { void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); grpc_lb_policy* new_policy) {
} policy->vtable->shutdown_locked(policy, new_policy);
void grpc_lb_policy_weak_unref(grpc_lb_policy* policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val =
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
grpc_pollset_set_destroy(policy->interested_parties);
grpc_combiner* combiner = policy->combiner;
policy->vtable->destroy(policy);
GRPC_COMBINER_UNREF(combiner, "lb_policy");
}
} }
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
const grpc_lb_policy_pick_args* pick_args, grpc_lb_policy_pick_state* pick) {
grpc_connected_subchannel** target, return policy->vtable->pick_locked(policy, pick);
grpc_call_context_element* context,
void** user_data, grpc_closure* on_complete) {
return policy->vtable->pick_locked(policy, pick_args, target, context,
user_data, on_complete);
} }
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
grpc_connected_subchannel** target, grpc_lb_policy_pick_state* pick,
grpc_error* error) { grpc_error* error) {
policy->vtable->cancel_pick_locked(policy, target, error); policy->vtable->cancel_pick_locked(policy, pick, error);
} }
void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy, void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,

@ -33,7 +33,7 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
struct grpc_lb_policy { struct grpc_lb_policy {
const grpc_lb_policy_vtable* vtable; const grpc_lb_policy_vtable* vtable;
gpr_atm ref_pair; gpr_refcount refs;
/* owned pointer to interested parties in load balancing decisions */ /* owned pointer to interested parties in load balancing decisions */
grpc_pollset_set* interested_parties; grpc_pollset_set* interested_parties;
/* combiner under which lb_policy actions take place */ /* combiner under which lb_policy actions take place */
@ -42,32 +42,42 @@ struct grpc_lb_policy {
grpc_closure* request_reresolution; grpc_closure* request_reresolution;
}; };
/** Extra arguments for an LB pick */ /// State used for an LB pick.
typedef struct grpc_lb_policy_pick_args { typedef struct grpc_lb_policy_pick_state {
/** Initial metadata associated with the picking call. */ /// Initial metadata associated with the picking call.
grpc_metadata_batch* initial_metadata; grpc_metadata_batch* initial_metadata;
/** Bitmask used for selective cancelling. See \a /// Bitmask used for selective cancelling. See \a
* grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
* grpc_types.h */ /// grpc_types.h.
uint32_t initial_metadata_flags; uint32_t initial_metadata_flags;
/** Storage for LB token in \a initial_metadata, or NULL if not used */ /// Storage for LB token in \a initial_metadata, or NULL if not used.
grpc_linked_mdelem* lb_token_mdelem_storage; grpc_linked_mdelem lb_token_mdelem_storage;
} grpc_lb_policy_pick_args; /// Closure to run when pick is complete, if not completed synchronously.
grpc_closure* on_complete;
/// Will be set to the selected subchannel, or NULL on failure or when
/// the LB policy decides to drop the call.
grpc_connected_subchannel* connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
/// Upon success, \a *user_data will be set to whatever opaque information
/// may need to be propagated from the LB policy, or NULL if not needed.
void** user_data;
/// Next pointer. For internal use by LB policy.
struct grpc_lb_policy_pick_state* next;
} grpc_lb_policy_pick_state;
struct grpc_lb_policy_vtable { struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy* policy); void (*destroy)(grpc_lb_policy* policy);
void (*shutdown_locked)(grpc_lb_policy* policy);
/// \see grpc_lb_policy_shutdown_locked().
void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy);
/** \see grpc_lb_policy_pick */ /** \see grpc_lb_policy_pick */
int (*pick_locked)(grpc_lb_policy* policy, int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick);
const grpc_lb_policy_pick_args* pick_args,
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete);
/** \see grpc_lb_policy_cancel_pick */ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick_locked)(grpc_lb_policy* policy, void (*cancel_pick_locked)(grpc_lb_policy* policy,
grpc_connected_subchannel** target, grpc_lb_policy_pick_state* pick,
grpc_error* error); grpc_error* error);
/** \see grpc_lb_policy_cancel_picks */ /** \see grpc_lb_policy_cancel_picks */
@ -103,37 +113,19 @@ struct grpc_lb_policy_vtable {
}; };
#ifndef NDEBUG #ifndef NDEBUG
/* Strong references: the policy will shutdown when they reach zero */
#define GRPC_LB_POLICY_REF(p, r) \ #define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_UNREF(p, r) \ #define GRPC_LB_POLICY_UNREF(p, r) \
grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
/* Weak references: they don't prevent the shutdown of the LB policy. When no
* strong references are left but there are still weak ones, shutdown is called.
* Once the weak reference also reaches zero, the LB policy is destroyed. */
#define GRPC_LB_POLICY_WEAK_REF(p, r) \
grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_WEAK_UNREF(p, r) \
grpc_lb_policy_weak_unref((p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line, void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line,
const char* reason); const char* reason);
void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line, void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line,
const char* reason); const char* reason);
void grpc_lb_policy_weak_ref(grpc_lb_policy* policy, const char* file, int line, #else // !NDEBUG
const char* reason);
void grpc_lb_policy_weak_unref(grpc_lb_policy* policy, const char* file,
int line, const char* reason);
#else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) #define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p))
#define GRPC_LB_POLICY_WEAK_UNREF(p, r) grpc_lb_policy_weak_unref((p))
void grpc_lb_policy_ref(grpc_lb_policy* policy); void grpc_lb_policy_ref(grpc_lb_policy* policy);
void grpc_lb_policy_unref(grpc_lb_policy* policy); void grpc_lb_policy_unref(grpc_lb_policy* policy);
void grpc_lb_policy_weak_ref(grpc_lb_policy* policy);
void grpc_lb_policy_weak_unref(grpc_lb_policy* policy);
#endif #endif
/** called by concrete implementations to initialize the base struct */ /** called by concrete implementations to initialize the base struct */
@ -141,28 +133,24 @@ void grpc_lb_policy_init(grpc_lb_policy* policy,
const grpc_lb_policy_vtable* vtable, const grpc_lb_policy_vtable* vtable,
grpc_combiner* combiner); grpc_combiner* combiner);
/** Finds an appropriate subchannel for a call, based on \a pick_args. /// Shuts down \a policy.
/// If \a new_policy is non-null, any pending picks will be restarted
\a target will be set to the selected subchannel, or NULL on failure /// on that policy; otherwise, they will be failed.
or when the LB policy decides to drop the call. void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
grpc_lb_policy* new_policy);
Upon success, \a user_data will be set to whatever opaque information /** Finds an appropriate subchannel for a call, based on data in \a pick.
may need to be propagated from the LB policy, or NULL if not needed. \a pick must remain alive until the pick is complete.
\a context will be populated with context to pass to the subchannel
call, if needed.
If the pick succeeds and a result is known immediately, a non-zero If the pick succeeds and a result is known immediately, a non-zero
value will be returned. Otherwise, \a on_complete will be invoked value will be returned. Otherwise, \a pick->on_complete will be invoked
once the pick is complete with its error argument set to indicate once the pick is complete with its error argument set to indicate
success or failure. success or failure.
Any IO should be done under the \a interested_parties \a grpc_pollset_set Any IO should be done under the \a interested_parties \a grpc_pollset_set
in the \a grpc_lb_policy struct. */ in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
const grpc_lb_policy_pick_args* pick_args, grpc_lb_policy_pick_state* pick);
grpc_connected_subchannel** target,
grpc_call_context_element* context,
void** user_data, grpc_closure* on_complete);
/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
against one of the connected subchannels managed by \a policy. */ against one of the connected subchannels managed by \a policy. */
@ -170,11 +158,11 @@ void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate, grpc_closure* on_initiate,
grpc_closure* on_ack); grpc_closure* on_ack);
/** Cancel picks for \a target. /** Cancel picks for \a pick.
The \a on_complete callback of the pending picks will be invoked with \a The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */ *target set to NULL. */
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
grpc_connected_subchannel** target, grpc_lb_policy_pick_state* pick,
grpc_error* error); grpc_error* error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given /** Cancel all pending picks for which their \a initial_metadata_flags (as given

@ -54,7 +54,7 @@
* operations in progress over the old RR instance. This is done by * operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more * decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a * references are held on the old RR policy, it'll be destroyed and \a
* glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which * state. At this point we can transition to a new RR instance safely, which
* is done once again via \a rr_handover_locked(). * is done once again via \a rr_handover_locked().
* *
@ -128,187 +128,48 @@
grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
/* add lb_token of selected subchannel (address) to the call's initial struct glb_lb_policy;
* metadata */
static grpc_error* initial_metadata_add_lb_token(
grpc_metadata_batch* initial_metadata,
grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != nullptr);
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
lb_token);
}
static void destroy_client_stats(void* arg) { namespace {
grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
}
typedef struct wrapped_rr_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure;
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure* wrapped_closure;
/* the pick's initial metadata, kept in order to append the LB token for the
* pick */
grpc_metadata_batch* initial_metadata;
/* the picked target, used to determine which LB token to add to the pick's
* initial metadata */
grpc_connected_subchannel** target;
/* the context to be populated for the subchannel call */
grpc_call_context_element* context;
/* Stats for client-side load reporting. Note that this holds a /// Linked list of pending pick requests. It stores all information needed to
* reference, which must be either passed on via context or unreffed. */ /// eventually call (Round Robin's) pick() on them. They mainly stay pending
/// waiting for the RR policy to be created.
///
/// Note that when a pick is sent to the RR policy, we inject our own
/// on_complete callback, so that we can intercept the result before
/// invoking the original on_complete callback. This allows us to set the
/// LB token metadata and add client_stats to the call context.
/// See \a pending_pick_complete() for details.
struct pending_pick {
// Our on_complete closure and the original one.
grpc_closure on_complete;
grpc_closure* original_on_complete;
// The original pick.
grpc_lb_policy_pick_state* pick;
// Stats for client-side load reporting. Note that this holds a
// reference, which must be either passed on via context or unreffed.
grpc_grpclb_client_stats* client_stats; grpc_grpclb_client_stats* client_stats;
// The LB token associated with the pick. This is set via user_data in
/* the LB token associated with the pick */ // the pick.
grpc_mdelem lb_token; grpc_mdelem lb_token;
// The grpclb instance that created the wrapping. This instance is not owned,
/* storage for the lb token initial metadata mdelem */ // reference counts are untouched. It's used only for logging purposes.
grpc_linked_mdelem* lb_token_mdelem_storage; glb_lb_policy* glb_policy;
// Next pending pick.
/* The RR instance related to the closure */
grpc_lb_policy* rr_policy;
/* The grpclb instance that created the wrapping. This instance is not owned,
* reference counts are untouched. It's used only for logging purposes. */
grpc_lb_policy* glb_policy;
/* heap memory to be freed upon closure execution. */
void* free_when_done;
} wrapped_rr_closure_arg;
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance. We wrap this closure in
* order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure(void* arg, grpc_error* error) {
wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg;
GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
if (wc_arg->rr_policy != nullptr) {
/* if *target is nullptr, no pick has been made by the RR policy (eg, all
* addresses failed to connect). There won't be any user_data/token
* available */
if (*wc_arg->target != nullptr) {
if (!GRPC_MDISNULL(wc_arg->lb_token)) {
initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
} else {
gpr_log(
GPR_ERROR,
"[grpclb %p] No LB token for connected subchannel pick %p (from RR "
"instance %p).",
wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy);
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(wc_arg->client_stats != nullptr);
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
} else {
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
}
GPR_ASSERT(wc_arg->free_when_done != nullptr);
gpr_free(wc_arg->free_when_done);
}
namespace {
/* Linked list of pending pick requests. It stores all information needed to
* eventually call (Round Robin's) pick() on them. They mainly stay pending
* waiting for the RR policy to be created/updated.
*
* One particularity is the wrapping of the user-provided \a on_complete closure
* (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
* order to correctly unref the RR policy instance upon completion of the pick.
* See \a wrapped_rr_closure for details. */
struct pending_pick {
struct pending_pick* next; struct pending_pick* next;
/* original pick()'s arguments */
grpc_lb_policy_pick_args pick_args;
/* output argument where to store the pick()ed connected subchannel, or
* nullptr upon error. */
grpc_connected_subchannel** target;
/* args for wrapped_on_complete */
wrapped_rr_closure_arg wrapped_on_complete_arg;
}; };
} // namespace
static void add_pending_pick(pending_pick** root, /// A linked list of pending pings waiting for the RR policy to be created.
const grpc_lb_policy_pick_args* pick_args, struct pending_ping {
grpc_connected_subchannel** target, grpc_closure* on_initiate;
grpc_call_context_element* context, grpc_closure* on_ack;
grpc_closure* on_complete) {
pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
pp->next = *root;
pp->pick_args = *pick_args;
pp->target = target;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
pp->wrapped_on_complete_arg.target = target;
pp->wrapped_on_complete_arg.context = context;
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.free_when_done = pp;
GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
wrapped_rr_closure, &pp->wrapped_on_complete_arg,
grpc_schedule_on_exec_ctx);
*root = pp;
}
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
struct pending_ping* next; struct pending_ping* next;
};
/* args for sending the ping */ } // namespace
wrapped_rr_closure_arg* on_initiate;
wrapped_rr_closure_arg* on_ack;
} pending_ping;
static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
if (on_initiate != nullptr) {
pping->on_initiate =
(wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
pping->on_initiate->wrapped_closure = on_initiate;
pping->on_initiate->free_when_done = pping->on_initiate;
GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
&pping->on_initiate, grpc_schedule_on_exec_ctx);
}
if (on_ack != nullptr) {
pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
pping->on_ack->wrapped_closure = on_ack;
pping->on_ack->free_when_done = pping->on_ack;
GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
&pping->on_ack, grpc_schedule_on_exec_ctx);
}
pping->next = *root;
*root = pping;
}
/*
* glb_lb_policy
*/
typedef struct rr_connectivity_data rr_connectivity_data;
typedef struct glb_lb_policy { struct glb_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
@ -333,6 +194,9 @@ typedef struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */ /** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy* rr_policy; grpc_lb_policy* rr_policy;
grpc_closure on_rr_connectivity_changed;
grpc_connectivity_state rr_connectivity_state;
bool started_picking; bool started_picking;
/** our connectivity state tracker */ /** our connectivity state tracker */
@ -437,15 +301,87 @@ typedef struct glb_lb_policy {
grpc_closure client_load_report_closure; grpc_closure client_load_report_closure;
/* Client load report message payload. */ /* Client load report message payload. */
grpc_byte_buffer* client_load_report_payload; grpc_byte_buffer* client_load_report_payload;
} glb_lb_policy;
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
grpc_closure on_change;
grpc_connectivity_state state;
glb_lb_policy* glb_policy;
}; };
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static grpc_error* initial_metadata_add_lb_token(
grpc_metadata_batch* initial_metadata,
grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != nullptr);
GPR_ASSERT(!GRPC_MDISNULL(lb_token));
return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
lb_token);
}
static void destroy_client_stats(void* arg) {
grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
}
static void pending_pick_set_metadata_and_context(pending_pick* pp) {
/* if connected_subchannel is nullptr, no pick has been made by the RR
* policy (e.g., all addresses failed to connect). There won't be any
* user_data/token available */
if (pp->pick->connected_subchannel != nullptr) {
if (!GRPC_MDISNULL(pp->lb_token)) {
initial_metadata_add_lb_token(pp->pick->initial_metadata,
&pp->pick->lb_token_mdelem_storage,
GRPC_MDELEM_REF(pp->lb_token));
} else {
gpr_log(GPR_ERROR,
"[grpclb %p] No LB token for connected subchannel pick %p",
pp->glb_policy, pp->pick);
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(pp->client_stats != nullptr);
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
pp->client_stats;
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
destroy_client_stats;
} else {
if (pp->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(pp->client_stats);
}
}
}
/* The \a on_complete closure passed as part of the pick requires keeping a
* reference to its associated round robin instance. We wrap this closure in
* order to unref the round robin instance upon its invocation */
static void pending_pick_complete(void* arg, grpc_error* error) {
pending_pick* pp = (pending_pick*)arg;
pending_pick_set_metadata_and_context(pp);
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
gpr_free(pp);
}
static pending_pick* pending_pick_create(glb_lb_policy* glb_policy,
grpc_lb_policy_pick_state* pick) {
pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
pp->pick = pick;
pp->glb_policy = glb_policy;
GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp,
grpc_schedule_on_exec_ctx);
pp->original_on_complete = pick->on_complete;
pp->pick->on_complete = &pp->on_complete;
return pp;
}
static void pending_pick_add(pending_pick** root, pending_pick* new_pp) {
new_pp->next = *root;
*root = new_pp;
}
static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
pping->on_initiate = on_initiate;
pping->on_ack = on_ack;
pping->next = *root;
*root = pping;
}
static bool is_server_valid(const grpc_grpclb_server* server, size_t idx, static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
bool log) { bool log) {
if (server->drop) return false; if (server->drop) return false;
@ -557,7 +493,6 @@ static grpc_lb_addresses* process_serverlist_locked(
gpr_free(uri); gpr_free(uri);
user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
} }
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
false /* is_balancer */, false /* is_balancer */,
nullptr /* balancer_name */, user_data); nullptr /* balancer_name */, user_data);
@ -598,7 +533,6 @@ static void update_lb_connectivity_status_locked(
grpc_error* rr_state_error) { grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state = const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker); grpc_connectivity_state_check(&glb_policy->state_tracker);
/* The new connectivity status is a function of the previous one and the new /* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy. * input coming from the status of the RR policy.
* *
@ -628,7 +562,6 @@ static void update_lb_connectivity_status_locked(
* *
* (*) This function mustn't be called during shutting down. */ * (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
switch (rr_state) { switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN: case GRPC_CHANNEL_SHUTDOWN:
@ -639,7 +572,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:
GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
} }
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log( gpr_log(
GPR_INFO, GPR_INFO,
@ -657,10 +589,8 @@ static void update_lb_connectivity_status_locked(
* cleanups this callback would otherwise be responsible for. * cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the * If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */ * completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked( static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, bool force_async, pending_pick* pp) {
bool force_async, grpc_connected_subchannel** target,
wrapped_rr_closure_arg* wc_arg) {
// Check for drops if we are not using fallback backend addresses. // Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != nullptr) { if (glb_policy->serverlist != nullptr) {
// Look at the index into the serverlist to see if we should drop this call. // Look at the index into the serverlist to see if we should drop this call.
@ -670,57 +600,36 @@ static bool pick_from_internal_rr_locked(
glb_policy->serverlist_index = 0; // Wrap-around. glb_policy->serverlist_index = 0; // Wrap-around.
} }
if (server->drop) { if (server->drop) {
// Not using the RR policy, so unref it.
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of // Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in // dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a // the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter) // subchannel call (and therefore no client_load_reporting filter)
// for dropped calls. // for dropped calls.
GPR_ASSERT(wc_arg->client_stats != nullptr); GPR_ASSERT(glb_policy->client_stats != nullptr);
grpc_grpclb_client_stats_add_call_dropped_locked( grpc_grpclb_client_stats_add_call_dropped_locked(
server->load_balance_token, wc_arg->client_stats); server->load_balance_token, glb_policy->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) { if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != nullptr); GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(pp);
gpr_free(wc_arg->free_when_done);
return false; return false;
} }
gpr_free(wc_arg->free_when_done); gpr_free(pp);
return true; return true;
} }
} }
// Set client_stats and user_data.
pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy. // Pick via the RR policy.
const bool pick_done = grpc_lb_policy_pick_locked( bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick);
wc_arg->rr_policy, pick_args, target, wc_arg->context,
(void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
if (pick_done) { if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ pending_pick_set_metadata_and_context(pp);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
/* add the load reporting initial metadata */
initial_metadata_add_lb_token(pick_args->initial_metadata,
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT(wc_arg->client_stats != nullptr);
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
if (force_async) { if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != nullptr); GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); pick_done = false;
gpr_free(wc_arg->free_when_done);
return false;
} }
gpr_free(wc_arg->free_when_done); gpr_free(pp);
} }
/* else, the pending pick will be registered and taken care of by the /* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy). * pending pick list inside the RR policy (glb_policy->rr_policy).
@ -762,7 +671,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args); gpr_free(args);
} }
static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error); static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
static void create_rr_locked(glb_lb_policy* glb_policy, static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) { grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr); GPR_ASSERT(glb_policy->rr_policy == nullptr);
@ -784,72 +693,46 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->base.request_reresolution = nullptr; glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy; glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr; grpc_error* rr_state_error = nullptr;
const grpc_connectivity_state rr_state = glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, glb_policy->rr_policy, &rr_state_error);
&rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */ /* Connectivity state is a function of the RR policy updated/created */
update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error); update_lb_connectivity_status_locked(
glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly /* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on * created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */ * gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties); glb_policy->base.interested_parties);
GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
/* Allocate the data for the tracking of the new RR policy's connectivity. on_rr_connectivity_changed_locked, glb_policy,
* It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data* rr_connectivity =
(rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data));
GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_combiner_scheduler(glb_policy->base.combiner));
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = rr_state;
/* Subscribe to changes to the connectivity of the new RR */ /* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change_locked(
&rr_connectivity->state, glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
&rr_connectivity->on_change); &glb_policy->on_rr_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
// Send pending picks to RR policy.
/* Update picks and pings in wait */
pending_pick* pp; pending_pick* pp;
while ((pp = glb_policy->pending_picks)) { while ((pp = glb_policy->pending_picks)) {
glb_policy->pending_picks = pp->next; glb_policy->pending_picks = pp->next;
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
pp->wrapped_on_complete_arg.client_stats =
grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p] Pending pick about to (async) PICK from RR %p", "[grpclb %p] Pending pick about to (async) PICK from RR %p",
glb_policy, glb_policy->rr_policy); glb_policy, glb_policy->rr_policy);
} }
pick_from_internal_rr_locked(glb_policy, &pp->pick_args, pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp);
true /* force_async */, pp->target,
&pp->wrapped_on_complete_arg);
} }
// Send pending pings to RR policy.
pending_ping* pping; pending_ping* pping;
while ((pping = glb_policy->pending_pings)) { while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next; glb_policy->pending_pings = pping->next;
grpc_closure* on_initiate = nullptr;
grpc_closure* on_ack = nullptr;
if (pping->on_initiate != nullptr) {
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
pping->on_initiate->rr_policy = glb_policy->rr_policy;
on_initiate = &pping->on_initiate->wrapper_closure;
}
if (pping->on_ack != nullptr) {
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
pping->on_ack->rr_policy = glb_policy->rr_policy;
on_ack = &pping->on_ack->wrapper_closure;
}
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy); glb_policy, glb_policy->rr_policy);
} }
grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate,
pping->on_ack);
gpr_free(pping); gpr_free(pping);
} }
} }
@ -875,31 +758,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args); lb_policy_args_destroy(args);
} }
static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) { static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg; glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_lb_policy* glb_policy = rr_connectivity->glb_policy;
if (glb_policy->shutting_down) { if (glb_policy->shutting_down) {
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
return; return;
} }
if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) { if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* An RR policy that has transitioned into the SHUTDOWN connectivity state /* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a * should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. .*/ * sink, policies can't transition back from it. .*/
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = nullptr; glb_policy->rr_policy = nullptr;
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
gpr_free(rr_connectivity);
return; return;
} }
/* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state, update_lb_connectivity_status_locked(
GRPC_ERROR_REF(error)); glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
/* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, grpc_lb_policy_notify_on_state_change_locked(
&rr_connectivity->state, glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
&rr_connectivity->on_change); &glb_policy->on_rr_connectivity_changed);
} }
static void destroy_balancer_name(void* balancer_name) { static void destroy_balancer_name(void* balancer_name) {
@ -1007,22 +887,17 @@ static void glb_destroy(grpc_lb_policy* pol) {
gpr_free(glb_policy); gpr_free(glb_policy);
} }
static void glb_shutdown_locked(grpc_lb_policy* pol) { static void glb_shutdown_locked(grpc_lb_policy* pol,
grpc_lb_policy* new_policy) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol; glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true; glb_policy->shutting_down = true;
/* We need a copy of the lb_call pointer because we can't cancell the call
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
grpc_call* lb_call = glb_policy->lb_call;
/* glb_policy->lb_call and this local lb_call must be consistent at this point /* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
* of query_for_backends_locked, which can only be invoked while * of query_for_backends_locked, which can only be invoked while
* glb_policy->shutting_down is false. */ * glb_policy->shutting_down is false. */
if (lb_call != nullptr) { if (glb_policy->lb_call != nullptr) {
grpc_call_cancel(lb_call, nullptr); grpc_call_cancel(glb_policy->lb_call, nullptr);
/* lb_on_server_status_received will pick up the cancel and clean up */ /* lb_on_server_status_received will pick up the cancel and clean up */
} }
if (glb_policy->retry_timer_callback_pending) { if (glb_policy->retry_timer_callback_pending) {
@ -1031,12 +906,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
if (glb_policy->fallback_timer_callback_pending) { if (glb_policy->fallback_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_fallback_timer); grpc_timer_cancel(&glb_policy->lb_fallback_timer);
} }
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
pending_ping* pping = glb_policy->pending_pings;
glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) { if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
} else { } else {
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
@ -1051,28 +922,35 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
} }
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown"); GRPC_ERROR_REF(error), "glb_shutdown");
// Clear pending picks.
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
while (pp != nullptr) { while (pp != nullptr) {
pending_pick* next = pp->next; pending_pick* next = pp->next;
*pp->target = nullptr; if (new_policy != nullptr) {
GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, // Hand pick over to new policy.
GRPC_ERROR_REF(error)); if (pp->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(pp->client_stats);
}
pp->pick->on_complete = pp->original_on_complete;
if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) {
// Synchronous return; schedule callback.
GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
}
gpr_free(pp); gpr_free(pp);
} else {
pp->pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
}
pp = next; pp = next;
} }
// Clear pending pings.
pending_ping* pping = glb_policy->pending_pings;
glb_policy->pending_pings = nullptr;
while (pping != nullptr) { while (pping != nullptr) {
pending_ping* next = pping->next; pending_ping* next = pping->next;
if (pping->on_initiate != nullptr) { GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure, GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
GRPC_ERROR_REF(error));
gpr_free(pping->on_initiate);
}
if (pping->on_ack != nullptr) {
GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure,
GRPC_ERROR_REF(error));
gpr_free(pping->on_ack);
}
gpr_free(pping); gpr_free(pping);
pping = next; pping = next;
} }
@ -1090,16 +968,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to nullptr right here. // we invoke the completion closure and set *target to nullptr right here.
static void glb_cancel_pick_locked(grpc_lb_policy* pol, static void glb_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target, grpc_lb_policy_pick_state* pick,
grpc_error* error) { grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol; glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
pending_pick* pp = glb_policy->pending_picks; pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr; glb_policy->pending_picks = nullptr;
while (pp != nullptr) { while (pp != nullptr) {
pending_pick* next = pp->next; pending_pick* next = pp->next;
if (pp->target == target) { if (pp->pick == pick) {
*target = nullptr; pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1)); "Pick Cancelled", &error, 1));
} else { } else {
@ -1109,7 +987,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
pp = next; pp = next;
} }
if (glb_policy->rr_policy != nullptr) { if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target, grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -1134,9 +1012,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol,
glb_policy->pending_picks = nullptr; glb_policy->pending_picks = nullptr;
while (pp != nullptr) { while (pp != nullptr) {
pending_pick* next = pp->next; pending_pick* next = pp->next;
if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1)); "Pick Cancelled", &error, 1));
} else { } else {
@ -1162,7 +1040,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
!glb_policy->fallback_timer_callback_pending) { !glb_policy->fallback_timer_callback_pending) {
grpc_millis deadline = grpc_millis deadline =
grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
glb_policy, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_combiner_scheduler(glb_policy->base.combiner));
@ -1184,19 +1062,9 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) {
} }
static int glb_pick_locked(grpc_lb_policy* pol, static int glb_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args, grpc_lb_policy_pick_state* pick) {
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
if (pick_args->lb_token_mdelem_storage == nullptr) {
*target = nullptr;
GRPC_CLOSURE_SCHED(on_complete,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"));
return 0;
}
glb_lb_policy* glb_policy = (glb_lb_policy*)pol; glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
pending_pick* pp = pending_pick_create(glb_policy, pick);
bool pick_done = false; bool pick_done = false;
if (glb_policy->rr_policy != nullptr) { if (glb_policy->rr_policy != nullptr) {
const grpc_connectivity_state rr_connectivity_state = const grpc_connectivity_state rr_connectivity_state =
@ -1204,7 +1072,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
nullptr); nullptr);
// The glb_policy->rr_policy may have transitioned to SHUTDOWN but the // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
// callback registered to capture this event // callback registered to capture this event
// (glb_rr_connectivity_changed_locked) may not have been invoked yet. We // (on_rr_connectivity_changed_locked) may not have been invoked yet. We
// need to make sure we aren't trying to pick from a RR policy instance // need to make sure we aren't trying to pick from a RR policy instance
// that's in shutdown. // that's in shutdown.
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
@ -1214,32 +1082,16 @@ static int glb_pick_locked(grpc_lb_policy* pol,
glb_policy, glb_policy->rr_policy, glb_policy, glb_policy->rr_policy,
grpc_connectivity_state_name(rr_connectivity_state)); grpc_connectivity_state_name(rr_connectivity_state));
} }
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, pending_pick_add(&glb_policy->pending_picks, pp);
on_complete);
pick_done = false; pick_done = false;
} else { // RR not in shutdown } else { // RR not in shutdown
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy); glb_policy->rr_policy);
} }
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
wrapped_rr_closure_arg* wc_arg =
(wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
grpc_schedule_on_exec_ctx);
wc_arg->rr_policy = glb_policy->rr_policy;
wc_arg->target = target;
wc_arg->context = context;
GPR_ASSERT(glb_policy->client_stats != nullptr); GPR_ASSERT(glb_policy->client_stats != nullptr);
wc_arg->client_stats = pick_done =
grpc_grpclb_client_stats_ref(glb_policy->client_stats); pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
wc_arg->wrapped_closure = on_complete;
wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
wc_arg->initial_metadata = pick_args->initial_metadata;
wc_arg->free_when_done = wc_arg;
wc_arg->glb_policy = pol;
pick_done = pick_from_internal_rr_locked(
glb_policy, pick_args, false /* force_async */, target, wc_arg);
} }
} else { // glb_policy->rr_policy == NULL } else { // glb_policy->rr_policy == NULL
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
@ -1247,8 +1099,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks", "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
glb_policy); glb_policy);
} }
add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, pending_pick_add(&glb_policy->pending_picks, pp);
on_complete);
if (!glb_policy->started_picking) { if (!glb_policy->started_picking) {
start_picking_locked(glb_policy); start_picking_locked(glb_policy);
} }
@ -1270,7 +1121,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (glb_policy->rr_policy) { if (glb_policy->rr_policy) {
grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
} else { } else {
add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack);
if (!glb_policy->started_picking) { if (!glb_policy->started_picking) {
start_picking_locked(glb_policy); start_picking_locked(glb_policy);
} }
@ -1295,7 +1146,7 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
} }
query_for_backends_locked(glb_policy); query_for_backends_locked(glb_policy);
} }
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer"); GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
} }
static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
@ -1321,7 +1172,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
glb_policy); glb_policy);
} }
} }
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
lb_call_on_retry_timer_locked, glb_policy, lb_call_on_retry_timer_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_combiner_scheduler(glb_policy->base.combiner));
@ -1329,7 +1180,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry); &glb_policy->lb_on_call_retry);
} }
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, GRPC_LB_POLICY_UNREF(&glb_policy->base,
"lb_on_server_status_received_locked"); "lb_on_server_status_received_locked");
} }
@ -1353,7 +1204,7 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
glb_policy->client_load_report_payload = nullptr; glb_policy->client_load_report_payload = nullptr;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
glb_policy->client_load_report_timer_callback_pending = false; glb_policy->client_load_report_timer_callback_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) { if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy); maybe_restart_lb_call(glb_policy);
} }
@ -1394,7 +1245,7 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg; glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
glb_policy->client_load_report_timer_callback_pending = false; glb_policy->client_load_report_timer_callback_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) { if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy); maybe_restart_lb_call(glb_policy);
} }
@ -1547,10 +1398,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0; op->flags = 0;
op->reserved = nullptr; op->reserved = nullptr;
op++; op++;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref /* take a ref to be released in lb_on_sent_initial_request_locked() */
* count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
"lb_on_sent_initial_request_locked");
call_error = grpc_call_start_batch_and_execute( call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops), glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_sent_initial_request); &glb_policy->lb_on_sent_initial_request);
@ -1566,10 +1415,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0; op->flags = 0;
op->reserved = nullptr; op->reserved = nullptr;
op++; op++;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref /* take a ref to be released in lb_on_server_status_received_locked() */
* count goes to zero) to be unref'd in lb_on_server_status_received_locked */ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
"lb_on_server_status_received_locked");
call_error = grpc_call_start_batch_and_execute( call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops), glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received); &glb_policy->lb_on_server_status_received);
@ -1581,9 +1428,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0; op->flags = 0;
op->reserved = nullptr; op->reserved = nullptr;
op++; op++;
/* take another weak ref to be unref'd/reused in /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
* lb_on_response_received_locked */ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute( call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops), glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); &glb_policy->lb_on_response_received);
@ -1598,8 +1444,7 @@ static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
if (glb_policy->client_load_report_payload != nullptr) { if (glb_policy->client_load_report_payload != nullptr) {
do_send_client_load_report_locked(glb_policy); do_send_client_load_report_locked(glb_policy);
} }
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
"lb_on_sent_initial_request_locked");
} }
static void lb_on_response_received_locked(void* arg, grpc_error* error) { static void lb_on_response_received_locked(void* arg, grpc_error* error) {
@ -1631,11 +1476,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
"client load reporting interval = %" PRIdPTR " milliseconds", "client load reporting interval = %" PRIdPTR " milliseconds",
glb_policy, glb_policy->client_stats_report_interval); glb_policy, glb_policy->client_stats_report_interval);
} }
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the /* take a ref to be unref'd in send_client_load_report_locked() */
* strong ref count goes to zero) to be unref'd in
* send_client_load_report_locked() */
glb_policy->client_load_report_timer_callback_pending = true; glb_policy->client_load_report_timer_callback_pending = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
schedule_next_client_load_report(glb_policy); schedule_next_client_load_report(glb_policy);
} else if (grpc_lb_glb_trace.enabled()) { } else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
@ -1717,20 +1560,20 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
op->flags = 0; op->flags = 0;
op->reserved = nullptr; op->reserved = nullptr;
op++; op++;
/* reuse the "lb_on_response_received_locked" weak ref taken in /* reuse the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() */ * query_for_backends_locked() */
const grpc_call_error call_error = grpc_call_start_batch_and_execute( const grpc_call_error call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops), glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */ &glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error); GPR_ASSERT(GRPC_CALL_OK == call_error);
} else { } else {
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, GRPC_LB_POLICY_UNREF(&glb_policy->base,
"lb_on_response_received_locked_shutdown"); "lb_on_response_received_locked_shutdown");
} }
} else { /* empty payload: call cancelled. */ } else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received_locked" weak ref taken in /* dispose of the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() and reused in every reception loop */ * query_for_backends_locked() and reused in every reception loop */
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, GRPC_LB_POLICY_UNREF(&glb_policy->base,
"lb_on_response_received_locked_empty_payload"); "lb_on_response_received_locked_empty_payload");
} }
} }
@ -1751,7 +1594,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
rr_handover_locked(glb_policy); rr_handover_locked(glb_policy);
} }
} }
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
} }
static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
@ -1835,7 +1678,7 @@ static void glb_update_locked(grpc_lb_policy* policy,
grpc_channel_get_channel_stack(glb_policy->lb_channel)); grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
glb_policy->watching_lb_channel = true; glb_policy->watching_lb_channel = true;
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity");
grpc_client_channel_watch_connectivity_state( grpc_client_channel_watch_connectivity_state(
client_channel_elem, client_channel_elem,
grpc_polling_entity_create_from_pollset_set( grpc_polling_entity_create_from_pollset_set(
@ -1891,7 +1734,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
case GRPC_CHANNEL_SHUTDOWN: case GRPC_CHANNEL_SHUTDOWN:
done: done:
glb_policy->watching_lb_channel = false; glb_policy->watching_lb_channel = false;
GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, GRPC_LB_POLICY_UNREF(&glb_policy->base,
"watch_lb_channel_connectivity_cb_shutdown"); "watch_lb_channel_connectivity_cb_shutdown");
break; break;
} }

@ -31,15 +31,6 @@
grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
namespace {
struct pending_pick {
struct pending_pick* next;
uint32_t initial_metadata_flags;
grpc_connected_subchannel** target;
grpc_closure* on_complete;
};
} // namespace
typedef struct { typedef struct {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
@ -54,7 +45,7 @@ typedef struct {
/** are we shut down? */ /** are we shut down? */
bool shutdown; bool shutdown;
/** list of picks that are waiting on connectivity */ /** list of picks that are waiting on connectivity */
pending_pick* pending_picks; grpc_lb_policy_pick_state* pending_picks;
/** our connectivity state tracker */ /** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy; } pick_first_lb_policy;
@ -72,19 +63,27 @@ static void pf_destroy(grpc_lb_policy* pol) {
} }
} }
static void pf_shutdown_locked(grpc_lb_policy* pol) { static void pf_shutdown_locked(grpc_lb_policy* pol,
grpc_lb_policy* new_policy) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
} }
p->shutdown = true; p->shutdown = true;
pending_pick* pp; grpc_lb_policy_pick_state* pick;
while ((pp = p->pending_picks) != nullptr) { while ((pick = p->pending_picks) != nullptr) {
p->pending_picks = pp->next; p->pending_picks = pick->next;
*pp->target = nullptr; if (new_policy != nullptr) {
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); // Hand off to new LB policy.
gpr_free(pp); if (grpc_lb_policy_pick_locked(new_policy, pick)) {
// Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
} }
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown"); GRPC_ERROR_REF(error), "shutdown");
@ -104,19 +103,18 @@ static void pf_shutdown_locked(grpc_lb_policy* pol) {
} }
static void pf_cancel_pick_locked(grpc_lb_policy* pol, static void pf_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target, grpc_lb_policy_pick_state* pick,
grpc_error* error) { grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
pending_pick* pp = p->pending_picks; grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr; p->pending_picks = nullptr;
while (pp != nullptr) { while (pp != nullptr) {
pending_pick* next = pp->next; grpc_lb_policy_pick_state* next = pp->next;
if (pp->target == target) { if (pp == pick) {
*target = nullptr; pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1)); "Pick Cancelled", &error, 1));
gpr_free(pp);
} else { } else {
pp->next = p->pending_picks; pp->next = p->pending_picks;
p->pending_picks = pp; p->pending_picks = pp;
@ -131,21 +129,20 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error* error) { grpc_error* error) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
pending_pick* pp = p->pending_picks; grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr; p->pending_picks = nullptr;
while (pp != nullptr) { while (pick != nullptr) {
pending_pick* next = pp->next; grpc_lb_policy_pick_state* next = pick->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1)); "Pick Cancelled", &error, 1));
gpr_free(pp);
} else { } else {
pp->next = p->pending_picks; pick->next = p->pending_picks;
p->pending_picks = pp; p->pending_picks = pick;
} }
pp = next; pick = next;
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -175,27 +172,20 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) {
} }
static int pf_pick_locked(grpc_lb_policy* pol, static int pf_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args, grpc_lb_policy_pick_state* pick) {
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
// If we have a selected subchannel already, return synchronously. // If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) { if (p->selected != nullptr) {
*target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
"picked"); p->selected->connected_subchannel, "picked");
return 1; return 1;
} }
// No subchannel selected yet, so handle asynchronously. // No subchannel selected yet, so handle asynchronously.
if (!p->started_picking) { if (!p->started_picking) {
start_picking_locked(p); start_picking_locked(p);
} }
pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); pick->next = p->pending_picks;
pp->next = p->pending_picks; p->pending_picks = pick;
pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
return 0; return 0;
} }
@ -481,18 +471,17 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Drop all other subchannels, since we are now connected. // Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(p); destroy_unselected_subchannels_locked(p);
// Update any calls that were waiting for a pick. // Update any calls that were waiting for a pick.
pending_pick* pp; grpc_lb_policy_pick_state* pick;
while ((pp = p->pending_picks)) { while ((pick = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pick->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
p->selected->connected_subchannel, "picked"); p->selected->connected_subchannel, "picked");
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p", "Servicing pending pick with selected subchannel %p",
(void*)p->selected); (void*)p->selected);
} }
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
} }
// Renew notification. // Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd); grpc_lb_subchannel_data_start_connectivity_watch(sd);

@ -41,31 +41,6 @@
grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
namespace {
/** List of entities waiting for a pick.
*
* Once a pick is available, \a target is updated and \a on_complete called. */
struct pending_pick {
pending_pick* next;
/* output argument where to store the pick()ed user_data. It'll be NULL if no
* such data is present or there's an error (the definite test for errors is
* \a target being NULL). */
void** user_data;
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
grpc_connected_subchannel** target;
/* to be invoked once the pick() has completed (regardless of success) */
grpc_closure* on_complete;
};
} // namespace
typedef struct round_robin_lb_policy { typedef struct round_robin_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
@ -77,7 +52,7 @@ typedef struct round_robin_lb_policy {
/** are we shutting down? */ /** are we shutting down? */
bool shutdown; bool shutdown;
/** List of picks that are waiting on connectivity */ /** List of picks that are waiting on connectivity */
pending_pick* pending_picks; grpc_lb_policy_pick_state* pending_picks;
/** our connectivity state tracker */ /** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
@ -169,19 +144,27 @@ static void rr_destroy(grpc_lb_policy* pol) {
gpr_free(p); gpr_free(p);
} }
static void rr_shutdown_locked(grpc_lb_policy* pol) { static void rr_shutdown_locked(grpc_lb_policy* pol,
grpc_lb_policy* new_policy) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
} }
p->shutdown = true; p->shutdown = true;
pending_pick* pp; grpc_lb_policy_pick_state* pick;
while ((pp = p->pending_picks) != nullptr) { while ((pick = p->pending_picks) != nullptr) {
p->pending_picks = pp->next; p->pending_picks = pick->next;
*pp->target = nullptr; if (new_policy != nullptr) {
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); // Hand off to new LB policy.
gpr_free(pp); if (grpc_lb_policy_pick_locked(new_policy, pick)) {
// Synchronous return; schedule callback.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
} }
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown"); GRPC_ERROR_REF(error), "rr_shutdown");
@ -201,19 +184,18 @@ static void rr_shutdown_locked(grpc_lb_policy* pol) {
} }
static void rr_cancel_pick_locked(grpc_lb_policy* pol, static void rr_cancel_pick_locked(grpc_lb_policy* pol,
grpc_connected_subchannel** target, grpc_lb_policy_pick_state* pick,
grpc_error* error) { grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
pending_pick* pp = p->pending_picks; grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr; p->pending_picks = nullptr;
while (pp != nullptr) { while (pp != nullptr) {
pending_pick* next = pp->next; grpc_lb_policy_pick_state* next = pp->next;
if (pp->target == target) { if (pp == pick) {
*target = nullptr; pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1)); "Pick cancelled", &error, 1));
gpr_free(pp);
} else { } else {
pp->next = p->pending_picks; pp->next = p->pending_picks;
p->pending_picks = pp; p->pending_picks = pp;
@ -228,22 +210,21 @@ static void rr_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
grpc_error* error) { grpc_error* error) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
pending_pick* pp = p->pending_picks; grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr; p->pending_picks = nullptr;
while (pp != nullptr) { while (pick != nullptr) {
pending_pick* next = pp->next; grpc_lb_policy_pick_state* next = pick->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) { initial_metadata_flags_eq) {
*pp->target = nullptr; pick->connected_subchannel = nullptr;
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1)); "Pick cancelled", &error, 1));
gpr_free(pp);
} else { } else {
pp->next = p->pending_picks; pick->next = p->pending_picks;
p->pending_picks = pp; p->pending_picks = pick;
} }
pp = next; pick = next;
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -268,13 +249,10 @@ static void rr_exit_idle_locked(grpc_lb_policy* pol) {
} }
static int rr_pick_locked(grpc_lb_policy* pol, static int rr_pick_locked(grpc_lb_policy* pol,
const grpc_lb_policy_pick_args* pick_args, grpc_lb_policy_pick_state* pick) {
grpc_connected_subchannel** target,
grpc_call_context_element* context, void** user_data,
grpc_closure* on_complete) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", (void*)pol, gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol,
p->shutdown); p->shutdown);
} }
GPR_ASSERT(!p->shutdown); GPR_ASSERT(!p->shutdown);
@ -284,18 +262,18 @@ static int rr_pick_locked(grpc_lb_policy* pol,
/* readily available, report right away */ /* readily available, report right away */
grpc_lb_subchannel_data* sd = grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index]; &p->subchannel_list->subchannels[next_ready_index];
*target = pick->connected_subchannel =
GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
if (user_data != nullptr) { if (pick->user_data != nullptr) {
*user_data = sd->user_data; *pick->user_data = sd->user_data;
} }
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log( gpr_log(
GPR_DEBUG, GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %lu)", "index %" PRIuPTR ")",
(void*)p, (void*)sd->subchannel, (void*)*target, p, sd->subchannel, pick->connected_subchannel, sd->subchannel_list,
(void*)sd->subchannel_list, (unsigned long)next_ready_index); next_ready_index);
} }
/* only advance the last picked pointer if the selection was used */ /* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index); update_last_ready_subchannel_index_locked(p, next_ready_index);
@ -306,13 +284,8 @@ static int rr_pick_locked(grpc_lb_policy* pol,
if (!p->started_picking) { if (!p->started_picking) {
start_picking_locked(p); start_picking_locked(p);
} }
pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); pick->next = p->pending_picks;
pp->next = p->pending_picks; p->pending_picks = pick;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->user_data = user_data;
p->pending_picks = pp;
return 0; return 0;
} }
@ -495,13 +468,13 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
// picks, update the last picked pointer // picks, update the last picked pointer
update_last_ready_subchannel_index_locked(p, next_ready_index); update_last_ready_subchannel_index_locked(p, next_ready_index);
} }
pending_pick* pp; grpc_lb_policy_pick_state* pick;
while ((pp = p->pending_picks)) { while ((pick = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pick->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_picked"); selected->connected_subchannel, "rr_picked");
if (pp->user_data != nullptr) { if (pick->user_data != nullptr) {
*pp->user_data = selected->user_data; *pick->user_data = selected->user_data;
} }
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
@ -510,8 +483,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
(void*)p, (void*)selected->subchannel, (void*)p, (void*)selected->subchannel,
(void*)p->subchannel_list, (unsigned long)next_ready_index); (void*)p->subchannel_list, (unsigned long)next_ready_index);
} }
GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
} }
} }
// Renew notification. // Renew notification.

@ -213,13 +213,13 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
void grpc_lb_subchannel_list_ref_for_connectivity_watch( void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) { grpc_lb_subchannel_list* subchannel_list, const char* reason) {
GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); GRPC_LB_POLICY_REF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_ref(subchannel_list, reason); grpc_lb_subchannel_list_ref(subchannel_list, reason);
} }
void grpc_lb_subchannel_list_unref_for_connectivity_watch( void grpc_lb_subchannel_list_unref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) { grpc_lb_subchannel_list* subchannel_list, const char* reason) {
GRPC_LB_POLICY_WEAK_UNREF(subchannel_list->policy, reason); GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_unref(subchannel_list, reason); grpc_lb_subchannel_list_unref(subchannel_list, reason);
} }

Loading…
Cancel
Save