LB policy ctors no longer perform updates; UpdateLocked() must be called after construction.
reviewable/pr18096/r4
Mark D. Roth 6 years ago
parent aa149fedbb
commit 624fb64f61
  1. src/core/ext/filters/client_channel/lb_policy.cc (22 changed lines)
  2. src/core/ext/filters/client_channel/lb_policy.h (20 changed lines)
  3. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (138 changed lines)
  4. src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc (48 changed lines)
  5. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc (45 changed lines)
  6. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (102 changed lines)
  7. src/core/ext/filters/client_channel/resolving_lb_policy.cc (32 changed lines)
  8. src/core/ext/filters/client_channel/resolving_lb_policy.h (8 changed lines)
  9. test/core/util/test_lb_policies.cc (1 changed line)
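
The core contract change, before the per-file diffs: construction only wires up plumbing, and the owner must deliver the first resolver result by calling UpdateLocked() afterwards. A minimal, self-contained C++ sketch of that pattern (FakeLbPolicy is a hypothetical stand-in, not a grpc-core type):

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for an LB policy under the new contract.
class FakeLbPolicy {
 public:
  FakeLbPolicy() {
    // Ctor only initializes internal state; no update, no helper calls.
    std::cout << "policy constructed (no update yet)\n";
  }
  void UpdateLocked(const std::vector<std::string>& addresses) {
    std::cout << "update with " << addresses.size() << " addresses\n";
  }
};

int main() {
  // Owner-side pattern after this commit: construct, then update.
  auto policy = std::make_unique<FakeLbPolicy>();
  policy->UpdateLocked({"10.0.0.1:443", "10.0.0.2:443"});
}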

@@ -28,6 +28,17 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
namespace grpc_core {
LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount)
: InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount),
combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),
interested_parties_(grpc_pollset_set_create()),
channel_control_helper_(std::move(args.channel_control_helper)) {}
LoadBalancingPolicy::~LoadBalancingPolicy() {
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "lb_policy");
}
grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig(
const grpc_json* lb_config_array) {
if (lb_config_array == nullptr || lb_config_array->type != GRPC_JSON_ARRAY) {
@@ -54,15 +65,4 @@ grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig(
return nullptr;
}
LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount)
: InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount),
combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),
interested_parties_(grpc_pollset_set_create()),
channel_control_helper_(std::move(args.channel_control_helper)) {}
LoadBalancingPolicy::~LoadBalancingPolicy() {
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "lb_policy");
}
} // namespace grpc_core

@@ -223,13 +223,11 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
// of a reference.
grpc_combiner* combiner = nullptr;
/// Channel control helper.
/// Note: LB policies MUST NOT call any method on the helper from
/// their constructor.
UniquePtr<ChannelControlHelper> channel_control_helper;
/// Channel args from the resolver.
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
/// Channel args.
const grpc_channel_args* args = nullptr;
/// Load balancing config from the resolver.
grpc_json* lb_config = nullptr;
};
// Not copyable nor movable.
@@ -240,15 +238,17 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual const char* name() const GRPC_ABSTRACT;
/// Updates the policy with a new set of \a args and a new \a lb_config from
/// the resolver. Note that the LB policy gets the set of addresses from the
/// the resolver. Will be invoked immediately after LB policy is constructed,
/// and then again whenever the resolver returns a new result.
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
virtual void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) GRPC_ABSTRACT;
/// Tries to enter a READY connectivity state.
/// TODO(roth): As part of restructuring how we handle IDLE state,
/// consider whether this method is still needed.
virtual void ExitIdleLocked() GRPC_ABSTRACT;
/// This is a no-op by default, since most LB policies never go into
/// IDLE state.
virtual void ExitIdleLocked() {}
/// Resets connection backoff.
virtual void ResetBackoffLocked() GRPC_ABSTRACT;
@@ -290,6 +290,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_combiner* combiner() const { return combiner_; }
// Note: LB policies MUST NOT call any method on the helper from
// their constructor.
// Note: This will return null after ShutdownLocked() has been called.
ChannelControlHelper* channel_control_helper() const {
return channel_control_helper_.get();
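
As a sketch of the resulting API shape (hypothetical Fake* types in standard C++, not the real grpc-core classes): UpdateLocked() remains the required entry point, while ExitIdleLocked() gains an empty default body, so a policy that never reports IDLE simply omits the override.

#include <iostream>

struct FakeArgs { int num_addresses = 0; };

class FakePolicyBase {
 public:
  virtual ~FakePolicyBase() = default;
  // Invoked immediately after construction, then on every resolver result.
  virtual void UpdateLocked(const FakeArgs& args) = 0;
  // Default no-op: only IDLE-capable policies (e.g. pick_first) override it.
  virtual void ExitIdleLocked() {}
};

class FakeRoundRobin : public FakePolicyBase {
 public:
  void UpdateLocked(const FakeArgs& args) override {
    std::cout << "round_robin got " << args.num_addresses << " addresses\n";
  }
  // No ExitIdleLocked() override needed.
};

int main() {
  FakeRoundRobin p;
  p.UpdateLocked(FakeArgs{3});
  p.ExitIdleLocked();  // harmless no-op via the base default
}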

@@ -26,15 +26,13 @@
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
/// The first time the policy gets a request for a pick, a ping, or to exit
/// the idle state, \a StartPickingLocked() is called. This method is
/// responsible for instantiating the internal *streaming* call to the LB
/// server (whichever address pick_first chose). The call will be complete
/// when either the balancer sends status or when we cancel the call (e.g.,
because we are shutting down). If needed, we retry the call. If we
/// received at least one valid message from the server, a new call attempt
/// will be made immediately; otherwise, we apply back-off delays between
/// attempts.
/// When we get our initial update, we instantiate the internal *streaming*
/// call to the LB server (whichever address pick_first chose). The call
/// will be complete when either the balancer sends status or when we cancel
the call (e.g., because we are shutting down). If needed, we retry the
/// call. If we received at least one valid message from the server, a new
/// call attempt will be made immediately; otherwise, we apply back-off
/// delays between attempts.
///
/// We maintain an internal round_robin policy instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
@@ -130,7 +128,6 @@ class GrpcLb : public LoadBalancingPolicy {
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
@@ -290,11 +287,10 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
// Helper function used in ctor and UpdateLocked().
// Helper function used in UpdateLocked().
void ProcessChannelArgsLocked(const grpc_channel_args& args);
// Methods for dealing with the balancer channel and call.
void StartPickingLocked();
void StartBalancerCallLocked();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void StartBalancerCallRetryTimerLocked();
@@ -303,9 +299,9 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_error* error);
// Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobinPolicyLocked();
grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
void CreateRoundRobinPolicyLocked(Args args);
void CreateOrUpdateRoundRobinPolicyLocked();
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
@@ -314,7 +310,6 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_channel_args* args_ = nullptr;
// Internal state.
bool started_picking_ = false;
bool shutting_down_ = false;
// The channel for communicating with the LB server.
@@ -1211,12 +1206,6 @@ GrpcLb::GrpcLb(LoadBalancingPolicy::Args args)
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
// Process channel args.
ProcessChannelArgsLocked(*args.args);
// Initialize channel with a picker that will start us connecting.
channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
}
GrpcLb::~GrpcLb() {
@@ -1249,12 +1238,6 @@ void GrpcLb::ShutdownLocked() {
// public methods
//
void GrpcLb::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
void GrpcLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_);
@@ -1339,12 +1322,26 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
}
void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
const bool is_initial_update = lb_channel_ == nullptr;
ProcessChannelArgsLocked(args);
// Update the existing RR policy.
if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked();
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!watching_lb_channel_) {
// If this is the initial update, start the fallback timer.
if (is_initial_update) {
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
StartBalancerCallLocked();
} else if (!watching_lb_channel_) {
// If this is not the initial update and we're not already watching
// the LB channel's connectivity state, start a watch now. This
// ensures that we'll know when to switch to a new balancer call.
lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
lb_channel_, true /* try to connect */);
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
@@ -1368,25 +1365,6 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
// code for balancer channel and call
//
void GrpcLb::StartPickingLocked() {
// Start a timer to fall back.
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
self.release();
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
started_picking_ = true;
StartBalancerCallLocked();
}
void GrpcLb::StartBalancerCallLocked() {
GPR_ASSERT(lb_channel_ != nullptr);
if (shutting_down_) return;
@@ -1488,13 +1466,11 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
grpclb_policy->lb_calld_.reset();
if (grpclb_policy->started_picking_) {
if (grpclb_policy->retry_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
}
grpclb_policy->lb_call_backoff_.Reset();
grpclb_policy->StartBalancerCallLocked();
if (grpclb_policy->retry_timer_callback_pending_) {
grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
}
grpclb_policy->lb_call_backoff_.Reset();
grpclb_policy->StartBalancerCallLocked();
// fallthrough
case GRPC_CHANNEL_SHUTDOWN:
done:
@@ -1508,27 +1484,6 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
// code for interacting with the RR policy
//
void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
GPR_ASSERT(rr_policy_ == nullptr);
rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", std::move(args));
if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
this);
return;
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
rr_policy_.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created RR policy. This will make the RR policy progress upon activity on
// gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
interested_parties());
rr_policy_->ExitIdleLocked();
}
grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
ServerAddressList tmp_addresses;
ServerAddressList* addresses = &tmp_addresses;
@@ -1570,17 +1525,31 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
return args;
}
void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
GPR_ASSERT(rr_policy_ == nullptr);
rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", std::move(args));
if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
this);
return;
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
rr_policy_.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created RR policy. This will make the RR policy progress upon activity on
// gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
interested_parties());
}
void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
if (shutting_down_) return;
grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
GPR_ASSERT(args != nullptr);
if (rr_policy_ != nullptr) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
rr_policy_.get());
}
rr_policy_->UpdateLocked(*args, nullptr);
} else {
if (rr_policy_ == nullptr) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
@@ -1588,6 +1557,11 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
CreateRoundRobinPolicyLocked(std::move(lb_policy_args));
}
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
rr_policy_.get());
}
rr_policy_->UpdateLocked(*args, nullptr);
grpc_channel_args_destroy(args);
}
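
The net effect on grpclb's RR child management, as a simplified sketch (hypothetical types; the real code passes grpc_channel_args and a ChannelControlHelper): creation no longer triggers ExitIdleLocked(), and a single unconditional UpdateLocked() call serves both a freshly created and an existing child.

#include <iostream>
#include <memory>

struct FakeChildPolicy {
  void UpdateLocked(int num_addresses) {
    std::cout << "child updated with " << num_addresses << " addresses\n";
  }
};

void CreateOrUpdateChild(std::unique_ptr<FakeChildPolicy>& child,
                         int num_addresses) {
  if (child == nullptr) {
    // Create only; the ctor performs no update and no ExitIdleLocked().
    child = std::make_unique<FakeChildPolicy>();
  }
  // One update path for both the brand-new and the existing child.
  child->UpdateLocked(num_addresses);
}

int main() {
  std::unique_ptr<FakeChildPolicy> child;
  CreateOrUpdateChild(child, 2);  // creates, then delivers first update
  CreateOrUpdateChild(child, 5);  // updates in place
}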

@@ -132,7 +132,6 @@ class PickFirst : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartPickingLocked();
void UpdateChildRefsLocked();
// All our subchannels.
@@ -141,8 +140,8 @@ class PickFirst : public LoadBalancingPolicy {
OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
// Selected subchannel in \a subchannel_list_.
PickFirstSubchannelData* selected_ = nullptr;
// Have we started picking?
bool started_picking_ = false;
// Are we in IDLE state?
bool idle_ = false;
// Are we shut down?
bool shutdown_ = false;
@@ -158,12 +157,6 @@ PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
// Initialize channel with a picker that will start us connecting upon
// the first pick.
channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
UpdateLocked(*args.args, args.lb_config);
}
PickFirst::~PickFirst() {
@@ -185,17 +178,14 @@ void PickFirst::ShutdownLocked() {
latest_pending_subchannel_list_.reset();
}
void PickFirst::StartPickingLocked() {
started_picking_ = true;
if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
}
}
void PickFirst::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
if (idle_) {
idle_ = false;
if (subchannel_list_ != nullptr &&
subchannel_list_->num_subchannels() > 0) {
subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
}
}
}
@@ -289,6 +279,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
// currently selected subchannel is also present in the update. It
// can also happen if one of the subchannels in the update is already
// in the subchannel index because it's in use by another channel.
// TODO(roth): If we're in IDLE state, we should probably defer this
// check and instead do it in ExitIdleLocked().
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
grpc_error* error = GRPC_ERROR_NONE;
@@ -305,7 +297,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
// Make sure that subsequent calls to ExitIdleLocked() don't cause
// us to start watching a subchannel other than the one we've
// selected.
started_picking_ = true;
idle_ = false;
return;
}
}
@@ -313,17 +305,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
// If we're not in IDLE state, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
if (!idle_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
}
} else {
// We do have a selected subchannel, so keep using it until one of
// the subchannels in the new list reports READY.
// We do have a selected subchannel (which means it's READY), so keep
// using it until one of the subchannels in the new list reports READY.
if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -334,9 +326,9 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args,
}
}
latest_pending_subchannel_list_ = std::move(subchannel_list);
// If we've started picking, start trying to connect to the first
// If we're not in IDLE state, start trying to connect to the first
// subchannel in the new list.
if (started_picking_) {
if (!idle_) {
// Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
// here, since we've already checked the initial connectivity
// state of all subchannels above.
@@ -385,11 +377,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
} else {
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected subchannel goes bad, request a re-resolution. We also
// set the channel state to IDLE and reset started_picking_. The reason
// set the channel state to IDLE and reset idle_. The reason
// is that if the new state is TRANSIENT_FAILURE due to a GOAWAY
// reception we don't want to connect to the re-resolved backends until
// we leave the IDLE state.
p->started_picking_ = false;
p->idle_ = true;
p->channel_control_helper()->RequestReresolution();
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
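
The started_picking_-to-idle_ inversion in pick_first, reduced to a sketch (hypothetical FakePickFirst, standard C++ only): idle_ records that the policy reported IDLE, and ExitIdleLocked() consumes the flag to kick off connecting.

#include <iostream>

class FakePickFirst {
 public:
  // Set when the policy reports IDLE, e.g. after the selected
  // subchannel fails.
  void EnterIdle() { idle_ = true; }
  void ExitIdleLocked() {
    if (idle_) {
      idle_ = false;
      StartConnecting();
    }
  }

 private:
  void StartConnecting() { std::cout << "connecting to first subchannel\n"; }
  bool idle_ = false;
};

int main() {
  FakePickFirst p;
  p.ExitIdleLocked();  // no-op: not idle
  p.EnterIdle();
  p.ExitIdleLocked();  // triggers a connection attempt
}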

@@ -63,7 +63,6 @@ class RoundRobin : public LoadBalancingPolicy {
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* ignored) override;
@@ -181,7 +180,6 @@ class RoundRobin : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartPickingLocked();
void UpdateChildRefsLocked();
/** list of subchannels */
@@ -192,8 +190,6 @@ class RoundRobin : public LoadBalancingPolicy {
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** have we started picking? */
bool started_picking_ = false;
/** are we shutting down? */
bool shutdown_ = false;
/// Lock and data used to capture snapshots of this channel's child
@@ -254,11 +250,6 @@ RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Created", this);
}
// Initialize channel with a picker that will start us connecting.
channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
UpdateLocked(*args.args, args.lb_config);
}
RoundRobin::~RoundRobin() {
@@ -280,17 +271,6 @@ void RoundRobin::ShutdownLocked() {
latest_pending_subchannel_list_.reset();
}
void RoundRobin::StartPickingLocked() {
started_picking_ = true;
subchannel_list_->StartWatchingLocked();
}
void RoundRobin::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
void RoundRobin::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
@@ -526,19 +506,22 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args,
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, *addresses, combiner(), args);
// If we haven't started picking yet or the new list is empty,
// immediately promote the new list to the current list.
if (!started_picking_ ||
latest_pending_subchannel_list_->num_subchannels() == 0) {
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
}
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else if (subchannel_list_ == nullptr) {
// If there is no current list, immediately promote the new list to
// the current list and start watching it.
subchannel_list_ = std::move(latest_pending_subchannel_list_);
subchannel_list_->StartWatchingLocked();
} else {
// If we've started picking, start watching the new list.
// Start watching the pending list. It will get swapped into the
// current list when it reports READY.
latest_pending_subchannel_list_->StartWatchingLocked();
}
}
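
Round_robin's UpdateLocked() now branches three ways instead of gating on started_picking_; a self-contained sketch (hypothetical Fake* types) of that structure:

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct FakeSubchannelList {
  std::vector<std::string> addresses;
  void StartWatching() {
    std::cout << "watching " << addresses.size() << " subchannels\n";
  }
};

struct FakeRoundRobin {
  std::unique_ptr<FakeSubchannelList> current, pending;

  void UpdateLocked(std::vector<std::string> addresses) {
    pending = std::make_unique<FakeSubchannelList>();
    pending->addresses = std::move(addresses);
    if (pending->addresses.empty()) {
      // Empty update: promote immediately and report TRANSIENT_FAILURE.
      std::cout << "TRANSIENT_FAILURE: empty update\n";
      current = std::move(pending);
    } else if (current == nullptr) {
      // No current list yet: promote the new list and watch it right away.
      current = std::move(pending);
      current->StartWatching();
    } else {
      // Keep serving from the old list; the pending one is swapped in
      // when it reports READY.
      pending->StartWatching();
    }
  }
};

int main() {
  FakeRoundRobin rr;
  rr.UpdateLocked({"a:443", "b:443"});  // promotes and watches
  rr.UpdateLocked({"c:443"});           // watches as pending
  rr.UpdateLocked({});                  // empty: transient failure
}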

@@ -26,14 +26,13 @@
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
/// The first time the xDS policy gets a request for a pick or to exit the idle
/// state, \a StartPickingLocked() is called. This method is responsible for
/// instantiating the internal *streaming* call to the LB server (whichever
/// address pick_first chose). The call will be complete when either the
/// balancer sends status or when we cancel the call (e.g., because we are
shutting down). If needed, we retry the call. If we received at least one
/// valid message from the server, a new call attempt will be made immediately;
/// otherwise, we apply back-off delays between attempts.
/// When we get our initial update, we instantiate the internal *streaming*
/// call to the LB server (whichever address pick_first chose). The call
/// will be complete when either the balancer sends status or when we cancel
the call (e.g., because we are shutting down). If needed, we retry the
/// call. If we received at least one valid message from the server, a new
/// call attempt will be made immediately; otherwise, we apply back-off
/// delays between attempts.
///
/// We maintain an internal child policy (round_robin) instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
@@ -124,7 +123,6 @@ class XdsLb : public LoadBalancingPolicy {
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
@@ -239,7 +237,7 @@ class XdsLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
// Helper function used in ctor and UpdateLocked().
// Helper function used in UpdateLocked().
void ProcessChannelArgsLocked(const grpc_channel_args& args);
// Parses the xds config given the JSON node of the first child of XdsConfig.
@@ -249,7 +247,6 @@ class XdsLb : public LoadBalancingPolicy {
void ParseLbConfig(grpc_json* xds_config_json);
// Methods for dealing with the balancer channel and call.
void StartPickingLocked();
void StartBalancerCallLocked();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void StartBalancerCallRetryTimerLocked();
@@ -272,7 +269,6 @@ class XdsLb : public LoadBalancingPolicy {
grpc_channel_args* args_ = nullptr;
// Internal state.
bool started_picking_ = false;
bool shutting_down_ = false;
// The channel for communicating with the LB server.
@@ -992,14 +988,6 @@ XdsLb::XdsLb(LoadBalancingPolicy::Args args)
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
// Parse the LB config.
ParseLbConfig(args.lb_config);
// Process channel args.
ProcessChannelArgsLocked(*args.args);
// Initialize channel with a picker that will start us connecting.
channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
}
XdsLb::~XdsLb() {
@@ -1037,12 +1025,6 @@ void XdsLb::ShutdownLocked() {
// public methods
//
void XdsLb::ExitIdleLocked() {
if (!started_picking_) {
StartPickingLocked();
}
}
void XdsLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_);
@@ -1137,6 +1119,7 @@ void XdsLb::ParseLbConfig(grpc_json* xds_config_json) {
}
void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
const bool is_initial_update = lb_channel_ == nullptr;
ParseLbConfig(lb_config);
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
@@ -1150,9 +1133,26 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
// TODO(vpowar): Handle the fallback_address changes when we add support for
// fallback in xDS.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// Start watching the LB channel connectivity for connection, if not
// already doing so.
if (!watching_lb_channel_) {
// If this is the initial update, start the fallback timer.
if (is_initial_update) {
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
self.release();
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
StartBalancerCallLocked();
} else if (!watching_lb_channel_) {
// If this is not the initial update and we're not already watching
// the LB channel's connectivity state, start a watch now. This
// ensures that we'll know when to switch to a new balancer call.
lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
lb_channel_, true /* try to connect */);
grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
@@ -1176,25 +1176,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
// code for balancer channel and call
//
void XdsLb::StartPickingLocked() {
// Start a timer to fall back.
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
!fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
self.release();
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_timer_callback_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
started_picking_ = true;
StartBalancerCallLocked();
}
void XdsLb::StartBalancerCallLocked() {
GPR_ASSERT(lb_channel_ != nullptr);
if (shutting_down_) return;
@@ -1293,13 +1274,11 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
xdslb_policy->lb_calld_.reset();
if (xdslb_policy->started_picking_) {
if (xdslb_policy->retry_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_);
}
xdslb_policy->lb_call_backoff_.Reset();
xdslb_policy->StartBalancerCallLocked();
if (xdslb_policy->retry_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_);
}
xdslb_policy->lb_call_backoff_.Reset();
xdslb_policy->StartBalancerCallLocked();
// Fall through.
case GRPC_CHANNEL_SHUTDOWN:
done:
@@ -1326,7 +1305,6 @@ void XdsLb::CreateChildPolicyLocked(const char* name, Args args) {
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_->ExitIdleLocked();
}
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
@@ -1375,25 +1353,23 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
child_policy_name = "round_robin";
}
// TODO(juanlishen): Switch policy according to child_policy_config->key.
if (child_policy_ != nullptr) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(*args, child_policy_config);
} else {
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
lb_policy_args.lb_config = child_policy_config;
CreateChildPolicyLocked(child_policy_name, std::move(lb_policy_args));
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
child_policy_.get());
}
}
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Updating child policy %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(*args, child_policy_config);
grpc_channel_args_destroy(args);
grpc_json_destroy(child_policy_json);
}
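
Both balancer policies (grpclb above, xds here) replace StartPickingLocked() with the same initial-vs-subsequent split inside UpdateLocked(); a condensed sketch (hypothetical FakeBalancerPolicy, with prints standing in for the timer and watch machinery):

#include <iostream>

struct FakeBalancerPolicy {
  bool have_lb_channel = false;
  bool watching_lb_channel = false;

  void UpdateLocked() {
    const bool is_initial_update = !have_lb_channel;
    have_lb_channel = true;  // ProcessChannelArgsLocked() would create it
    if (is_initial_update) {
      // First update: arm the fallback timer and start the balancer call.
      std::cout << "arm fallback timer; start balancer call\n";
    } else if (!watching_lb_channel) {
      // Later updates: watch LB channel connectivity so we know when to
      // switch to a new balancer call.
      watching_lb_channel = true;
      std::cout << "start LB channel connectivity watch\n";
    }
  }
};

int main() {
  FakeBalancerPolicy p;
  p.UpdateLocked();  // initial update
  p.UpdateLocked();  // subsequent update
}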

@@ -262,14 +262,12 @@ void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) {
// Creates a new LB policy, replacing any previous one.
// Updates trace_strings to indicate what was done.
void ResolvingLoadBalancingPolicy::CreateNewLbPolicyLocked(
const char* lb_policy_name, grpc_json* lb_config,
TraceStringVector* trace_strings) {
const char* lb_policy_name, TraceStringVector* trace_strings) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(New<ResolvingControlHelper>(Ref()));
lb_policy_args.args = resolver_result_;
lb_policy_args.lb_config = lb_config;
OrphanablePtr<LoadBalancingPolicy> new_lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, std::move(lb_policy_args));
@@ -307,7 +305,6 @@ void ResolvingLoadBalancingPolicy::CreateNewLbPolicyLocked(
lb_policy_ = std::move(new_lb_policy);
grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(),
interested_parties());
lb_policy_->ExitIdleLocked();
}
}
@@ -417,27 +414,22 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
lb_policy_config = self->child_lb_config_;
}
GPR_ASSERT(lb_policy_name != nullptr);
// Check to see if we're already using the right LB policy.
const bool lb_policy_name_changed =
self->lb_policy_ == nullptr ||
strcmp(self->lb_policy_->name(), lb_policy_name) != 0;
if (self->lb_policy_ != nullptr && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
if (self->tracer_->enabled()) {
gpr_log(GPR_INFO,
"resolving_lb=%p: updating existing LB policy \"%s\" (%p)",
self, lb_policy_name, self->lb_policy_.get());
}
self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config);
} else {
// Instantiate new LB policy.
// If we're not already using the right LB policy name, instantiate
// a new one.
if (self->lb_policy_ == nullptr ||
strcmp(self->lb_policy_->name(), lb_policy_name) != 0) {
if (self->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: creating new LB policy \"%s\"",
self, lb_policy_name);
}
self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config,
&trace_strings);
self->CreateNewLbPolicyLocked(lb_policy_name, &trace_strings);
}
// Update the LB policy with the new addresses and config.
if (self->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: updating LB policy \"%s\" (%p)", self,
lb_policy_name, self->lb_policy_.get());
}
self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config);
// Add channel trace event.
if (self->channelz_node() != nullptr) {
if (service_config_changed) {
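
At the resolver layer the same pattern appears: CreateNewLbPolicyLocked() no longer takes the config or calls ExitIdleLocked(), because OnResolverResultChangedLocked() now always follows creation with UpdateLocked(). A sketch with a hypothetical FakePolicy:

#include <iostream>
#include <memory>
#include <string>

struct FakePolicy {
  explicit FakePolicy(std::string n) : name(std::move(n)) {}
  void UpdateLocked(int num_addresses) {
    std::cout << name << ": update with " << num_addresses << " addresses\n";
  }
  std::string name;
};

void OnResolverResult(std::unique_ptr<FakePolicy>& policy,
                      const std::string& policy_name, int num_addresses) {
  // Create a new policy only if none exists or the name changed.
  if (policy == nullptr || policy->name != policy_name) {
    policy = std::make_unique<FakePolicy>(policy_name);  // ctor: no update
  }
  // Unconditional update: this is what delivers the very first resolver
  // result to a freshly constructed policy.
  policy->UpdateLocked(num_addresses);
}

int main() {
  std::unique_ptr<FakePolicy> policy;
  OnResolverResult(policy, "pick_first", 1);   // create + first update
  OnResolverResult(policy, "pick_first", 2);   // reuse + update
  OnResolverResult(policy, "round_robin", 2);  // name changed: recreate
}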

@@ -77,10 +77,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
virtual const char* name() const override { return "resolving_lb"; }
// No-op -- should never get updates from the channel.
// TODO(roth): Need to support updating child LB policy's config.
// For xds policy, will also need to support updating config
// independently of args from resolver, since they will be coming from
// different places. Maybe change LB policy API to support that?
// TODO(roth): Need to support updating child LB policy's config for xds
// use case.
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override {}
@@ -104,7 +102,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void StartResolvingLocked();
void OnResolverShutdownLocked(grpc_error* error);
void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config,
void CreateNewLbPolicyLocked(const char* lb_policy_name,
TraceStringVector* trace_strings);
void MaybeAddTraceMessagesForAddressChangesLocked(
TraceStringVector* trace_strings);

@@ -56,7 +56,6 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
delegate_args.combiner = combiner();
delegate_args.channel_control_helper = std::move(delegating_helper);
delegate_args.args = args.args;
delegate_args.lb_config = args.lb_config;
delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
delegate_policy_name.c_str(), std::move(delegate_args));
grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
