Merge pull request #13870 from markdroth/c++_lb_policy

C++ LB policy API
Mark D. Roth committed by GitHub
commit 824b21e13a
22 changed files (lines changed shown in parentheses):
  1. BUILD (2)
  2. build.yaml (2)
  3. gRPC-C++.podspec (1)
  4. gRPC-Core.podspec (2)
  5. grpc.gemspec (1)
  6. package.xml (1)
  7. src/core/ext/filters/client_channel/client_channel.cc (110)
  8. src/core/ext/filters/client_channel/client_channel_plugin.cc (4)
  9. src/core/ext/filters/client_channel/lb_policy.cc (123)
  10. src/core/ext/filters/client_channel/lb_policy.h (331)
  11. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (2953)
  12. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h (29)
  13. src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc (523)
  14. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc (584)
  15. src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc (28)
  16. src/core/ext/filters/client_channel/lb_policy/subchannel_list.h (16)
  17. src/core/ext/filters/client_channel/lb_policy_factory.cc (14)
  18. src/core/ext/filters/client_channel/lb_policy_factory.h (50)
  19. src/core/ext/filters/client_channel/lb_policy_registry.cc (91)
  20. src/core/ext/filters/client_channel/lb_policy_registry.h (37)
  21. tools/doxygen/Doxyfile.core.internal (1)
  22. tools/run_tests/generated/sources_and_headers.json (4)

@@ -1148,7 +1148,6 @@ grpc_cc_library(
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
@@ -1177,7 +1176,6 @@ grpc_cc_library(
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",

@@ -518,7 +518,6 @@ filegroups:
- name: grpc_lb_policy_grpclb
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -539,7 +538,6 @@ filegroups:
- name: grpc_lb_policy_grpclb_secure
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h

@@ -504,7 +504,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/transport_impl.h',
'src/core/lib/debug/trace.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',

@@ -448,7 +448,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/transport_impl.h',
'src/core/lib/debug/trace.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
@@ -937,7 +936,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/transport_impl.h',
'src/core/lib/debug/trace.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',

@@ -374,7 +374,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/transport_impl.h )
s.files += %w( src/core/lib/debug/trace.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h )

@@ -381,7 +381,6 @@
<file baseinstalldir="/" name="src/core/lib/transport/transport_impl.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/debug/trace.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" role="src" />

@@ -175,7 +175,7 @@ typedef struct client_channel_channel_data {
/** combiner protecting all variables below in this data structure */
grpc_combiner* combiner;
/** currently active load balancer */
grpc_lb_policy* lb_policy;
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
/** retry throttle data */
grpc_server_retry_throttle_data* retry_throttle_data;
/** maps method names to method_parameters structs */
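The change above from a raw grpc_lb_policy* to OrphanablePtr<LoadBalancingPolicy> is the heart of this diff: it replaces the manual GRPC_LB_POLICY_REF/GRPC_LB_POLICY_UNREF pairs used throughout the old code. A standalone toy model of the idiom follows (plain C++ with illustrative names only, not gRPC code; the real templates live in src/core/lib/gprpp/orphanable.h): the owning pointer's deleter calls Orphan() instead of delete, so the policy can finish an asynchronous shutdown before releasing itself.

#include <cstdio>
#include <memory>

// Toy stand-in for grpc_core::Orphanable.
class Orphanable {
 public:
  virtual ~Orphanable() = default;
  // Called when the owner lets go; the object tears itself down and
  // frees itself when its own bookkeeping allows.
  virtual void Orphan() = 0;
};

// Toy stand-in for grpc_core::OrphanablePtr<>: a unique_ptr whose
// deleter orphans rather than deletes.
struct OrphanDeleter {
  void operator()(Orphanable* p) const { p->Orphan(); }
};
template <typename T>
using OrphanablePtr = std::unique_ptr<T, OrphanDeleter>;

class ToyLbPolicy : public Orphanable {
 public:
  void Orphan() override {
    ShutdownLocked();
    delete this;  // stands in for the final Unref() in the real code
  }

 private:
  void ShutdownLocked() { std::printf("policy shutting down\n"); }
};

int main() {
  OrphanablePtr<ToyLbPolicy> policy(new ToyLbPolicy);
  policy.reset();  // invokes Orphan(), not plain delete
}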
@@ -212,7 +212,7 @@ typedef struct {
channel_data* chand;
/** used as an identifier, don't dereference it because the LB policy may be
* non-existing when the callback is run */
grpc_lb_policy* lb_policy;
grpc_core::LoadBalancingPolicy* lb_policy;
grpc_closure closure;
} reresolution_request_args;
@@ -223,11 +223,11 @@ typedef struct {
channel_data* chand;
grpc_closure on_changed;
grpc_connectivity_state state;
grpc_lb_policy* lb_policy;
grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;
static void watch_lb_policy_locked(channel_data* chand,
grpc_lb_policy* lb_policy,
grpc_core::LoadBalancingPolicy* lb_policy,
grpc_connectivity_state current_state);
static void set_channel_connectivity_state_locked(channel_data* chand,
@@ -241,15 +241,13 @@ static void set_channel_connectivity_state_locked(channel_data* chand,
if (chand->lb_policy != nullptr) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks_locked(
chand->lb_policy,
chand->lb_policy->CancelMatchingPicksLocked(
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
} else if (state == GRPC_CHANNEL_SHUTDOWN) {
/* cancel all picks */
grpc_lb_policy_cancel_picks_locked(chand->lb_policy,
/* mask= */ 0, /* check= */ 0,
GRPC_ERROR_REF(error));
chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
GRPC_ERROR_REF(error));
}
}
if (grpc_client_channel_trace.enabled()) {
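In the hunk above, the mask/check pair selects which pending picks get cancelled: a pick matches when its initial_metadata_flags, ANDed with the mask, equal the check value. So WAIT_FOR_READY with check 0 cancels exactly the picks that did not set wait_for_ready, while mask 0 and check 0 match every pick. A minimal standalone illustration of the predicate (the same test appears verbatim in pick_first's CancelMatchingPicksLocked() later in this diff; the bit value below is illustrative, not the real flag constant):

#include <cassert>
#include <cstdint>

// A pick is cancelled when (flags & mask) == check.
bool PickMatches(uint32_t flags, uint32_t mask, uint32_t check) {
  return (flags & mask) == check;
}

int main() {
  const uint32_t kWaitForReady = 0x20;  // illustrative value only
  // mask = kWaitForReady, check = 0: picks WITHOUT wait_for_ready match.
  assert(PickMatches(/*flags=*/0, kWaitForReady, 0));
  assert(!PickMatches(/*flags=*/kWaitForReady, kWaitForReady, 0));
  // mask = 0, check = 0: every pick matches (cancel all).
  assert(PickMatches(/*flags=*/kWaitForReady, 0, 0));
}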
@@ -263,7 +261,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
lb_policy_connectivity_watcher* w =
static_cast<lb_policy_connectivity_watcher*>(arg);
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
if (w->lb_policy == w->chand->lb_policy.get()) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
@@ -279,7 +277,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
}
static void watch_lb_policy_locked(channel_data* chand,
grpc_lb_policy* lb_policy,
grpc_core::LoadBalancingPolicy* lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher* w =
static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
@@ -289,8 +287,7 @@ static void watch_lb_policy_locked(channel_data* chand,
grpc_combiner_scheduler(chand->combiner));
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change_locked(lb_policy, &w->state,
&w->on_changed);
lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}
static void start_resolving_locked(channel_data* chand) {
@@ -371,7 +368,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
channel_data* chand = args->chand;
// If this invocation is for a stale LB policy, treat it as an LB shutdown
// signal.
if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
gpr_free(args);
@@ -382,7 +379,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
}
chand->resolver->RequestReresolutionLocked();
// Give back the closure to the LB policy.
grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure);
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
@@ -393,9 +390,10 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
// Extract the following fields from the resolver result, if non-NULL.
bool lb_policy_updated = false;
bool lb_policy_created = false;
char* lb_policy_name_dup = nullptr;
bool lb_policy_name_changed = false;
grpc_lb_policy* new_lb_policy = nullptr;
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
char* service_config_json = nullptr;
grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
grpc_slice_hash_table* method_params_table = nullptr;
@@ -433,10 +431,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
@@ -448,10 +443,17 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
lb_policy_updated = true;
grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args);
chand->lb_policy->UpdateLocked(*chand->resolver_result);
} else {
// Instantiate new LB policy.
new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args);
lb_policy_created = true;
grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.args = chand->resolver_result;
new_lb_policy =
grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, lb_policy_args);
if (new_lb_policy == nullptr) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
lb_policy_name);
@@ -460,12 +462,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
static_cast<reresolution_request_args*>(
gpr_zalloc(sizeof(*args)));
args->chand = chand;
args->lb_policy = new_lb_policy;
args->lb_policy = new_lb_policy.get();
GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
grpc_combiner_scheduler(chand->combiner));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy,
&args->closure);
new_lb_policy->SetReresolutionClosureLocked(&args->closure);
}
}
// Find service config.
@@ -548,14 +549,14 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
if (chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
chand->lb_policy);
chand->lb_policy.get());
}
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
chand->lb_policy.reset();
}
chand->lb_policy = new_lb_policy;
chand->lb_policy = std::move(new_lb_policy);
}
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
@@ -583,21 +584,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (new_lb_policy != nullptr) {
if (lb_policy_created) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
}
GRPC_ERROR_UNREF(state_error);
state =
grpc_lb_policy_check_connectivity_locked(new_lb_policy, &state_error);
grpc_pollset_set_add_pollset_set(new_lb_policy->interested_parties,
state = chand->lb_policy->CheckConnectivityLocked(&state_error);
grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
if (chand->exit_idle_when_lb_policy_arrives) {
grpc_lb_policy_exit_idle_locked(new_lb_policy);
chand->lb_policy->ExitIdleLocked();
chand->exit_idle_when_lb_policy_arrives = false;
}
watch_lb_policy_locked(chand, new_lb_policy, state);
watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
}
if (!lb_policy_updated) {
set_channel_connectivity_state_locked(
@@ -632,8 +632,8 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
} else {
grpc_lb_policy_ping_one_locked(
chand->lb_policy, op->send_ping.on_initiate, op->send_ping.on_ack);
chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
op->send_ping.on_ack);
op->bind_pollset = nullptr;
}
op->send_ping.on_initiate = nullptr;
@@ -652,11 +652,9 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
}
if (chand->lb_policy != nullptr) {
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = nullptr;
chand->lb_policy.reset();
}
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
@@ -786,10 +784,9 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
grpc_client_channel_factory_unref(chand->client_channel_factory);
}
if (chand->lb_policy != nullptr) {
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy.reset();
}
gpr_free(chand->info_lb_policy_name);
gpr_free(chand->info_service_config_json);
@@ -849,7 +846,7 @@ typedef struct client_channel_call_data {
grpc_subchannel_call* subchannel_call;
grpc_error* error;
grpc_lb_policy_pick_state pick;
grpc_core::LoadBalancingPolicy::PickState pick;
grpc_closure lb_pick_closure;
grpc_closure lb_pick_cancel_closure;
@@ -1070,15 +1067,14 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
chand, calld, chand->lb_policy);
chand, calld, chand->lb_policy.get());
}
grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick,
GRPC_ERROR_REF(error));
chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
}
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
// Unrefs the LB policy and invokes async_pick_done_locked().
static void pick_callback_done_locked(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
@@ -1092,15 +1088,14 @@
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
// If the pick was completed synchronously, unrefs the LB policy and
// returns true.
// Starts a pick on chand->lb_policy.
// Returns true if pick is completed synchronously.
static bool pick_callback_start_locked(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
chand, calld, chand->lb_policy);
chand, calld, chand->lb_policy.get());
}
apply_service_config_to_call_locked(elem);
// If the application explicitly set wait_for_ready, use that.
@@ -1130,10 +1125,9 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
grpc_combiner_scheduler(chand->combiner));
calld->pick.on_complete = &calld->lb_pick_closure;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
const bool pick_done =
grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick);
const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
// Pick completed synchronously.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
@@ -1498,7 +1492,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
channel_data* chand = static_cast<channel_data*>(arg);
if (chand->lb_policy != nullptr) {
grpc_lb_policy_exit_idle_locked(chand->lb_policy);
chand->lb_policy->ExitIdleLocked();
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != nullptr) {
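Taken together, the client_channel changes reduce the pick path to roughly the following shape (a sketch built from names in this diff; the calld->initial_metadata* fields are assumed for illustration, combiner locking is elided, and the snippet is not compilable outside gRPC core):

// Runs under the channel combiner.
grpc_core::LoadBalancingPolicy::PickState pick;  // must outlive the pick
pick.initial_metadata = calld->initial_metadata;              // assumed field
pick.initial_metadata_flags = calld->initial_metadata_flags;  // assumed field
pick.on_complete = &calld->lb_pick_closure;  // run only if the pick is async
if (chand->lb_policy->PickLocked(&pick)) {
  // Synchronous result: pick.connected_subchannel is now set, or null if
  // the pick failed or the policy chose to drop the call.
} else {
  // Asynchronous: on_complete is scheduled when the pick resolves; it can
  // be aborted early with chand->lb_policy->CancelPickLocked(&pick, error).
}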

@@ -63,7 +63,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder* builder,
}
void grpc_client_channel_init(void) {
grpc_lb_policy_registry_init();
grpc_core::LoadBalancingPolicyRegistry::Builder::InitRegistry();
grpc_core::ResolverRegistry::Builder::InitRegistry();
grpc_retry_throttle_map_init();
grpc_proxy_mapper_registry_init();
@@ -83,5 +83,5 @@ void grpc_client_channel_shutdown(void) {
grpc_proxy_mapper_registry_shutdown();
grpc_retry_throttle_map_shutdown();
grpc_core::ResolverRegistry::Builder::ShutdownRegistry();
grpc_lb_policy_registry_shutdown();
grpc_core::LoadBalancingPolicyRegistry::Builder::ShutdownRegistry();
}

@@ -22,121 +22,36 @@
grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
false, "lb_policy_refcount");
void grpc_lb_policy_init(grpc_lb_policy* policy,
const grpc_lb_policy_vtable* vtable,
grpc_combiner* combiner) {
policy->vtable = vtable;
gpr_ref_init(&policy->refs, 1);
policy->interested_parties = grpc_pollset_set_create();
policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
}
#ifndef NDEBUG
void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line,
const char* reason) {
if (grpc_trace_lb_policy_refcount.enabled()) {
gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
old_refs, old_refs + 1, reason);
}
#else
void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) {
#endif
gpr_ref(&lb_policy->refs);
}
#ifndef NDEBUG
void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line,
const char* reason) {
if (grpc_trace_lb_policy_refcount.enabled()) {
gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
old_refs, old_refs - 1, reason);
}
#else
void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) {
#endif
if (gpr_unref(&lb_policy->refs)) {
grpc_pollset_set_destroy(lb_policy->interested_parties);
grpc_combiner* combiner = lb_policy->combiner;
lb_policy->vtable->destroy(lb_policy);
GRPC_COMBINER_UNREF(combiner, "lb_policy");
}
}
void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
grpc_lb_policy* new_policy) {
policy->vtable->shutdown_locked(policy, new_policy);
}
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick) {
return policy->vtable->pick_locked(policy, pick);
}
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick,
grpc_error* error) {
policy->vtable->cancel_pick_locked(policy, pick, error);
}
void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
policy->vtable->cancel_picks_locked(policy, initial_metadata_flags_mask,
initial_metadata_flags_eq, error);
}
namespace grpc_core {
void grpc_lb_policy_exit_idle_locked(grpc_lb_policy* policy) {
policy->vtable->exit_idle_locked(policy);
}
LoadBalancingPolicy::LoadBalancingPolicy(const Args& args)
: InternallyRefCountedWithTracing(&grpc_trace_lb_policy_refcount),
combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),
client_channel_factory_(args.client_channel_factory),
interested_parties_(grpc_pollset_set_create()),
request_reresolution_(nullptr) {}
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate,
grpc_closure* on_ack) {
policy->vtable->ping_one_locked(policy, on_initiate, on_ack);
LoadBalancingPolicy::~LoadBalancingPolicy() {
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "lb_policy");
}
void grpc_lb_policy_notify_on_state_change_locked(
grpc_lb_policy* policy, grpc_connectivity_state* state,
grpc_closure* closure) {
policy->vtable->notify_on_state_change_locked(policy, state, closure);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_lb_policy* policy, grpc_error** connectivity_error) {
return policy->vtable->check_connectivity_locked(policy, connectivity_error);
}
void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* lb_policy_args) {
policy->vtable->update_locked(policy, lb_policy_args);
}
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
grpc_core::TraceFlag* grpc_lb_trace,
grpc_error* error) {
if (policy->request_reresolution != nullptr) {
GRPC_CLOSURE_SCHED(policy->request_reresolution, error);
policy->request_reresolution = nullptr;
void LoadBalancingPolicy::TryReresolutionLocked(
grpc_core::TraceFlag* grpc_lb_trace, grpc_error* error) {
if (request_reresolution_ != nullptr) {
GRPC_CLOSURE_SCHED(request_reresolution_, error);
request_reresolution_ = nullptr;
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG,
"%s %p: scheduling re-resolution closure with error=%s.",
grpc_lb_trace->name(), policy, grpc_error_string(error));
grpc_lb_trace->name(), this, grpc_error_string(error));
}
} else {
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
grpc_lb_trace->name(), policy);
grpc_lb_trace->name(), this);
}
}
}
} // namespace grpc_core
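The re-resolution plumbing above is a one-shot handoff: the channel installs a closure via SetReresolutionClosureLocked(), TryReresolutionLocked() consumes and schedules it at most once, and the channel re-installs it after each run (see request_reresolution_locked() earlier in this diff). A standalone toy model of that contract (plain C++, illustrative names only):

#include <cstdlib>
#include <functional>
#include <utility>

class ToyPolicy {
 public:
  // Legal only while no closure is pending (mirrors the GPR_ASSERT in
  // SetReresolutionClosureLocked()).
  void SetReresolutionClosure(std::function<void()> cb) {
    if (request_reresolution_) std::abort();
    request_reresolution_ = std::move(cb);
  }
  // Fires the installed closure at most once; a no-op when none is set.
  void TryReresolution() {
    if (request_reresolution_) {
      auto cb = std::move(request_reresolution_);
      request_reresolution_ = nullptr;  // consumed before it runs
      cb();
    }
  }

 private:
  std::function<void()> request_reresolution_;
};

int main() {
  ToyPolicy policy;
  int runs = 0;
  policy.SetReresolutionClosure([&runs] { ++runs; });
  policy.TryReresolution();  // runs becomes 1
  policy.TryReresolution();  // still 1: the closure was consumed
  return runs == 1 ? 0 : 1;
}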

@@ -19,182 +19,181 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
is expected to be extended to contain some parameters) */
typedef struct grpc_lb_policy grpc_lb_policy;
typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
typedef struct grpc_lb_policy_args grpc_lb_policy_args;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
struct grpc_lb_policy {
const grpc_lb_policy_vtable* vtable;
gpr_refcount refs;
/* owned pointer to interested parties in load balancing decisions */
grpc_pollset_set* interested_parties;
/* combiner under which lb_policy actions take place */
grpc_combiner* combiner;
/* callback to force a re-resolution */
grpc_closure* request_reresolution;
};
/// State used for an LB pick.
typedef struct grpc_lb_policy_pick_state {
/// Initial metadata associated with the picking call.
grpc_metadata_batch* initial_metadata;
/// Bitmask used for selective cancelling. See \a
/// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
/// grpc_types.h.
uint32_t initial_metadata_flags;
/// Storage for LB token in \a initial_metadata, or NULL if not used.
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
grpc_closure* on_complete;
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
/// Upon success, \a *user_data will be set to whatever opaque information
/// may need to be propagated from the LB policy, or NULL if not needed.
void** user_data;
/// Next pointer. For internal use by LB policy.
struct grpc_lb_policy_pick_state* next;
} grpc_lb_policy_pick_state;
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy* policy);
/// \see grpc_lb_policy_shutdown_locked().
void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy);
/** \see grpc_lb_policy_pick */
int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick);
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick_locked)(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick,
namespace grpc_core {
/// Interface for load balancing policies.
///
/// Note: All methods with a "Locked" suffix must be called from the
/// combiner passed to the constructor.
///
/// Any I/O done by the LB policy should be done under the pollset_set
/// returned by \a interested_parties().
class LoadBalancingPolicy
: public InternallyRefCountedWithTracing<LoadBalancingPolicy> {
public:
struct Args {
/// The combiner under which all LB policy calls will be run.
/// Policy does NOT take ownership of the reference to the combiner.
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a smart pointer that does pass ownership
// of a reference.
grpc_combiner* combiner = nullptr;
/// Used to create channels and subchannels.
grpc_client_channel_factory* client_channel_factory = nullptr;
/// Channel args from the resolver.
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_LB_ADDRESSES channel arg.
grpc_channel_args* args = nullptr;
};
/// State used for an LB pick.
struct PickState {
/// Initial metadata associated with the picking call.
grpc_metadata_batch* initial_metadata;
/// Bitmask used for selective cancelling. See
/// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in
/// grpc_types.h.
uint32_t initial_metadata_flags;
/// Storage for LB token in \a initial_metadata, or nullptr if not used.
grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously.
grpc_closure* on_complete;
/// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call.
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
/// Will be populated with context to pass to the subchannel call, if
/// needed.
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
/// Upon success, \a *user_data will be set to whatever opaque information
/// may need to be propagated from the LB policy, or nullptr if not needed.
// TODO(roth): As part of revamping our metadata APIs, try to find a
// way to clean this up and C++-ify it.
void** user_data;
/// Next pointer. For internal use by LB policy.
PickState* next;
};
// Not copyable nor movable.
LoadBalancingPolicy(const LoadBalancingPolicy&) = delete;
LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete;
/// Updates the policy with a new set of \a args from the resolver.
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_LB_ADDRESSES channel arg.
virtual void UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT;
/// Finds an appropriate subchannel for a call, based on data in \a pick.
/// \a pick must remain alive until the pick is complete.
///
/// If the pick succeeds and a result is known immediately, returns true.
/// Otherwise, \a pick->on_complete will be invoked once the pick is
/// complete with its error argument set to indicate success or failure.
virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT;
/// Cancels \a pick.
/// The \a on_complete callback of the pending pick will be invoked with
/// \a pick->connected_subchannel set to null.
virtual void CancelPickLocked(PickState* pick,
grpc_error* error) GRPC_ABSTRACT;
/// Cancels all pending picks for which their \a initial_metadata_flags (as
/// given in the call to \a PickLocked()) matches
/// \a initial_metadata_flags_eq when ANDed with
/// \a initial_metadata_flags_mask.
virtual void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) GRPC_ABSTRACT;
/// Requests a notification when the connectivity state of the policy
/// changes from \a *state. When that happens, sets \a *state to the
/// new state and schedules \a closure.
virtual void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) GRPC_ABSTRACT;
/// Returns the policy's current connectivity state. Sets \a error to
/// the associated error, if any.
virtual grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) GRPC_ABSTRACT;
/// Hands off pending picks to \a new_policy.
virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy)
GRPC_ABSTRACT;
/// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping()
/// against one of the connected subchannels managed by the policy.
/// Note: This is intended only for use in tests.
virtual void PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) GRPC_ABSTRACT;
/// Tries to enter a READY connectivity state.
/// TODO(roth): As part of restructuring how we handle IDLE state,
/// consider whether this method is still needed.
virtual void ExitIdleLocked() GRPC_ABSTRACT;
void Orphan() override {
// Invoke ShutdownAndUnrefLocked() inside of the combiner.
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(&LoadBalancingPolicy::ShutdownAndUnrefLocked, this,
grpc_combiner_scheduler(combiner_)),
GRPC_ERROR_NONE);
}
/// Sets the re-resolution closure to \a request_reresolution.
void SetReresolutionClosureLocked(grpc_closure* request_reresolution) {
GPR_ASSERT(request_reresolution_ == nullptr);
request_reresolution_ = request_reresolution;
}
grpc_pollset_set* interested_parties() const { return interested_parties_; }
GRPC_ABSTRACT_BASE_CLASS
protected:
explicit LoadBalancingPolicy(const Args& args);
virtual ~LoadBalancingPolicy();
grpc_combiner* combiner() const { return combiner_; }
grpc_client_channel_factory* client_channel_factory() const {
return client_channel_factory_;
}
/// Shuts down the policy. Any pending picks that have not been
/// handed off to a new policy via HandOffPendingPicksLocked() will be
/// failed.
virtual void ShutdownLocked() GRPC_ABSTRACT;
/// Tries to request a re-resolution.
void TryReresolutionLocked(grpc_core::TraceFlag* grpc_lb_trace,
grpc_error* error);
/** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks_locked)(grpc_lb_policy* policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error);
/** \see grpc_lb_policy_ping_one */
void (*ping_one_locked)(grpc_lb_policy* policy, grpc_closure* on_initiate,
grpc_closure* on_ack);
/** Try to enter a READY connectivity state */
void (*exit_idle_locked)(grpc_lb_policy* policy);
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity_locked)(
grpc_lb_policy* policy, grpc_error** connectivity_error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
state cancels the subscription. */
void (*notify_on_state_change_locked)(grpc_lb_policy* policy,
grpc_connectivity_state* state,
grpc_closure* closure);
void (*update_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
private:
static void ShutdownAndUnrefLocked(void* arg, grpc_error* ignored) {
LoadBalancingPolicy* policy = static_cast<LoadBalancingPolicy*>(arg);
policy->ShutdownLocked();
policy->Unref();
}
/// Combiner under which LB policy actions take place.
grpc_combiner* combiner_;
/// Client channel factory, used to create channels and subchannels.
grpc_client_channel_factory* client_channel_factory_;
/// Owned pointer to interested parties in load balancing decisions.
grpc_pollset_set* interested_parties_;
/// Callback to force a re-resolution.
grpc_closure* request_reresolution_;
};
#ifndef NDEBUG
#define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_UNREF(p, r) \
grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line,
const char* reason);
void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line,
const char* reason);
#else // !NDEBUG
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
void grpc_lb_policy_ref(grpc_lb_policy* policy);
void grpc_lb_policy_unref(grpc_lb_policy* policy);
#endif
/** called by concrete implementations to initialize the base struct */
void grpc_lb_policy_init(grpc_lb_policy* policy,
const grpc_lb_policy_vtable* vtable,
grpc_combiner* combiner);
/// Shuts down \a policy.
/// If \a new_policy is non-null, any pending picks will be restarted
/// on that policy; otherwise, they will be failed.
void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
grpc_lb_policy* new_policy);
/** Finds an appropriate subchannel for a call, based on data in \a pick.
\a pick must remain alive until the pick is complete.
If the pick succeeds and a result is known immediately, a non-zero
value will be returned. Otherwise, \a pick->on_complete will be invoked
once the pick is complete with its error argument set to indicate
success or failure.
Any IO should be done under the \a interested_parties \a grpc_pollset_set
in the \a grpc_lb_policy struct. */
int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick);
/** Perform a connected subchannel ping (see \a
grpc_core::ConnectedSubchannel::Ping)
against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
grpc_closure* on_initiate,
grpc_closure* on_ack);
/** Cancel picks for \a pick.
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
grpc_lb_policy_pick_state* pick,
grpc_error* error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error);
/** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle_locked(grpc_lb_policy* policy);
/* Call notify when the connectivity state of a channel changes from \a *state.
* Updates \a *state with the new state of the policy */
void grpc_lb_policy_notify_on_state_change_locked(
grpc_lb_policy* policy, grpc_connectivity_state* state,
grpc_closure* closure);
grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_lb_policy* policy, grpc_error** connectivity_error);
/** Update \a policy with \a lb_policy_args. */
void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* lb_policy_args);
/** Set the re-resolution closure to \a request_reresolution. */
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution);
/** Try to request a re-resolution. It's NOT a public API; it's only for use by
the LB policy implementations. */
void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
grpc_core::TraceFlag* grpc_lb_trace,
grpc_error* error);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
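For orientation, a concrete policy built on this interface overrides the pure-virtual methods declared above. The skeleton below is hypothetical (method bodies elided; not a real gRPC policy, and not compilable standalone since it depends on gRPC-core internal types):

namespace grpc_core {

class ExamplePolicy : public LoadBalancingPolicy {
 public:
  explicit ExamplePolicy(const Args& args) : LoadBalancingPolicy(args) {
    // args.args carries the resolver's channel args, including the
    // GRPC_ARG_LB_ADDRESSES list this policy would balance over.
  }

  void UpdateLocked(const grpc_channel_args& args) override { /* ... */ }
  bool PickLocked(PickState* pick) override {
    // Fill pick->connected_subchannel and return true for a synchronous
    // result; otherwise queue the pick and return false.
    return false;
  }
  void CancelPickLocked(PickState* pick, grpc_error* error) override {}
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override {}
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override {}
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override {
    return GRPC_CHANNEL_IDLE;
  }
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override {}
  void PingOneLocked(grpc_closure* on_initiate,
                     grpc_closure* on_ack) override {}
  void ExitIdleLocked() override {}

 private:
  ~ExamplePolicy() override {}
  void ShutdownLocked() override {
    // Fail (or hand off) any queued picks and release resources here.
  }
};

}  // namespace grpc_core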

@@ -1,29 +0,0 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
/** Returns a load balancing factory for the glb policy, which tries to connect
* to a load balancing server to decide the next successfully connected
* subchannel to pick. */
grpc_lb_policy_factory* grpc_glb_lb_factory_create();
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H */

@@ -29,194 +29,225 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
namespace grpc_core {
TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
namespace {
//
// pick_first LB policy
//
class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override;
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override;
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
private:
~PickFirst();
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
void SubchannelListRefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
void SubchannelListUnrefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
typedef struct {
/** base policy: must be first */
grpc_lb_policy base;
/** all our subchannels */
grpc_lb_subchannel_list* subchannel_list;
grpc_lb_subchannel_list* subchannel_list_ = nullptr;
/** latest pending subchannel list */
grpc_lb_subchannel_list* latest_pending_subchannel_list;
grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
/** selected subchannel in \a subchannel_list */
grpc_lb_subchannel_data* selected;
grpc_lb_subchannel_data* selected_ = nullptr;
/** have we started picking? */
bool started_picking;
bool started_picking_ = false;
/** are we shut down? */
bool shutdown;
bool shutdown_ = false;
/** list of picks that are waiting on connectivity */
grpc_lb_policy_pick_state* pending_picks;
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
static void pf_destroy(grpc_lb_policy* pol) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
GPR_ASSERT(p->subchannel_list == nullptr);
GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
GPR_ASSERT(p->pending_picks == nullptr);
grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p);
grpc_subchannel_index_unref();
grpc_connectivity_state_tracker state_tracker_;
};
PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void*)p);
gpr_log(GPR_DEBUG, "Pick First %p created.", this);
}
UpdateLocked(*args.args);
grpc_subchannel_index_ref();
}
static void pf_shutdown_locked(grpc_lb_policy* pol,
grpc_lb_policy* new_policy) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
gpr_log(GPR_DEBUG, "Destroying Pick First %p", this);
}
p->shutdown = true;
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks) != nullptr) {
p->pending_picks = pick->next;
if (new_policy != nullptr) {
// Hand off to new LB policy.
if (grpc_lb_policy_pick_locked(new_policy, pick)) {
// Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
grpc_connectivity_state_destroy(&state_tracker_);
grpc_subchannel_index_unref();
}
void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
if (new_policy->PickLocked(pick)) {
// Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
}
void PickFirst::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this);
}
shutdown_ = true;
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
"pf_shutdown");
p->subchannel_list = nullptr;
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown");
subchannel_list_ = nullptr;
}
if (p->latest_pending_subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
p->latest_pending_subchannel_list, "pf_shutdown");
p->latest_pending_subchannel_list = nullptr;
if (latest_pending_subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_,
"pf_shutdown");
latest_pending_subchannel_list_ = nullptr;
}
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_CANCELLED);
TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
static void pf_cancel_pick_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* pick,
grpc_error* error) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr;
void PickFirst::CancelPickLocked(PickState* pick, grpc_error* error) {
PickState* pp = pending_picks_;
pending_picks_ = nullptr;
while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next;
PickState* next = pp->next;
if (pp == pick) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
pp->next = pending_picks_;
pending_picks_ = pp;
}
pp = next;
}
GRPC_ERROR_UNREF(error);
}
static void pf_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr;
void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
PickState* pick = pending_picks_;
pending_picks_ = nullptr;
while (pick != nullptr) {
grpc_lb_policy_pick_state* next = pick->next;
PickState* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
pick->next = p->pending_picks;
p->pending_picks = pick;
pick->next = pending_picks_;
pending_picks_ = pick;
}
pick = next;
}
GRPC_ERROR_UNREF(error);
}
static void start_picking_locked(pick_first_lb_policy* p) {
p->started_picking = true;
if (p->subchannel_list != nullptr &&
p->subchannel_list->num_subchannels > 0) {
p->subchannel_list->checking_subchannel = 0;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(
p->subchannel_list, "connectivity_watch+start_picking");
void PickFirst::StartPickingLocked() {
started_picking_ = true;
if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) {
subchannel_list_->checking_subchannel = 0;
for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
if (subchannel_list_->subchannels[i].subchannel != nullptr) {
SubchannelListRefForConnectivityWatch(
subchannel_list_, "connectivity_watch+start_picking");
grpc_lb_subchannel_data_start_connectivity_watch(
&p->subchannel_list->subchannels[i]);
&subchannel_list_->subchannels[i]);
break;
}
}
}
}
static void pf_exit_idle_locked(grpc_lb_policy* pol) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
if (!p->started_picking) {
start_picking_locked(p);
void PickFirst::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
static int pf_pick_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* pick) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
bool PickFirst::PickLocked(PickState* pick) {
// If we have a selected subchannel already, return synchronously.
if (p->selected != nullptr) {
pick->connected_subchannel = p->selected->connected_subchannel;
return 1;
if (selected_ != nullptr) {
pick->connected_subchannel = selected_->connected_subchannel;
return true;
}
// No subchannel selected yet, so handle asynchronously.
if (!p->started_picking) {
start_picking_locked(p);
if (!started_picking_) {
StartPickingLocked();
}
pick->next = p->pending_picks;
p->pending_picks = pick;
return 0;
pick->next = pending_picks_;
pending_picks_ = pick;
return false;
}
static void destroy_unselected_subchannels_locked(pick_first_lb_policy* p) {
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[i];
if (p->selected != sd) {
void PickFirst::DestroyUnselectedSubchannelsLocked() {
for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i];
if (selected_ != sd) {
grpc_lb_subchannel_data_unref_subchannel(sd,
"selected_different_subchannel");
}
}
}
static grpc_connectivity_state pf_check_connectivity_locked(
grpc_lb_policy* pol, grpc_error** error) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
return grpc_connectivity_state_get(&p->state_tracker, error);
grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
static void pf_notify_on_state_change_locked(grpc_lb_policy* pol,
grpc_connectivity_state* current,
grpc_closure* notify) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
grpc_closure* notify) {
grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
notify);
}
static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
if (p->selected) {
p->selected->connected_subchannel->Ping(on_initiate, on_ack);
void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (selected_ != nullptr) {
selected_->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -225,18 +256,31 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
}
}
static void pf_connectivity_changed_locked(void* arg, grpc_error* error);
void PickFirst::SubchannelListRefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is ready and the subchannel_list code has been
// converted to a C++ API, find a way to hold the RefCountedPtr<>
// somewhere (maybe in the subchannel_data object) instead of doing
// this manually.
auto self = Ref(DEBUG_LOCATION, reason);
self.release();
grpc_lb_subchannel_list_ref(subchannel_list, reason);
}
static void pf_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* args) {
pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy);
const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
void PickFirst::SubchannelListUnrefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
Unref(DEBUG_LOCATION, reason);
grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
if (p->subchannel_list == nullptr) {
if (subchannel_list_ == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"pf_update_missing");
} else {
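The Ref()/release() pair in SubchannelListRefForConnectivityWatch() above is the pattern the TODO describes: a grpc_closure callback cannot hold a RefCountedPtr<>, so the policy takes a strong ref and immediately release()s it into a bare reference, which the matching SubchannelListUnrefForConnectivityWatch() balances with an explicit Unref(). A standalone toy of the same idiom (plain C++, illustrative names only):

#include <cassert>

// Toy intrusive refcount, standing in for RefCounted<> + Ref()/Unref().
class ToyRefCounted {
 public:
  void Ref() { ++refs_; }
  void Unref() {
    assert(refs_ > 0);
    if (--refs_ == 0) delete this;
  }

 protected:
  virtual ~ToyRefCounted() = default;

 private:
  int refs_ = 1;
};

class Watcher : public ToyRefCounted {
 public:
  void StartWatch() {
    // Take a ref on behalf of the pending callback; since the callback
    // only sees a raw pointer, the ref is "released" to it here and
    // balanced manually in OnDone().
    Ref();
  }
  void OnDone() {
    // ... handle the connectivity event ...
    Unref();  // balances the Ref() taken in StartWatch()
  }
};

int main() {
  Watcher* w = new Watcher;  // created with one owner ref
  w->StartWatch();
  w->OnDone();  // drops the watch ref
  w->Unref();   // drops the owner ref; object is deleted
}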
@@ -244,77 +288,78 @@ static void pf_update_locked(grpc_lb_policy* policy,
gpr_log(GPR_ERROR,
"No valid LB addresses channel arg for Pick First %p update, "
"ignoring.",
(void*)p);
this);
}
return;
}
const grpc_lb_addresses* addresses =
static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
(const grpc_lb_addresses*)arg->value.pointer.p;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
(void*)p, static_cast<unsigned long>(addresses->num_addresses));
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
&p->base, &grpc_lb_pick_first_trace, addresses, args,
pf_connectivity_changed_locked);
this, &grpc_lb_pick_first_trace, addresses, combiner(),
client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked);
if (subchannel_list->num_subchannels == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"sl_shutdown_empty_update");
}
p->subchannel_list = subchannel_list; // Empty list.
p->selected = nullptr;
subchannel_list_ = subchannel_list; // Empty list.
selected_ = nullptr;
return;
}
if (p->selected == nullptr) {
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"pf_update_before_selected");
}
p->subchannel_list = subchannel_list;
subchannel_list_ = subchannel_list;
} else {
// We do have a selected subchannel.
// Check if it's present in the new list. If so, we're done.
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
if (sd->subchannel == p->selected->subchannel) {
if (sd->subchannel == selected_->subchannel) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
p, p->selected->subchannel, i,
this, selected_->subchannel, i,
subchannel_list->num_subchannels);
}
if (p->selected->connected_subchannel != nullptr) {
sd->connected_subchannel = p->selected->connected_subchannel;
if (selected_->connected_subchannel != nullptr) {
sd->connected_subchannel = selected_->connected_subchannel;
}
p->selected = sd;
if (p->subchannel_list != nullptr) {
selected_ = sd;
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
p->subchannel_list, "pf_update_includes_selected");
subchannel_list_, "pf_update_includes_selected");
}
p->subchannel_list = subchannel_list;
destroy_unselected_subchannels_locked(p);
grpc_lb_subchannel_list_ref_for_connectivity_watch(
subchannel_list_ = subchannel_list;
DestroyUnselectedSubchannelsLocked();
SubchannelListRefForConnectivityWatch(
subchannel_list, "connectivity_watch+replace_selected");
grpc_lb_subchannel_data_start_connectivity_watch(sd);
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
if (p->latest_pending_subchannel_list != nullptr) {
if (latest_pending_subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
p->latest_pending_subchannel_list,
latest_pending_subchannel_list_,
"pf_update_includes_selected+outdated");
p->latest_pending_subchannel_list = nullptr;
latest_pending_subchannel_list_ = nullptr;
}
return;
}
@ -323,84 +368,81 @@ static void pf_update_locked(grpc_lb_policy* policy,
// pending subchannel list to the new subchannel list. We will wait
// for it to report READY before swapping it into the current
// subchannel list.
if (p->latest_pending_subchannel_list != nullptr) {
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
(void*)p, (void*)p->latest_pending_subchannel_list,
(void*)subchannel_list);
this, latest_pending_subchannel_list_, subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
p->latest_pending_subchannel_list, "sl_outdated_dont_smash");
latest_pending_subchannel_list_, "sl_outdated_dont_smash");
}
p->latest_pending_subchannel_list = subchannel_list;
latest_pending_subchannel_list_ = subchannel_list;
}
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
if (p->started_picking) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(
subchannel_list, "connectivity_watch+update");
if (started_picking_) {
SubchannelListRefForConnectivityWatch(subchannel_list,
"connectivity_watch+update");
grpc_lb_subchannel_data_start_connectivity_watch(
&subchannel_list->subchannels[0]);
}
}
static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
pick_first_lb_policy* p =
reinterpret_cast<pick_first_lb_policy*>(sd->subchannel_list->policy);
void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg);
PickFirst* p = reinterpret_cast<PickFirst*>(sd->subchannel_list->policy);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG,
"Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
" of %" PRIuPTR
"), subchannel_list %p: state=%s p->shutdown=%d "
"), subchannel_list %p: state=%s p->shutdown_=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
(void*)p, (void*)sd->subchannel,
sd->subchannel_list->checking_subchannel,
sd->subchannel_list->num_subchannels, (void*)sd->subchannel_list,
p, sd->subchannel, sd->subchannel_list->checking_subchannel,
sd->subchannel_list->num_subchannels, sd->subchannel_list,
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
p->shutdown, sd->subchannel_list->shutting_down,
p->shutdown_, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
if (p->shutdown_) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
"pf_shutdown");
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
"pf_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
"pf_sl_shutdown");
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
"pf_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
sd->subchannel_list == p->latest_pending_subchannel_list_);
// Update state.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Handle updates for the currently selected subchannel.
if (p->selected == sd) {
if (p->selected_ == sd) {
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list != nullptr) {
p->selected = nullptr;
p->latest_pending_subchannel_list_ != nullptr) {
p->selected_ = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
p->SubchannelListUnrefForConnectivityWatch(
sd->subchannel_list, "selected_not_ready+switch_to_update");
grpc_lb_subchannel_list_shutdown_and_unref(
p->subchannel_list, "selected_not_ready+switch_to_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
p->subchannel_list_, "selected_not_ready+switch_to_update");
p->subchannel_list_ = p->latest_pending_subchannel_list_;
p->latest_pending_subchannel_list_ = nullptr;
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
@ -411,21 +453,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_NONE);
// in transient failure. Rely on re-resolution to recover.
p->selected = nullptr;
p->started_picking_ = false;
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
sd->subchannel_list, "pf_selected_shutdown");
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
"pf_selected_shutdown");
grpc_lb_subchannel_data_unref_subchannel(
sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else {
grpc_connectivity_state_set(&p->state_tracker,
grpc_connectivity_state_set(&p->state_tracker_,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
@ -436,45 +477,45 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list that we're trying to
// for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
// for a subchannel in p->latest_pending_subchannel_list. The
// for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_READY: {
// Case 2. Promote p->latest_pending_subchannel_list to
// p->subchannel_list.
// Case 2. Promote p->latest_pending_subchannel_list_ to
// p->subchannel_list_.
sd->connected_subchannel =
grpc_subchannel_get_connected_subchannel(sd->subchannel);
if (sd->subchannel_list == p->latest_pending_subchannel_list) {
GPR_ASSERT(p->subchannel_list != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
if (sd->subchannel_list == p->latest_pending_subchannel_list_) {
GPR_ASSERT(p->subchannel_list_ != nullptr);
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
"finish_update");
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
p->subchannel_list_ = p->latest_pending_subchannel_list_;
p->latest_pending_subchannel_list_ = nullptr;
}
// Cases 1 and 2.
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
p->selected = sd;
p->selected_ = sd;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
(void*)sd->subchannel);
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
sd->subchannel);
}
// Drop all other subchannels, since we are now connected.
destroy_unselected_subchannels_locked(p);
p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) {
p->pending_picks = pick->next;
pick->connected_subchannel = p->selected->connected_subchannel;
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
pick->connected_subchannel = p->selected_->connected_subchannel;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
(void*)p->selected);
p->selected_);
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
@ -494,9 +535,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->subchannel_list->checking_subchannel == 0 &&
sd->subchannel_list == p->subchannel_list) {
sd->subchannel_list == p->subchannel_list_) {
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
&p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
// Reuses the connectivity refs from the previous watch.
@ -506,8 +547,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
if (sd->subchannel_list == p->subchannel_list_) {
grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
@ -520,51 +561,29 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
pf_pick_locked,
pf_cancel_pick_locked,
pf_cancel_picks_locked,
pf_ping_one_locked,
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
pf_update_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
static void pick_first_factory_unref(grpc_lb_policy_factory* factory) {}
static grpc_lb_policy* create_pick_first(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
GPR_ASSERT(args->client_channel_factory != nullptr);
pick_first_lb_policy* p =
static_cast<pick_first_lb_policy*>(gpr_zalloc(sizeof(*p)));
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p created.", (void*)p);
}
pf_update_locked(&p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_subchannel_index_ref();
return &p->base;
}
//
// factory
//
static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
"pick_first"};
class PickFirstFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
}
static grpc_lb_policy_factory pick_first_lb_policy_factory = {
&pick_first_factory_vtable};
const char* name() const override { return "pick_first"; }
};
static grpc_lb_policy_factory* pick_first_lb_factory_create() {
return &pick_first_lb_policy_factory;
}
} // namespace
/* Plugin registration */
} // namespace grpc_core
void grpc_lb_policy_pick_first_init() {
grpc_register_lb_policy(pick_first_lb_factory_create());
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<grpc_core::PickFirstFactory>()));
}
void grpc_lb_policy_pick_first_shutdown() {}
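// Illustrative sketch, not from this commit: how a caller consumes the
// registration above. Once grpc_lb_policy_pick_first_init() has run, the
// client channel can instantiate the policy by name through the new
// registry API. `args` is assumed to be a fully populated
// LoadBalancingPolicy::Args (combiner, channel args, client channel
// factory); an unregistered name yields nullptr.
static grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
create_pick_first_sketch(const grpc_core::LoadBalancingPolicy::Args& args) {
  return grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
      "pick_first", args);
}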

@ -40,34 +40,94 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
typedef struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
grpc_lb_subchannel_list* subchannel_list;
namespace grpc_core {
TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
namespace {
//
// round_robin LB policy
//
class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) override;
void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
grpc_closure* closure) override;
grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override;
private:
~RoundRobin();
void ShutdownLocked() override;
void StartPickingLocked();
size_t GetNextReadySubchannelIndexLocked();
void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
grpc_error* error);
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
void SubchannelListRefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
void SubchannelListUnrefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
/** list of subchannels */
grpc_lb_subchannel_list* subchannel_list_ = nullptr;
/** have we started picking? */
bool started_picking;
bool started_picking_ = false;
/** are we shutting down? */
bool shutdown;
bool shutdown_ = false;
/** List of picks that are waiting on connectivity */
grpc_lb_policy_pick_state* pending_picks;
PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
grpc_connectivity_state_tracker state_tracker_;
/** Index into subchannels for last pick. */
size_t last_ready_subchannel_index;
size_t last_ready_subchannel_index_ = 0;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
grpc_lb_subchannel_list* latest_pending_subchannel_list;
} round_robin_lb_policy;
grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
GPR_ASSERT(args.client_channel_factory != nullptr);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"round_robin");
UpdateLocked(*args.args);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this,
subchannel_list_->num_subchannels);
}
grpc_subchannel_index_ref();
}
RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
GPR_ASSERT(pending_picks_ == nullptr);
grpc_connectivity_state_destroy(&state_tracker_);
grpc_subchannel_index_unref();
}
/** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state, or p->subchannel_list->num_subchannels if no
@ -75,195 +135,190 @@ typedef struct round_robin_lb_policy {
*
* Note that this function does *not* update p->last_ready_subchannel_index.
* The caller must do that if it returns a pick. */
static size_t get_next_ready_subchannel_index_locked(
const round_robin_lb_policy* p) {
GPR_ASSERT(p->subchannel_list != nullptr);
size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
GPR_ASSERT(subchannel_list_ != nullptr);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] getting next ready subchannel (out of %lu), "
"last_ready_subchannel_index=%lu",
(void*)p,
static_cast<unsigned long>(p->subchannel_list->num_subchannels),
static_cast<unsigned long>(p->last_ready_subchannel_index));
}
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
const size_t index = (i + p->last_ready_subchannel_index + 1) %
p->subchannel_list->num_subchannels;
"[RR %p] getting next ready subchannel (out of %" PRIuPTR
"), "
"last_ready_subchannel_index=%" PRIuPTR,
this, subchannel_list_->num_subchannels,
last_ready_subchannel_index_);
}
for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
const size_t index = (i + last_ready_subchannel_index_ + 1) %
subchannel_list_->num_subchannels;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
"state=%s",
(void*)p, (void*)p->subchannel_list->subchannels[index].subchannel,
(void*)p->subchannel_list, static_cast<unsigned long>(index),
"[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
": state=%s",
this, subchannel_list_->subchannels[index].subchannel,
subchannel_list_, index,
grpc_connectivity_state_name(
p->subchannel_list->subchannels[index].curr_connectivity_state));
subchannel_list_->subchannels[index].curr_connectivity_state));
}
if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
if (subchannel_list_->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] found next ready subchannel (%p) at index %lu of "
"subchannel_list %p",
(void*)p,
(void*)p->subchannel_list->subchannels[index].subchannel,
static_cast<unsigned long>(index), (void*)p->subchannel_list);
"[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
" of subchannel_list %p",
this, subchannel_list_->subchannels[index].subchannel, index,
subchannel_list_);
}
return index;
}
}
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p);
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this);
}
return p->subchannel_list->num_subchannels;
return subchannel_list_->num_subchannels;
}
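// Illustrative sketch, not from this commit: the wrap-around scan above
// reduced to its arithmetic. The probe order starts one slot past the
// last-ready index and walks modulo the list size; `ready` is a
// hypothetical stand-in for per-subchannel READY state.
#include <cstddef>
#include <vector>
static size_t NextReadyIndexSketch(const std::vector<bool>& ready,
                                   size_t last_ready_index) {
  for (size_t i = 0; i < ready.size(); ++i) {
    const size_t index = (i + last_ready_index + 1) % ready.size();
    if (ready[index]) return index;  // first READY slot in round-robin order
  }
  return ready.size();  // sentinel meaning "no subchannel is READY"
}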
// Sets p->last_ready_subchannel_index to last_ready_index.
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
size_t last_ready_index) {
GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
p->last_ready_subchannel_index = last_ready_index;
// Sets last_ready_subchannel_index_ to last_ready_index.
void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) {
GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels);
last_ready_subchannel_index_ = last_ready_index;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
(void*)p, static_cast<unsigned long>(last_ready_index),
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
(void*)p->subchannel_list->subchannels[last_ready_index]
"[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
" (SC %p, CSC %p)",
this, last_ready_index,
subchannel_list_->subchannels[last_ready_index].subchannel,
subchannel_list_->subchannels[last_ready_index]
.connected_subchannel.get());
}
}
static void rr_destroy(grpc_lb_policy* pol) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
(void*)pol, (void*)pol);
void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
if (new_policy->PickLocked(pick)) {
// Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
GPR_ASSERT(p->subchannel_list == nullptr);
GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
grpc_connectivity_state_destroy(&p->state_tracker);
grpc_subchannel_index_unref();
gpr_free(p);
}
static void rr_shutdown_locked(grpc_lb_policy* pol,
grpc_lb_policy* new_policy) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
void RoundRobin::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
p->shutdown = true;
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks) != nullptr) {
p->pending_picks = pick->next;
if (new_policy != nullptr) {
// Hand off to new LB policy.
if (grpc_lb_policy_pick_locked(new_policy, pick)) {
// Synchronous return; schedule callback.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
} else {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this);
}
shutdown_ = true;
PickState* pick;
while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next;
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"sl_shutdown_rr_shutdown");
p->subchannel_list = nullptr;
subchannel_list_ = nullptr;
}
if (p->latest_pending_subchannel_list != nullptr) {
if (latest_pending_subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown");
latest_pending_subchannel_list_ = nullptr;
}
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_CANCELLED);
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
static void rr_cancel_pick_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* pick,
grpc_error* error) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
grpc_lb_policy_pick_state* pp = p->pending_picks;
p->pending_picks = nullptr;
void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) {
PickState* pp = pending_picks_;
pending_picks_ = nullptr;
while (pp != nullptr) {
grpc_lb_policy_pick_state* next = pp->next;
PickState* next = pp->next;
if (pp == pick) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
"Pick Cancelled", &error, 1));
} else {
pp->next = p->pending_picks;
p->pending_picks = pp;
pp->next = pending_picks_;
pending_picks_ = pp;
}
pp = next;
}
GRPC_ERROR_UNREF(error);
}
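// Illustrative sketch, not from this commit: the cancellation idiom above,
// isolated to the list manipulation. The whole pending list is detached,
// then every node except the target is pushed back; survivors come back in
// reverse order, exactly as in the code above. `Node` is a hypothetical
// stand-in for PickState (only the link structure matters here).
struct Node { Node* next; };
static void RemoveFromPendingListSketch(Node** head, Node* target) {
  Node* pp = *head;
  *head = nullptr;
  while (pp != nullptr) {
    Node* next = pp->next;
    if (pp != target) {
      pp->next = *head;  // push survivor back onto the rebuilt list
      *head = pp;
    }
    pp = next;
  }
}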
static void rr_cancel_picks_locked(grpc_lb_policy* pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
grpc_lb_policy_pick_state* pick = p->pending_picks;
p->pending_picks = nullptr;
void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error* error) {
PickState* pick = pending_picks_;
pending_picks_ = nullptr;
while (pick != nullptr) {
grpc_lb_policy_pick_state* next = pick->next;
PickState* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
"Pick Cancelled", &error, 1));
} else {
pick->next = p->pending_picks;
p->pending_picks = pick;
pick->next = pending_picks_;
pending_picks_ = pick;
}
pick = next;
}
GRPC_ERROR_UNREF(error);
}
static void start_picking_locked(round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
"connectivity_watch");
void RoundRobin::SubchannelListRefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
// TODO(roth): We currently track this ref manually. Once the new
// ClosureRef API is ready and the subchannel_list code has been
// converted to a C++ API, find a way to hold the RefCountedPtr<>
// somewhere (maybe in the subchannel_data object) instead of doing
// this manually.
auto self = Ref(DEBUG_LOCATION, reason);
self.release();
grpc_lb_subchannel_list_ref(subchannel_list, reason);
}
void RoundRobin::SubchannelListUnrefForConnectivityWatch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
Unref(DEBUG_LOCATION, reason);
grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
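// Illustrative sketch, not from this commit: the manual ref pairing above,
// reduced to counter arithmetic. Ref(DEBUG_LOCATION, reason).release()
// bumps the count and deliberately forgets the smart pointer so the policy
// outlives the pending watch; the later Unref(DEBUG_LOCATION, reason)
// balances it. RefCountedPtr and debug-tracing details are elided.
#include <atomic>
struct CountedPolicySketch {
  std::atomic<int> refs{1};
  void RefForWatch() { refs.fetch_add(1, std::memory_order_relaxed); }
  void UnrefForWatch() {  // runs when the connectivity callback fires
    if (refs.fetch_sub(1, std::memory_order_acq_rel) == 1) delete this;
  }
};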
void RoundRobin::StartPickingLocked() {
started_picking_ = true;
for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) {
if (subchannel_list_->subchannels[i].subchannel != nullptr) {
SubchannelListRefForConnectivityWatch(subchannel_list_,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
&p->subchannel_list->subchannels[i]);
&subchannel_list_->subchannels[i]);
}
}
}
static void rr_exit_idle_locked(grpc_lb_policy* pol) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
if (!p->started_picking) {
start_picking_locked(p);
void RoundRobin::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
static int rr_pick_locked(grpc_lb_policy* pol,
grpc_lb_policy_pick_state* pick) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
bool RoundRobin::PickLocked(PickState* pick) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol,
p->shutdown);
gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this,
shutdown_);
}
GPR_ASSERT(!p->shutdown);
if (p->subchannel_list != nullptr) {
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
GPR_ASSERT(!shutdown_);
if (subchannel_list_ != nullptr) {
const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
if (next_ready_index < subchannel_list_->num_subchannels) {
/* readily available, report right away */
grpc_lb_subchannel_data* sd =
&p->subchannel_list->subchannels[next_ready_index];
&subchannel_list_->subchannels[next_ready_index];
pick->connected_subchannel = sd->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = sd->user_data;
@ -273,24 +328,24 @@ static int rr_pick_locked(grpc_lb_policy* pol,
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")",
p, sd->subchannel, pick->connected_subchannel.get(),
this, sd->subchannel, pick->connected_subchannel.get(),
sd->subchannel_list, next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
update_last_ready_subchannel_index_locked(p, next_ready_index);
return 1;
UpdateLastReadySubchannelIndexLocked(next_ready_index);
return true;
}
}
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking_locked(p);
if (!started_picking_) {
StartPickingLocked();
}
pick->next = p->pending_picks;
p->pending_picks = pick;
return 0;
pick->next = pending_picks_;
pending_picks_ = pick;
return false;
}
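// Illustrative sketch, not from this commit: the caller-side contract of
// PickLocked(). A true return means pick->connected_subchannel was filled
// in synchronously; false means the pick was queued on pending_picks_ and
// pick->on_complete will be scheduled once a subchannel becomes READY (see
// OnConnectivityChangedLocked below). This mirrors the handoff loop in
// HandOffPendingPicksLocked above; `policy` and `pick` are assumed valid
// and used inside the combiner, and PickState is assumed to be the
// pick-state type nested in LoadBalancingPolicy, as used above.
static void DoPickSketch(LoadBalancingPolicy* policy,
                         LoadBalancingPolicy::PickState* pick) {
  if (policy->PickLocked(pick)) {
    // Synchronous success: complete the pick immediately.
    GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
  }
  // On false, the policy owns the pick until it completes or is cancelled
  // via CancelPickLocked().
}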
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
@ -318,8 +373,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE. */
static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
grpc_error* error) {
void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
grpc_error* error) {
/* In priority order. The first rule to match terminates the search (i.e., if
 * we are on rule n, all previous rules were unfulfilled).
*
@ -335,64 +390,61 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* subchannel_list->num_subchannels.
*/
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p =
reinterpret_cast<round_robin_lb_policy*>(subchannel_list->policy);
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
if (subchannel_list->num_ready > 0) {
/* 1) READY */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
/* 2) CONNECTING */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
} else if (subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 3) TRANSIENT_FAILURE */
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error),
"rr_exhausted_subchannels");
}
GRPC_ERROR_UNREF(error);
}
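// Illustrative sketch, not from this commit: the priority rules above as a
// pure function. The counter inputs are assumptions standing in for
// subchannel_list's num_ready / num_transient_failures bookkeeping;
// `current` is returned unchanged when no rule fires, matching the
// fall-through above.
static grpc_connectivity_state AggregateRRStateSketch(
    size_t num_ready, bool updated_subchannel_connecting,
    size_t num_transient_failures, size_t num_subchannels,
    grpc_connectivity_state current) {
  if (num_ready > 0) return GRPC_CHANNEL_READY;  // 1) READY
  if (updated_subchannel_connecting) {
    return GRPC_CHANNEL_CONNECTING;  // 2) CONNECTING
  }
  if (num_transient_failures == num_subchannels) {
    return GRPC_CHANNEL_TRANSIENT_FAILURE;  // 3) TRANSIENT_FAILURE
  }
  return current;  // no rule matched: leave the tracker untouched
}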
static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
round_robin_lb_policy* p =
reinterpret_cast<round_robin_lb_policy*>(sd->subchannel_list->policy);
void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg);
RoundRobin* p = reinterpret_cast<RoundRobin*>(sd->subchannel_list->policy);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
"prev_state=%s new_state=%s p->shutdown=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
(void*)p, (void*)sd->subchannel, (void*)sd->subchannel_list,
p, sd->subchannel, sd->subchannel_list,
grpc_connectivity_state_name(sd->prev_connectivity_state),
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
p->shutdown, sd->subchannel_list->shutting_down,
p->shutdown_, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
GPR_ASSERT(sd->subchannel != nullptr);
// If the policy is shutting down, unref and return.
if (p->shutdown) {
if (p->shutdown_) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
"rr_shutdown");
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
"rr_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
"rr_sl_shutdown");
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
"rr_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
sd->subchannel_list == p->latest_pending_subchannel_list_);
GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
@ -409,8 +461,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
"Requesting re-resolution",
p, sd->subchannel);
}
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
break;
}
case GRPC_CHANNEL_READY: {
@ -418,49 +469,47 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
sd->connected_subchannel =
grpc_subchannel_get_connected_subchannel(sd->subchannel);
}
if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list.
if (sd->subchannel_list != p->subchannel_list_) {
// promote sd->subchannel_list to p->subchannel_list_.
// sd->subchannel_list must be equal to
// p->latest_pending_subchannel_list because we have already filtered
// p->latest_pending_subchannel_list_ because we have already filtered
// for sds belonging to outdated subchannel lists.
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_);
GPR_ASSERT(!sd->subchannel_list->shutting_down);
if (grpc_lb_round_robin_trace.enabled()) {
const unsigned long num_subchannels =
p->subchannel_list != nullptr
? static_cast<unsigned long>(
p->subchannel_list->num_subchannels)
const size_t num_subchannels =
p->subchannel_list_ != nullptr
? p->subchannel_list_->num_subchannels
: 0;
gpr_log(GPR_DEBUG,
"[RR %p] phasing out subchannel list %p (size %lu) in favor "
"of %p (size %lu)",
p, p->subchannel_list, num_subchannels, sd->subchannel_list,
"[RR %p] phasing out subchannel list %p (size %" PRIuPTR
") in favor of %p (size %" PRIuPTR ")",
p, p->subchannel_list_, num_subchannels, sd->subchannel_list,
sd->subchannel_list->num_subchannels);
}
if (p->subchannel_list != nullptr) {
if (p->subchannel_list_ != nullptr) {
// dispose of the current subchannel_list
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
"sl_phase_out_shutdown");
}
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = nullptr;
p->subchannel_list_ = p->latest_pending_subchannel_list_;
p->latest_pending_subchannel_list_ = nullptr;
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemptively replicates rr_pick()'s actions.
*/
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
* p->pending_picks. This preemptively replicates rr_pick()'s actions. */
const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked();
GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels);
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != nullptr) {
&p->subchannel_list_->subchannels[next_ready_index];
if (p->pending_picks_ != nullptr) {
// if the selected subchannel is going to be used for the pending
// picks, update the last picked pointer
update_last_ready_subchannel_index_locked(p, next_ready_index);
p->UpdateLastReadySubchannelIndexLocked(next_ready_index);
}
grpc_lb_policy_pick_state* pick;
while ((pick = p->pending_picks)) {
p->pending_picks = pick->next;
PickState* pick;
while ((pick = p->pending_picks_)) {
p->pending_picks_ = pick->next;
pick->connected_subchannel = selected->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = selected->user_data;
@ -468,10 +517,9 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
"(subchannel_list %p, index %lu)",
(void*)p, (void*)selected->subchannel,
(void*)p->subchannel_list,
static_cast<unsigned long>(next_ready_index));
"(subchannel_list %p, index %" PRIuPTR ")",
p, selected->subchannel, p->subchannel_list_,
next_ready_index);
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
@ -482,40 +530,34 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
}
// Update state counters and new overall state.
update_state_counters_locked(sd);
// Update state counters.
UpdateStateCountersLocked(sd);
// Only update connectivity based on the selected subchannel list.
if (sd->subchannel_list == p->subchannel_list) {
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
if (sd->subchannel_list == p->subchannel_list_) {
p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error));
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
static grpc_connectivity_state rr_check_connectivity_locked(
grpc_lb_policy* pol, grpc_error** error) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
return grpc_connectivity_state_get(&p->state_tracker, error);
grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
static void rr_notify_on_state_change_locked(grpc_lb_policy* pol,
grpc_connectivity_state* current,
grpc_closure* notify) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
grpc_closure* notify) {
grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
notify);
}
static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
if (next_ready_index < subchannel_list_->num_subchannels) {
grpc_lb_subchannel_data* selected =
&p->subchannel_list->subchannels[next_ready_index];
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target =
selected->connected_subchannel;
target->Ping(on_initiate, on_ack);
&subchannel_list_->subchannels[next_ready_index];
selected->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
@ -524,45 +566,41 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
}
}
static void rr_update_locked(grpc_lb_policy* policy,
const grpc_lb_policy_args* args) {
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy);
const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p);
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
if (p->subchannel_list == nullptr) {
if (subchannel_list_ == nullptr) {
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
}
return;
}
grpc_lb_addresses* addresses =
static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p,
addresses->num_addresses);
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses",
this, addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
&p->base, &grpc_lb_round_robin_trace, addresses, args,
rr_connectivity_changed_locked);
this, &grpc_lb_round_robin_trace, addresses, combiner(),
client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked);
if (subchannel_list->num_subchannels == 0) {
grpc_connectivity_state_set(
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
if (p->subchannel_list != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"sl_shutdown_empty_update");
}
p->subchannel_list = subchannel_list; // empty list
subchannel_list_ = subchannel_list; // empty list
return;
}
if (p->started_picking) {
if (started_picking_) {
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
const grpc_connectivity_state subchannel_state =
grpc_subchannel_check_connectivity(
@ -587,87 +625,61 @@ static void rr_update_locked(grpc_lb_policy* policy,
++subchannel_list->num_transient_failures;
}
}
if (p->latest_pending_subchannel_list != nullptr) {
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Shutting down latest pending subchannel list %p, "
"about to be replaced by newer latest %p",
(void*)p, (void*)p->latest_pending_subchannel_list,
(void*)subchannel_list);
this, latest_pending_subchannel_list_, subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
p->latest_pending_subchannel_list, "sl_outdated");
latest_pending_subchannel_list_, "sl_outdated");
}
p->latest_pending_subchannel_list = subchannel_list;
latest_pending_subchannel_list_ = subchannel_list;
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
/* Watch every new subchannel. A subchannel list becomes active the
* moment one of its subchannels is READY. At that moment, we swap
 * subchannel_list_ for sd->subchannel_list, provided the subchannel
 * list is still valid (i.e., isn't shutting down) */
grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list,
"connectivity_watch");
SubchannelListRefForConnectivityWatch(subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
&subchannel_list->subchannels[i]);
}
} else {
// The policy isn't picking yet. Save the update for later, disposing of
// the previous version, if any.
if (p->subchannel_list != nullptr) {
if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
p->subchannel_list, "rr_update_before_started_picking");
subchannel_list_, "rr_update_before_started_picking");
}
p->subchannel_list = subchannel_list;
subchannel_list_ = subchannel_list;
}
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
rr_pick_locked,
rr_cancel_pick_locked,
rr_cancel_picks_locked,
rr_ping_one_locked,
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
rr_update_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {}
static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
GPR_ASSERT(args->client_channel_factory != nullptr);
round_robin_lb_policy* p =
static_cast<round_robin_lb_policy*>(gpr_zalloc(sizeof(*p)));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_subchannel_index_ref();
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
rr_update_locked(&p->base, args);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p,
static_cast<unsigned long>(p->subchannel_list->num_subchannels));
}
return &p->base;
}
//
// factory
//
static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = {
round_robin_factory_ref, round_robin_factory_unref, round_robin_create,
"round_robin"};
class RoundRobinFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
}
static grpc_lb_policy_factory round_robin_lb_policy_factory = {
&round_robin_factory_vtable};
const char* name() const override { return "round_robin"; }
};
static grpc_lb_policy_factory* round_robin_lb_factory_create() {
return &round_robin_lb_policy_factory;
}
} // namespace
/* Plugin registration */
} // namespace grpc_core
void grpc_lb_policy_round_robin_init() {
grpc_register_lb_policy(round_robin_lb_factory_create());
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<grpc_core::RoundRobinFactory>()));
}
void grpc_lb_policy_round_robin_shutdown() {}

@ -67,7 +67,7 @@ void grpc_lb_subchannel_data_start_connectivity_watch(
}
sd->connectivity_notification_pending = true;
grpc_subchannel_notify_on_state_change(
sd->subchannel, sd->subchannel_list->policy->interested_parties,
sd->subchannel, sd->subchannel_list->policy->interested_parties(),
&sd->pending_connectivity_state_unsafe,
&sd->connectivity_changed_closure);
}
@ -88,9 +88,10 @@ void grpc_lb_subchannel_data_stop_connectivity_watch(
}
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args,
grpc_iomgr_cb_func connectivity_changed_cb) {
grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) {
grpc_lb_subchannel_list* subchannel_list =
static_cast<grpc_lb_subchannel_list*>(
gpr_zalloc(sizeof(*subchannel_list)));
@ -118,12 +119,11 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
1);
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
args->client_channel_factory, &sc_args);
client_channel_factory, &sc_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {
// Subchannel could not be created.
@ -154,7 +154,7 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
sd->subchannel = subchannel;
GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
connectivity_changed_cb, sd,
grpc_combiner_scheduler(args->combiner));
grpc_combiner_scheduler(combiner));
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
@ -212,18 +212,6 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
}
}
void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
GRPC_LB_POLICY_REF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_ref(subchannel_list, reason);
}
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason) {
GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason);
grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
static void subchannel_data_cancel_connectivity_watch(
grpc_lb_subchannel_data* sd, const char* reason) {
if (sd->subchannel_list->tracer->enabled()) {

@ -82,7 +82,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch(
struct grpc_lb_subchannel_list {
/** backpointer to owning policy */
grpc_lb_policy* policy;
grpc_core::LoadBalancingPolicy* policy;
grpc_core::TraceFlag* tracer;
@ -115,9 +115,10 @@ struct grpc_lb_subchannel_list {
};
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args,
grpc_iomgr_cb_func connectivity_changed_cb);
grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
const grpc_lb_addresses* addresses, grpc_combiner* combiner,
grpc_client_channel_factory* client_channel_factory,
const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb);
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
const char* reason);
@ -125,13 +126,6 @@ void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
const char* reason);
/// Takes and releases refs needed for a connectivity notification.
/// This includes a ref to subchannel_list and a weak ref to the LB policy.
void grpc_lb_subchannel_list_ref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
void grpc_lb_subchannel_list_unref_for_connectivity_watch(
grpc_lb_subchannel_list* subchannel_list, const char* reason);
/// Marks subchannel_list as discarded and unsubscribes all its subchannels. The
/// connectivity state notification callback will ultimately unref it.
void grpc_lb_subchannel_list_shutdown_and_unref(

@ -151,17 +151,3 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
return nullptr;
return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p);
}
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) {
factory->vtable->unref(factory);
}
grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) {
if (factory == nullptr) return nullptr;
return factory->vtable->create_lb_policy(factory, args);
}

@ -26,21 +26,20 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
//
// representation of an LB address
//
// Channel arg key for grpc_lb_addresses.
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable* vtable;
};
/** A resolved address alongside any LB related information associated with it.
* \a user_data, if not NULL, contains opaque data meant to be consumed by the
 * gRPC LB policy. Note that not all LB policies support \a user_data as input.
 * Those that don't will simply ignore it and will correspondingly return NULL in
* their namesake pick() output argument. */
// TODO(roth): Once we figure out a better way of handling user_data in
// LB policies, convert these structs to C++ classes.
typedef struct grpc_lb_address {
grpc_resolved_address address;
bool is_balancer;
@ -101,30 +100,27 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
const grpc_channel_args* channel_args);
/** Arguments passed to LB policies. */
struct grpc_lb_policy_args {
grpc_client_channel_factory* client_channel_factory;
grpc_channel_args* args;
grpc_combiner* combiner;
};
//
// LB policy factory
//
struct grpc_lb_policy_factory_vtable {
void (*ref)(grpc_lb_policy_factory* factory);
void (*unref)(grpc_lb_policy_factory* factory);
namespace grpc_core {
/** Implementation of grpc_lb_policy_factory_create_lb_policy */
grpc_lb_policy* (*create_lb_policy)(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args);
class LoadBalancingPolicyFactory {
public:
/// Returns a new LB policy instance.
virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const GRPC_ABSTRACT;
/** Name for the LB policy this factory implements */
const char* name;
};
/// Returns the LB policy name that this factory provides.
/// Caller does NOT take ownership of result.
virtual const char* name() const GRPC_ABSTRACT;
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory);
void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory);
virtual ~LoadBalancingPolicyFactory() {}
GRPC_ABSTRACT_BASE_CLASS
};
/** Create a lb_policy instance. */
grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
grpc_lb_policy_factory* factory, grpc_lb_policy_args* args);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */
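// Illustrative sketch, not from this commit: a hypothetical third-party
// factory, mirroring the shape of PickFirstFactory and RoundRobinFactory
// above. MyPolicy stands in for a LoadBalancingPolicy subclass and is an
// assumption of this sketch.
class MyPolicyFactory : public grpc_core::LoadBalancingPolicyFactory {
 public:
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
  CreateLoadBalancingPolicy(
      const grpc_core::LoadBalancingPolicy::Args& args) const override {
    return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
        grpc_core::New<MyPolicy>(args));  // MyPolicy: hypothetical
  }
  const char* name() const override { return "my_policy"; }
};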

src/core/ext/filters/client_channel/lb_policy_registry.cc
@@ -21,50 +21,75 @@
 #include <string.h>
 
 #include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
 
-#define MAX_POLICIES 10
-
-static grpc_lb_policy_factory* g_all_of_the_lb_policies[MAX_POLICIES];
-static int g_number_of_lb_policies = 0;
-
-void grpc_lb_policy_registry_init(void) { g_number_of_lb_policies = 0; }
-
-void grpc_lb_policy_registry_shutdown(void) {
-  int i;
-  for (i = 0; i < g_number_of_lb_policies; i++) {
-    grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]);
-  }
-}
-
-void grpc_register_lb_policy(grpc_lb_policy_factory* factory) {
-  int i;
-  for (i = 0; i < g_number_of_lb_policies; i++) {
-    GPR_ASSERT(0 != gpr_stricmp(factory->vtable->name,
-                                g_all_of_the_lb_policies[i]->vtable->name));
-  }
-  GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES);
-  grpc_lb_policy_factory_ref(factory);
-  g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory;
-}
-
-static grpc_lb_policy_factory* lookup_factory(const char* name) {
-  int i;
-
-  if (name == nullptr) return nullptr;
-
-  for (i = 0; i < g_number_of_lb_policies; i++) {
-    if (0 == gpr_stricmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
-      return g_all_of_the_lb_policies[i];
-    }
-  }
-
-  return nullptr;
-}
-
-grpc_lb_policy* grpc_lb_policy_create(const char* name,
-                                      grpc_lb_policy_args* args) {
-  grpc_lb_policy_factory* factory = lookup_factory(name);
-  grpc_lb_policy* lb_policy =
-      grpc_lb_policy_factory_create_lb_policy(factory, args);
-  return lb_policy;
-}
+namespace grpc_core {
+
+namespace {
+
+class RegistryState {
+ public:
+  RegistryState() {}
+
+  void RegisterLoadBalancingPolicyFactory(
+      UniquePtr<LoadBalancingPolicyFactory> factory) {
+    for (size_t i = 0; i < factories_.size(); ++i) {
+      GPR_ASSERT(strcmp(factories_[i]->name(), factory->name()) != 0);
+    }
+    factories_.push_back(std::move(factory));
+  }
+
+  LoadBalancingPolicyFactory* GetLoadBalancingPolicyFactory(
+      const char* name) const {
+    for (size_t i = 0; i < factories_.size(); ++i) {
+      if (strcmp(name, factories_[i]->name()) == 0) {
+        return factories_[i].get();
+      }
+    }
+    return nullptr;
+  }
+
+ private:
+  InlinedVector<UniquePtr<LoadBalancingPolicyFactory>, 10> factories_;
+};
+
+RegistryState* g_state = nullptr;
+
+}  // namespace
+
+//
+// LoadBalancingPolicyRegistry::Builder
+//
+
+void LoadBalancingPolicyRegistry::Builder::InitRegistry() {
+  if (g_state == nullptr) g_state = New<RegistryState>();
+}
+
+void LoadBalancingPolicyRegistry::Builder::ShutdownRegistry() {
+  Delete(g_state);
+  g_state = nullptr;
+}
+
+void LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
+    UniquePtr<LoadBalancingPolicyFactory> factory) {
+  InitRegistry();
+  g_state->RegisterLoadBalancingPolicyFactory(std::move(factory));
+}
+
+//
+// LoadBalancingPolicyRegistry
+//
+
+OrphanablePtr<LoadBalancingPolicy>
+LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+    const char* name, const LoadBalancingPolicy::Args& args) {
+  GPR_ASSERT(g_state != nullptr);
+  // Find factory.
+  LoadBalancingPolicyFactory* factory =
+      g_state->GetLoadBalancingPolicyFactory(name);
+  if (factory == nullptr) return nullptr;  // Specified name not found.
+  // Create policy via factory.
+  return factory->CreateLoadBalancingPolicy(args);
+}
+
+}  // namespace grpc_core
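Two behaviors of the new registry are worth noting: RegisterLoadBalancingPolicyFactory aborts (via the GPR_ASSERT above) if a factory with the same name is already registered, and ShutdownRegistry deletes the whole RegistryState, so UniquePtr ownership replaces the old per-factory ref/unref. A hypothetical plugin init hook — the function name and ExamplePolicyFactory are invented for illustration — would register a factory like this:

// Called during grpc_init(); the Builder is meant for global
// initialization only (see the header below), so registration must
// finish before any thread can read the registry.
void grpc_example_lb_policy_init() {
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
              grpc_core::New<ExamplePolicyFactory>()));
}

This presumably mirrors what the PR does for the built-in policies, whose init hooks construct a factory and hand it to the Builder.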

src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -20,21 +20,34 @@
 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
 
 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 
-/** Initialize the registry and set \a default_factory as the factory to be
- * returned when no name is provided in a lookup */
-void grpc_lb_policy_registry_init(void);
-void grpc_lb_policy_registry_shutdown(void);
+namespace grpc_core {
 
-/** Register a LB policy factory. */
-void grpc_register_lb_policy(grpc_lb_policy_factory* factory);
+class LoadBalancingPolicyRegistry {
+ public:
+  /// Methods used to create and populate the LoadBalancingPolicyRegistry.
+  /// NOT THREAD SAFE -- to be used only during global gRPC
+  /// initialization and shutdown.
+  class Builder {
+   public:
+    /// Global initialization and shutdown hooks.
+    static void InitRegistry();
+    static void ShutdownRegistry();
 
-/** Create a \a grpc_lb_policy instance.
- *
- * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init
- * will be returned. */
-grpc_lb_policy* grpc_lb_policy_create(const char* name,
-                                      grpc_lb_policy_args* args);
+    /// Registers an LB policy factory. The factory will be used to create an
+    /// LB policy whose name matches that of the factory.
+    static void RegisterLoadBalancingPolicyFactory(
+        UniquePtr<LoadBalancingPolicyFactory> factory);
+  };
+
+  /// Creates an LB policy of the type specified by \a name.
+  static OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      const char* name, const LoadBalancingPolicy::Args& args);
+};
+
+}  // namespace grpc_core
 
 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */
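On the consumer side, lookup is by exact name and a miss returns null; the old documented NULL-name default is gone. A minimal sketch of a call site — assuming LoadBalancingPolicy::Args carries the same combiner / client_channel_factory / channel-args trio as the removed grpc_lb_policy_args (see lb_policy.h in this PR for the actual layout), with the locals here being placeholders:

// Hypothetical call site, e.g. inside the client channel when the
// resolver reports which LB policy to use.
grpc_core::LoadBalancingPolicy::Args lb_args;
lb_args.combiner = combiner;                              // assumed field
lb_args.client_channel_factory = client_channel_factory;  // assumed field
lb_args.args = channel_args;                              // assumed field
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> policy =
    grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
        "round_robin", lb_args);
if (policy == nullptr) {
  // No factory registered under this name; the caller must fall back
  // or fail, since the registry itself no longer supplies a default.
}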

tools/doxygen/Doxyfile.core.internal
@@ -885,7 +885,6 @@ src/core/ext/filters/client_channel/lb_policy.h \
 src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
 src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
-src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h \
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h \
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \

tools/run_tests/generated/sources_and_headers.json
@@ -9002,7 +9002,6 @@
 ],
 "headers": [
 "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
-"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
@@ -9015,7 +9014,6 @@
 "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc",
-"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
@@ -9039,7 +9037,6 @@
 ],
 "headers": [
 "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
-"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
@@ -9052,7 +9049,6 @@
 "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc",
-"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc",
 "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
