diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 9e3477b9ed5..527b241eb6b 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -28,6 +28,17 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( namespace grpc_core { +LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount) + : InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount), + combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")), + interested_parties_(grpc_pollset_set_create()), + channel_control_helper_(std::move(args.channel_control_helper)) {} + +LoadBalancingPolicy::~LoadBalancingPolicy() { + grpc_pollset_set_destroy(interested_parties_); + GRPC_COMBINER_UNREF(combiner_, "lb_policy"); +} + grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig( const grpc_json* lb_config_array) { if (lb_config_array == nullptr || lb_config_array->type != GRPC_JSON_ARRAY) { @@ -54,15 +65,4 @@ grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig( return nullptr; } -LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount) - : InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount), - combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")), - interested_parties_(grpc_pollset_set_create()), - channel_control_helper_(std::move(args.channel_control_helper)) {} - -LoadBalancingPolicy::~LoadBalancingPolicy() { - grpc_pollset_set_destroy(interested_parties_); - GRPC_COMBINER_UNREF(combiner_, "lb_policy"); -} - } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index aeb8138a12e..20a94ed9ab9 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -223,13 +223,11 @@ class LoadBalancingPolicy : public InternallyRefCounted { // of a reference. 
grpc_combiner* combiner = nullptr; /// Channel control helper. + /// Note: LB policies MUST NOT call any method on the helper from + /// their constructor. UniquePtr channel_control_helper; - /// Channel args from the resolver. - /// Note that the LB policy gets the set of addresses from the - /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. + /// Channel args. const grpc_channel_args* args = nullptr; - /// Load balancing config from the resolver. - grpc_json* lb_config = nullptr; }; // Not copyable nor movable. @@ -240,15 +238,17 @@ class LoadBalancingPolicy : public InternallyRefCounted { virtual const char* name() const GRPC_ABSTRACT; /// Updates the policy with a new set of \a args and a new \a lb_config from - /// the resolver. Note that the LB policy gets the set of addresses from the + /// the resolver. Will be invoked immediately after LB policy is constructed, + /// and then again whenever the resolver returns a new result. + /// Note that the LB policy gets the set of addresses from the /// GRPC_ARG_SERVER_ADDRESS_LIST channel arg. virtual void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) GRPC_ABSTRACT; /// Tries to enter a READY connectivity state. - /// TODO(roth): As part of restructuring how we handle IDLE state, - /// consider whether this method is still needed. - virtual void ExitIdleLocked() GRPC_ABSTRACT; + /// This is a no-op by default, since most LB policies never go into + /// IDLE state. + virtual void ExitIdleLocked() {} /// Resets connection backoff. virtual void ResetBackoffLocked() GRPC_ABSTRACT; @@ -290,6 +290,8 @@ class LoadBalancingPolicy : public InternallyRefCounted { grpc_combiner* combiner() const { return combiner_; } + // Note: LB policies MUST NOT call any method on the helper from + // their constructor. // Note: This will return null after ShutdownLocked() has been called. 
ChannelControlHelper* channel_control_helper() const { return channel_control_helper_.get(); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index fa1ca6d127a..12daea46dd1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -26,15 +26,13 @@ /// channel that uses pick_first to select from the list of balancer /// addresses. /// -/// The first time the policy gets a request for a pick, a ping, or to exit -/// the idle state, \a StartPickingLocked() is called. This method is -/// responsible for instantiating the internal *streaming* call to the LB -/// server (whichever address pick_first chose). The call will be complete -/// when either the balancer sends status or when we cancel the call (e.g., -/// because we are shutting down). In needed, we retry the call. If we -/// received at least one valid message from the server, a new call attempt -/// will be made immediately; otherwise, we apply back-off delays between -/// attempts. +/// When we get our initial update, we instantiate the internal *streaming* +/// call to the LB server (whichever address pick_first chose). The call +/// will be complete when either the balancer sends status or when we cancel +/// the call (e.g., because we are shutting down). If needed, we retry the +/// call. If we received at least one valid message from the server, a new +/// call attempt will be made immediately; otherwise, we apply back-off +/// delays between attempts. /// /// We maintain an internal round_robin policy instance for distributing /// requests across backends. 
Whenever we receive a new serverlist from @@ -130,7 +128,6 @@ class GrpcLb : public LoadBalancingPolicy { void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; - void ExitIdleLocked() override; void ResetBackoffLocked() override; void FillChildRefsForChannelz( channelz::ChildRefsList* child_subchannels, @@ -290,11 +287,10 @@ class GrpcLb : public LoadBalancingPolicy { void ShutdownLocked() override; - // Helper function used in ctor and UpdateLocked(). + // Helper function used in UpdateLocked(). void ProcessChannelArgsLocked(const grpc_channel_args& args); // Methods for dealing with the balancer channel and call. - void StartPickingLocked(); void StartBalancerCallLocked(); static void OnFallbackTimerLocked(void* arg, grpc_error* error); void StartBalancerCallRetryTimerLocked(); @@ -303,9 +299,9 @@ class GrpcLb : public LoadBalancingPolicy { grpc_error* error); // Methods for dealing with the RR policy. - void CreateOrUpdateRoundRobinPolicyLocked(); grpc_channel_args* CreateRoundRobinPolicyArgsLocked(); void CreateRoundRobinPolicyLocked(Args args); + void CreateOrUpdateRoundRobinPolicyLocked(); // Who the client is trying to communicate with. const char* server_name_ = nullptr; @@ -314,7 +310,6 @@ class GrpcLb : public LoadBalancingPolicy { grpc_channel_args* args_ = nullptr; // Internal state. - bool started_picking_ = false; bool shutting_down_ = false; // The channel for communicating with the LB server. @@ -1211,12 +1206,6 @@ GrpcLb::GrpcLb(LoadBalancingPolicy::Args args) arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); - // Process channel args. - ProcessChannelArgsLocked(*args.args); - // Initialize channel with a picker that will start us connecting. 
- channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, - UniquePtr(New(Ref()))); } GrpcLb::~GrpcLb() { @@ -1249,12 +1238,6 @@ void GrpcLb::ShutdownLocked() { // public methods // -void GrpcLb::ExitIdleLocked() { - if (!started_picking_) { - StartPickingLocked(); - } -} - void GrpcLb::ResetBackoffLocked() { if (lb_channel_ != nullptr) { grpc_channel_reset_connect_backoff(lb_channel_); @@ -1339,12 +1322,26 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { } void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { + const bool is_initial_update = lb_channel_ == nullptr; ProcessChannelArgsLocked(args); // Update the existing RR policy. if (rr_policy_ != nullptr) CreateOrUpdateRoundRobinPolicyLocked(); - // Start watching the LB channel connectivity for connection, if not - // already doing so. - if (!watching_lb_channel_) { + // If this is the initial update, start the fallback timer. + if (is_initial_update) { + if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && + !fallback_timer_callback_pending_) { + grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; + Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback + GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this, + grpc_combiner_scheduler(combiner())); + fallback_timer_callback_pending_ = true; + grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); + } + StartBalancerCallLocked(); + } else if (!watching_lb_channel_) { + // If this is not the initial update and we're not already watching + // the LB channel's connectivity state, start a watch now. This + // ensures that we'll know when to switch to a new balancer call. 
lb_channel_connectivity_ = grpc_channel_check_connectivity_state( lb_channel_, true /* try to connect */); grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( @@ -1368,25 +1365,6 @@ void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { // code for balancer channel and call // -void GrpcLb::StartPickingLocked() { - // Start a timer to fall back. - if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && - !fallback_timer_callback_pending_) { - grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. - auto self = Ref(DEBUG_LOCATION, "on_fallback_timer"); - self.release(); - GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this, - grpc_combiner_scheduler(combiner())); - fallback_timer_callback_pending_ = true; - grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); - } - started_picking_ = true; - StartBalancerCallLocked(); -} - void GrpcLb::StartBalancerCallLocked() { GPR_ASSERT(lb_channel_ != nullptr); if (shutting_down_) return; @@ -1488,13 +1466,11 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: grpclb_policy->lb_calld_.reset(); - if (grpclb_policy->started_picking_) { - if (grpclb_policy->retry_timer_callback_pending_) { - grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_); - } - grpclb_policy->lb_call_backoff_.Reset(); - grpclb_policy->StartBalancerCallLocked(); + if (grpclb_policy->retry_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_); } + grpclb_policy->lb_call_backoff_.Reset(); + grpclb_policy->StartBalancerCallLocked(); // fallthrough case GRPC_CHANNEL_SHUTDOWN: done: @@ -1508,27 +1484,6 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, // code for interacting with the 
RR policy // -void GrpcLb::CreateRoundRobinPolicyLocked(Args args) { - GPR_ASSERT(rr_policy_ == nullptr); - rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - "round_robin", std::move(args)); - if (GPR_UNLIKELY(rr_policy_ == nullptr)) { - gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy", - this); - return; - } - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, - rr_policy_.get()); - } - // Add the gRPC LB's interested_parties pollset_set to that of the newly - // created RR policy. This will make the RR policy progress upon activity on - // gRPC LB, which in turn is tied to the application's call. - grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(), - interested_parties()); - rr_policy_->ExitIdleLocked(); -} - grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { ServerAddressList tmp_addresses; ServerAddressList* addresses = &tmp_addresses; @@ -1570,17 +1525,31 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { return args; } +void GrpcLb::CreateRoundRobinPolicyLocked(Args args) { + GPR_ASSERT(rr_policy_ == nullptr); + rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + "round_robin", std::move(args)); + if (GPR_UNLIKELY(rr_policy_ == nullptr)) { + gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy", + this); + return; + } + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this, + rr_policy_.get()); + } + // Add the gRPC LB's interested_parties pollset_set to that of the newly + // created RR policy. This will make the RR policy progress upon activity on + // gRPC LB, which in turn is tied to the application's call. 
+ grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(), + interested_parties()); +} + void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { if (shutting_down_) return; grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked(); GPR_ASSERT(args != nullptr); - if (rr_policy_ != nullptr) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this, - rr_policy_.get()); - } - rr_policy_->UpdateLocked(*args, nullptr); - } else { + if (rr_policy_ == nullptr) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.args = args; @@ -1588,6 +1557,11 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { UniquePtr(New(Ref())); CreateRoundRobinPolicyLocked(std::move(lb_policy_args)); } + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this, + rr_policy_.get()); + } + rr_policy_->UpdateLocked(*args, nullptr); grpc_channel_args_destroy(args); } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index bf1c5bd7914..58bf3d89f21 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -132,7 +132,6 @@ class PickFirst : public LoadBalancingPolicy { void ShutdownLocked() override; - void StartPickingLocked(); void UpdateChildRefsLocked(); // All our subchannels. @@ -141,8 +140,8 @@ class PickFirst : public LoadBalancingPolicy { OrphanablePtr latest_pending_subchannel_list_; // Selected subchannel in \a subchannel_list_. PickFirstSubchannelData* selected_ = nullptr; - // Have we started picking? - bool started_picking_ = false; + // Are we in IDLE state? + bool idle_ = false; // Are we shut down? 
bool shutdown_ = false; @@ -158,12 +157,6 @@ PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p created.", this); } - // Initialize channel with a picker that will start us connecting upon - // the first pick. - channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, - UniquePtr(New(Ref()))); - UpdateLocked(*args.args, args.lb_config); } PickFirst::~PickFirst() { @@ -185,17 +178,14 @@ void PickFirst::ShutdownLocked() { latest_pending_subchannel_list_.reset(); } -void PickFirst::StartPickingLocked() { - started_picking_ = true; - if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) { - subchannel_list_->subchannel(0) - ->CheckConnectivityStateAndStartWatchingLocked(); - } -} - void PickFirst::ExitIdleLocked() { - if (!started_picking_) { - StartPickingLocked(); + if (idle_) { + idle_ = false; + if (subchannel_list_ != nullptr && + subchannel_list_->num_subchannels() > 0) { + subchannel_list_->subchannel(0) + ->CheckConnectivityStateAndStartWatchingLocked(); + } } } @@ -289,6 +279,8 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // currently selected subchannel is also present in the update. It // can also happen if one of the subchannels in the update is already // in the subchannel index because it's in use by another channel. + // TODO(roth): If we're in IDLE state, we should probably defer this + // check and instead do it in ExitIdleLocked(). for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { PickFirstSubchannelData* sd = subchannel_list->subchannel(i); grpc_error* error = GRPC_ERROR_NONE; @@ -305,7 +297,7 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // Make sure that subsequent calls to ExitIdleLocked() don't cause // us to start watching a subchannel other than the one we've // selected. 
- started_picking_ = true; + idle_ = false; return; } } @@ -313,17 +305,17 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. subchannel_list_ = std::move(subchannel_list); - // If we've started picking, start trying to connect to the first + // If we're not in IDLE state, start trying to connect to the first // subchannel in the new list. - if (started_picking_) { + if (!idle_) { // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() // here, since we've already checked the initial connectivity // state of all subchannels above. subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); } } else { - // We do have a selected subchannel, so keep using it until one of - // the subchannels in the new list reports READY. + // We do have a selected subchannel (which means it's READY), so keep + // using it until one of the subchannels in the new list reports READY. if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, @@ -334,9 +326,9 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args, } } latest_pending_subchannel_list_ = std::move(subchannel_list); - // If we've started picking, start trying to connect to the first + // If we're not in IDLE state, start trying to connect to the first // subchannel in the new list. - if (started_picking_) { + if (!idle_) { // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() // here, since we've already checked the initial connectivity // state of all subchannels above. @@ -385,11 +377,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( } else { if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected subchannel goes bad, request a re-resolution. We also - // set the channel state to IDLE and reset started_picking_. 
The reason + // set the channel state to IDLE and reset idle_. The reason // is that if the new state is TRANSIENT_FAILURE due to a GOAWAY // reception we don't want to connect to the re-resolved backends until // we leave the IDLE state. - p->started_picking_ = false; + p->idle_ = true; p->channel_control_helper()->RequestReresolution(); // In transient failure. Rely on re-resolution to recover. p->selected_ = nullptr; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 0406efb71d3..f92c2d4ba59 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -63,7 +63,6 @@ class RoundRobin : public LoadBalancingPolicy { void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; - void ExitIdleLocked() override; void ResetBackoffLocked() override; void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, channelz::ChildRefsList* ignored) override; @@ -181,7 +180,6 @@ class RoundRobin : public LoadBalancingPolicy { void ShutdownLocked() override; - void StartPickingLocked(); void UpdateChildRefsLocked(); /** list of subchannels */ @@ -192,8 +190,6 @@ class RoundRobin : public LoadBalancingPolicy { * racing callbacks that reference outdated subchannel lists won't perform any * update. */ OrphanablePtr latest_pending_subchannel_list_; - /** have we started picking? */ - bool started_picking_ = false; /** are we shutting down? */ bool shutdown_ = false; /// Lock and data used to capture snapshots of this channel's child @@ -254,11 +250,6 @@ RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Created", this); } - // Initialize channel with a picker that will start us connecting. 
- channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, - UniquePtr(New(Ref()))); - UpdateLocked(*args.args, args.lb_config); } RoundRobin::~RoundRobin() { @@ -280,17 +271,6 @@ void RoundRobin::ShutdownLocked() { latest_pending_subchannel_list_.reset(); } -void RoundRobin::StartPickingLocked() { - started_picking_ = true; - subchannel_list_->StartWatchingLocked(); -} - -void RoundRobin::ExitIdleLocked() { - if (!started_picking_) { - StartPickingLocked(); - } -} - void RoundRobin::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); if (latest_pending_subchannel_list_ != nullptr) { @@ -526,19 +506,22 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args, } latest_pending_subchannel_list_ = MakeOrphanable( this, &grpc_lb_round_robin_trace, *addresses, combiner(), args); - // If we haven't started picking yet or the new list is empty, - // immediately promote the new list to the current list. - if (!started_picking_ || - latest_pending_subchannel_list_->num_subchannels() == 0) { - if (latest_pending_subchannel_list_->num_subchannels() == 0) { - grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), - UniquePtr(New(error))); - } + if (latest_pending_subchannel_list_->num_subchannels() == 0) { + // If the new list is empty, immediately promote the new list to the + // current list and transition to TRANSIENT_FAILURE. + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), + UniquePtr(New(error))); + subchannel_list_ = std::move(latest_pending_subchannel_list_); + } else if (subchannel_list_ == nullptr) { + // If there is no current list, immediately promote the new list to + // the current list and start watching it. 
subchannel_list_ = std::move(latest_pending_subchannel_list_); + subchannel_list_->StartWatchingLocked(); } else { - // If we've started picking, start watching the new list. + // Start watching the pending list. It will get swapped into the + // current list when it reports READY. latest_pending_subchannel_list_->StartWatchingLocked(); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 4b3f2882424..a1d2002079d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -26,14 +26,13 @@ /// channel that uses pick_first to select from the list of balancer /// addresses. /// -/// The first time the xDS policy gets a request for a pick or to exit the idle -/// state, \a StartPickingLocked() is called. This method is responsible for -/// instantiating the internal *streaming* call to the LB server (whichever -/// address pick_first chose). The call will be complete when either the -/// balancer sends status or when we cancel the call (e.g., because we are -/// shutting down). In needed, we retry the call. If we received at least one -/// valid message from the server, a new call attempt will be made immediately; -/// otherwise, we apply back-off delays between attempts. +/// When we get our initial update, we instantiate the internal *streaming* +/// call to the LB server (whichever address pick_first chose). The call +/// will be complete when either the balancer sends status or when we cancel +/// the call (e.g., because we are shutting down). If needed, we retry the +/// call. If we received at least one valid message from the server, a new +/// call attempt will be made immediately; otherwise, we apply back-off +/// delays between attempts. /// /// We maintain an internal child policy (round_robin) instance for distributing /// requests across backends. 
Whenever we receive a new serverlist from @@ -124,7 +123,6 @@ class XdsLb : public LoadBalancingPolicy { void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override; - void ExitIdleLocked() override; void ResetBackoffLocked() override; void FillChildRefsForChannelz( channelz::ChildRefsList* child_subchannels, @@ -239,7 +237,7 @@ class XdsLb : public LoadBalancingPolicy { void ShutdownLocked() override; - // Helper function used in ctor and UpdateLocked(). + // Helper function used in UpdateLocked(). void ProcessChannelArgsLocked(const grpc_channel_args& args); // Parses the xds config given the JSON node of the first child of XdsConfig. @@ -249,7 +247,6 @@ class XdsLb : public LoadBalancingPolicy { void ParseLbConfig(grpc_json* xds_config_json); // Methods for dealing with the balancer channel and call. - void StartPickingLocked(); void StartBalancerCallLocked(); static void OnFallbackTimerLocked(void* arg, grpc_error* error); void StartBalancerCallRetryTimerLocked(); @@ -272,7 +269,6 @@ class XdsLb : public LoadBalancingPolicy { grpc_channel_args* args_ = nullptr; // Internal state. - bool started_picking_ = false; bool shutting_down_ = false; // The channel for communicating with the LB server. @@ -992,14 +988,6 @@ XdsLb::XdsLb(LoadBalancingPolicy::Args args) arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); - // Parse the LB config. - ParseLbConfig(args.lb_config); - // Process channel args. - ProcessChannelArgsLocked(*args.args); - // Initialize channel with a picker that will start us connecting. 
- channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, - UniquePtr(New(Ref()))); } XdsLb::~XdsLb() { @@ -1037,12 +1025,6 @@ void XdsLb::ShutdownLocked() { // public methods // -void XdsLb::ExitIdleLocked() { - if (!started_picking_) { - StartPickingLocked(); - } -} - void XdsLb::ResetBackoffLocked() { if (lb_channel_ != nullptr) { grpc_channel_reset_connect_backoff(lb_channel_); @@ -1137,6 +1119,7 @@ void XdsLb::ParseLbConfig(grpc_json* xds_config_json) { } void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { + const bool is_initial_update = lb_channel_ == nullptr; ParseLbConfig(lb_config); // TODO(juanlishen): Pass fallback policy config update after fallback policy // is added. @@ -1150,9 +1133,26 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { // TODO(vpowar): Handle the fallback_address changes when we add support for // fallback in xDS. if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); - // Start watching the LB channel connectivity for connection, if not - // already doing so. - if (!watching_lb_channel_) { + // If this is the initial update, start the fallback timer. + if (is_initial_update) { + if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && + !fallback_timer_callback_pending_) { + grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. 
+ auto self = Ref(DEBUG_LOCATION, "on_fallback_timer"); + self.release(); + GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this, + grpc_combiner_scheduler(combiner())); + fallback_timer_callback_pending_ = true; + grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); + } + StartBalancerCallLocked(); + } else if (!watching_lb_channel_) { + // If this is not the initial update and we're not already watching + // the LB channel's connectivity state, start a watch now. This + // ensures that we'll know when to switch to a new balancer call. lb_channel_connectivity_ = grpc_channel_check_connectivity_state( lb_channel_, true /* try to connect */); grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( @@ -1176,25 +1176,6 @@ void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) { // code for balancer channel and call // -void XdsLb::StartPickingLocked() { - // Start a timer to fall back. - if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && - !fallback_timer_callback_pending_) { - grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; - // TODO(roth): We currently track this ref manually. Once the - // ClosureRef API is ready, we should pass the RefCountedPtr<> along - // with the callback. 
- auto self = Ref(DEBUG_LOCATION, "on_fallback_timer"); - self.release(); - GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this, - grpc_combiner_scheduler(combiner())); - fallback_timer_callback_pending_ = true; - grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); - } - started_picking_ = true; - StartBalancerCallLocked(); -} - void XdsLb::StartBalancerCallLocked() { GPR_ASSERT(lb_channel_ != nullptr); if (shutting_down_) return; @@ -1293,13 +1274,11 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: xdslb_policy->lb_calld_.reset(); - if (xdslb_policy->started_picking_) { - if (xdslb_policy->retry_timer_callback_pending_) { - grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_); - } - xdslb_policy->lb_call_backoff_.Reset(); - xdslb_policy->StartBalancerCallLocked(); + if (xdslb_policy->retry_timer_callback_pending_) { + grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_); } + xdslb_policy->lb_call_backoff_.Reset(); + xdslb_policy->StartBalancerCallLocked(); // Fall through. case GRPC_CHANNEL_SHUTDOWN: done: @@ -1326,7 +1305,6 @@ void XdsLb::CreateChildPolicyLocked(const char* name, Args args) { // xDS LB, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), interested_parties()); - child_policy_->ExitIdleLocked(); } grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { @@ -1375,25 +1353,23 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() { child_policy_name = "round_robin"; } // TODO(juanlishen): Switch policy according to child_policy_config->key. 
- if (child_policy_ != nullptr) { - if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this, - child_policy_.get()); - } - child_policy_->UpdateLocked(*args, child_policy_config); - } else { + if (child_policy_ == nullptr) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = UniquePtr(New(Ref())); - lb_policy_args.lb_config = child_policy_config; CreateChildPolicyLocked(child_policy_name, std::move(lb_policy_args)); if (grpc_lb_xds_trace.enabled()) { gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this, child_policy_.get()); } } + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Updating child policy %p", this, + child_policy_.get()); + } + child_policy_->UpdateLocked(*args, child_policy_config); grpc_channel_args_destroy(args); grpc_json_destroy(child_policy_json); } diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index ad9720fdda9..22050cba59e 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -262,14 +262,12 @@ void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) { // Creates a new LB policy, replacing any previous one. // Updates trace_strings to indicate what was done. 
void ResolvingLoadBalancingPolicy::CreateNewLbPolicyLocked( - const char* lb_policy_name, grpc_json* lb_config, - TraceStringVector* trace_strings) { + const char* lb_policy_name, TraceStringVector* trace_strings) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.channel_control_helper = UniquePtr(New(Ref())); lb_policy_args.args = resolver_result_; - lb_policy_args.lb_config = lb_config; OrphanablePtr new_lb_policy = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( lb_policy_name, std::move(lb_policy_args)); @@ -307,7 +305,6 @@ void ResolvingLoadBalancingPolicy::CreateNewLbPolicyLocked( lb_policy_ = std::move(new_lb_policy); grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(), interested_parties()); - lb_policy_->ExitIdleLocked(); } } @@ -417,27 +414,22 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( lb_policy_config = self->child_lb_config_; } GPR_ASSERT(lb_policy_name != nullptr); - // Check to see if we're already using the right LB policy. - const bool lb_policy_name_changed = - self->lb_policy_ == nullptr || - strcmp(self->lb_policy_->name(), lb_policy_name) != 0; - if (self->lb_policy_ != nullptr && !lb_policy_name_changed) { - // Continue using the same LB policy. Update with new addresses. - if (self->tracer_->enabled()) { - gpr_log(GPR_INFO, - "resolving_lb=%p: updating existing LB policy \"%s\" (%p)", - self, lb_policy_name, self->lb_policy_.get()); - } - self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config); - } else { - // Instantiate new LB policy. + // If we're not already using the right LB policy name, instantiate + // a new one. 
+ if (self->lb_policy_ == nullptr || + strcmp(self->lb_policy_->name(), lb_policy_name) != 0) { if (self->tracer_->enabled()) { gpr_log(GPR_INFO, "resolving_lb=%p: creating new LB policy \"%s\"", self, lb_policy_name); } - self->CreateNewLbPolicyLocked(lb_policy_name, lb_policy_config, - &trace_strings); + self->CreateNewLbPolicyLocked(lb_policy_name, &trace_strings); + } + // Update the LB policy with the new addresses and config. + if (self->tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: updating LB policy \"%s\" (%p)", self, + lb_policy_name, self->lb_policy_.get()); } + self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config); // Add channel trace event. if (self->channelz_node() != nullptr) { if (service_config_changed) { diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index c302ae5d975..19ca62fc556 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -77,10 +77,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { virtual const char* name() const override { return "resolving_lb"; } // No-op -- should never get updates from the channel. - // TODO(roth): Need to support updating child LB policy's config. - // For xds policy, will also need to support updating config - // independently of args from resolver, since they will be coming from - // different places. Maybe change LB policy API to support that? + // TODO(roth): Need to support updating child LB policy's config for xds + // use case. 
void UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) override {} @@ -104,7 +102,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { void StartResolvingLocked(); void OnResolverShutdownLocked(grpc_error* error); - void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config, + void CreateNewLbPolicyLocked(const char* lb_policy_name, TraceStringVector* trace_strings); void MaybeAddTraceMessagesForAddressChangesLocked( TraceStringVector* trace_strings); diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 77b354740e5..bfdd7441563 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -56,7 +56,6 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy { delegate_args.combiner = combiner(); delegate_args.channel_control_helper = std::move(delegating_helper); delegate_args.args = args.args; - delegate_args.lb_config = args.lb_config; delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( delegate_policy_name.c_str(), std::move(delegate_args)); grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),