diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index d6dde0d7a79..184215a3da9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -307,7 +307,7 @@ class GrpcLb : public LoadBalancingPolicy { // Methods for dealing with the child policy. grpc_channel_args* CreateChildPolicyArgsLocked(); OrphanablePtr CreateChildPolicyLocked( - const char* name, grpc_channel_args* args); + const char* name, const grpc_channel_args* args); void CreateOrUpdateChildPolicyLocked(); // Who the client is trying to communicate with. @@ -685,7 +685,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, void GrpcLb::Helper::RequestReresolution() { if (parent_->shutting_down_) return; // If there is a pending child policy, ignore re-resolution requests - // from the current child policy (or any outdated pending child). + // from the current child policy (or any outdated child). if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { return; } @@ -1608,7 +1608,7 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() { } OrphanablePtr GrpcLb::CreateChildPolicyLocked( - const char* name, grpc_channel_args* args) { + const char* name, const grpc_channel_args* args) { Helper* helper = New(Ref()); LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index a9ca34e5a52..4b386d37797 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -278,8 +278,14 @@ class XdsLb : public LoadBalancingPolicy { UniquePtr picker) override; void RequestReresolution() override; + void set_child(LoadBalancingPolicy* child) { child_ = child; } + private: + bool CalledByPendingChild() const; + bool CalledByCurrentChild() const; + RefCountedPtr parent_; + LoadBalancingPolicy* child_ = nullptr; }; ~XdsLb(); @@ -306,7 +312,8 @@ class XdsLb : public LoadBalancingPolicy { // Methods for dealing with the child policy. void CreateOrUpdateChildPolicyLocked(); grpc_channel_args* CreateChildPolicyArgsLocked(); - void CreateChildPolicyLocked(const char* name, Args args); + OrphanablePtr CreateChildPolicyLocked( + const char* name, const grpc_channel_args* args); // Who the client is trying to communicate with. const char* server_name_ = nullptr; @@ -349,6 +356,10 @@ class XdsLb : public LoadBalancingPolicy { // The policy to use for the backends. RefCountedPtr child_policy_config_; OrphanablePtr child_policy_; + OrphanablePtr pending_child_policy_; + // Lock held when modifying the value of child_policy_ or + // pending_child_policy_. + gpr_mu child_policy_mu_; }; // @@ -372,14 +383,30 @@ XdsLb::Picker::PickResult XdsLb::Picker::Pick(PickState* pick, // XdsLb::Helper // +bool XdsLb::Helper::CalledByPendingChild() const { + GPR_ASSERT(child_ != nullptr); + return child_ == parent_->pending_child_policy_.get(); +} + +bool XdsLb::Helper::CalledByCurrentChild() const { + GPR_ASSERT(child_ != nullptr); + return child_ == parent_->child_policy_.get(); +} + Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) { - if (parent_->shutting_down_) return nullptr; + if (parent_->shutting_down_ || + (!CalledByPendingChild() && !CalledByCurrentChild())) { + return nullptr; + } return parent_->channel_control_helper()->CreateSubchannel(args); } grpc_channel* XdsLb::Helper::CreateChannel(const char* target, const grpc_channel_args& args) { - if (parent_->shutting_down_) return nullptr; + if (parent_->shutting_down_ || + (!CalledByPendingChild() && !CalledByCurrentChild())) { + return nullptr; + } return parent_->channel_control_helper()->CreateChannel(target, args); } @@ -390,6 +417,26 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state, GRPC_ERROR_UNREF(state_error); return; } + // If this request is from the pending child policy, ignore it until + // it reports READY, at which point we swap it into place. + if (CalledByPendingChild()) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p helper %p] pending child policy %p reports state=%s", + parent_.get(), this, parent_->pending_child_policy_.get(), + grpc_connectivity_state_name(state)); + } + if (state != GRPC_CHANNEL_READY) { + GRPC_ERROR_UNREF(state_error); + return; + } + MutexLock lock(&parent_->child_policy_mu_); + parent_->child_policy_ = std::move(parent_->pending_child_policy_); + } else if (!CalledByCurrentChild()) { + // This request is from an outdated child, so ignore it. + GRPC_ERROR_UNREF(state_error); + return; + } // TODO(juanlishen): When in fallback mode, pass the child picker // through without wrapping it. (Or maybe use a different helper for // the fallback policy?) @@ -406,6 +453,11 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state, void XdsLb::Helper::RequestReresolution() { if (parent_->shutting_down_) return; + // If there is a pending child policy, ignore re-resolution requests + // from the current child policy (or any outdated child). + if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { + return; + } if (grpc_lb_xds_trace.enabled()) { gpr_log(GPR_INFO, "[xdslb %p] Re-resolution requested from the internal RR policy " @@ -1064,6 +1116,7 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) { gpr_mu_init(&lb_chand_mu_); + gpr_mu_init(&child_policy_mu_); // Record server name. const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -1093,6 +1146,7 @@ XdsLb::~XdsLb() { if (serverlist_ != nullptr) { xds_grpclb_destroy_serverlist(serverlist_); } + gpr_mu_destroy(&child_policy_mu_); } void XdsLb::ShutdownLocked() { @@ -1100,7 +1154,11 @@ void XdsLb::ShutdownLocked() { if (fallback_timer_callback_pending_) { grpc_timer_cancel(&lb_fallback_timer_); } - child_policy_.reset(); + { + MutexLock lock(&child_policy_mu_); + child_policy_.reset(); + pending_child_policy_.reset(); + } // We destroy the LB channel here instead of in our destructor because // destroying the channel triggers a last callback to // OnBalancerChannelConnectivityChangedLocked(), and we need to be @@ -1126,12 +1184,27 @@ void XdsLb::ResetBackoffLocked() { if (child_policy_ != nullptr) { child_policy_->ResetBackoffLocked(); } + if (pending_child_policy_ != nullptr) { + pending_child_policy_->ResetBackoffLocked(); + } } void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, channelz::ChildRefsList* child_channels) { - // Delegate to the child_policy_ to fill the children subchannels. - child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); + { + // Delegate to the child_policy_ to fill the children subchannels. + // This must be done holding child_policy_mu_, since this method does not + // run in the combiner. + MutexLock lock(&child_policy_mu_); + if (child_policy_ != nullptr) { + child_policy_->FillChildRefsForChannelz(child_subchannels, + child_channels); + } + if (pending_child_policy_ != nullptr) { + pending_child_policy_->FillChildRefsForChannelz(child_subchannels, + child_channels); + } + } MutexLock lock(&lb_chand_mu_); if (lb_chand_ != nullptr) { grpc_core::channelz::ChannelNode* channel_node = @@ -1312,48 +1385,136 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { GPR_ARRAY_SIZE(args_to_add)); } -void XdsLb::CreateChildPolicyLocked(const char* name, Args args) { - GPR_ASSERT(child_policy_ == nullptr); - child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - name, std::move(args)); - if (GPR_UNLIKELY(child_policy_ == nullptr)) { - gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this); - return; +OrphanablePtr XdsLb::CreateChildPolicyLocked( + const char* name, const grpc_channel_args* args) { + Helper* helper = New(Ref()); + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = combiner(); + lb_policy_args.args = args; + lb_policy_args.channel_control_helper = + UniquePtr(helper); + OrphanablePtr lb_policy = + LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + name, std::move(lb_policy_args)); + if (GPR_UNLIKELY(lb_policy == nullptr)) { + gpr_log(GPR_ERROR, "[xdslb %p] Failure creating child policy %s", this, + name); + return nullptr; + } + helper->set_child(lb_policy.get()); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Created new child policy %s (%p)", this, name, + lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created - // child policy. This will make the child policy progress upon activity on - // xDS LB, which in turn is tied to the application's call. - grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), + // child policy. This will make the child policy progress upon activity on xDS + // LB, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), interested_parties()); + return lb_policy; } void XdsLb::CreateOrUpdateChildPolicyLocked() { if (shutting_down_) return; grpc_channel_args* args = CreateChildPolicyArgsLocked(); GPR_ASSERT(args != nullptr); + // If the child policy name changes, we need to create a new child + // policy. When this happens, we leave child_policy_ as-is and store + // the new child policy in pending_child_policy_. Once the new child + // policy transitions into state READY, we swap it into child_policy_, + // replacing the original child policy. So pending_child_policy_ is + // non-null only between when we apply an update that changes the child + // policy name and when the new child reports state READY. + // + // Updates can arrive at any point during this transition. We always + // apply updates relative to the most recently created child policy, + // even if the most recent one is still in pending_child_policy_. This + // is true both when applying the updates to an existing child policy + // and when determining whether we need to create a new policy. + // + // As a result of this, there are several cases to consider here: + // + // 1. We have no existing child policy (i.e., we have started up but + // have not yet received a serverlist from the balancer or gone + // into fallback mode; in this case, both child_policy_ and + // pending_child_policy_ are null). In this case, we create a + // new child policy and store it in child_policy_. + // + // 2. We have an existing child policy and have no pending child policy + // from a previous update (i.e., either there has not been a + // previous update that changed the policy name, or we have already + // finished swapping in the new policy; in this case, child_policy_ + // is non-null but pending_child_policy_ is null). In this case: + // a. If child_policy_->name() equals child_policy_name, then we + // update the existing child policy. + // b. If child_policy_->name() does not equal child_policy_name, + // we create a new policy. The policy will be stored in + // pending_child_policy_ and will later be swapped into + // child_policy_ by the helper when the new child transitions + // into state READY. + // + // 3. We have an existing child policy and have a pending child policy + // from a previous update (i.e., a previous update set + // pending_child_policy_ as per case 2b above and that policy has + // not yet transitioned into state READY and been swapped into + // child_policy_; in this case, both child_policy_ and + // pending_child_policy_ are non-null). In this case: + // a. If pending_child_policy_->name() equals child_policy_name, + // then we update the existing pending child policy. + // b. If pending_child_policy->name() does not equal + // child_policy_name, then we create a new policy. The new + // policy is stored in pending_child_policy_ (replacing the one + // that was there before, which will be immediately shut down) + // and will later be swapped into child_policy_ by the helper + // when the new child transitions into state READY. // TODO(juanlishen): If the child policy is not configured via service config, // use whatever algorithm is specified by the balancer. - // TODO(juanlishen): Switch policy according to child_policy_config_->name(). - if (child_policy_ == nullptr) { - LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = combiner(); - lb_policy_args.args = args; - lb_policy_args.channel_control_helper = - UniquePtr(New(Ref())); - CreateChildPolicyLocked(child_policy_config_ == nullptr - ? "round_robin" - : child_policy_config_->name(), - std::move(lb_policy_args)); + const char* child_policy_name = child_policy_config_ == nullptr + ? "round_robin" + : child_policy_config_->name(); + const bool create_policy = + // case 1 + child_policy_ == nullptr || + // case 2b + (pending_child_policy_ == nullptr && + strcmp(child_policy_->name(), child_policy_name) != 0) || + // case 3b + (pending_child_policy_ != nullptr && + strcmp(pending_child_policy_->name(), child_policy_name) != 0); + LoadBalancingPolicy* policy_to_update = nullptr; + if (create_policy) { + // Cases 1, 2b, and 3b: create a new child policy. + // If child_policy_ is null, we set it (case 1), else we set + // pending_child_policy_ (cases 2b and 3b). if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this, - child_policy_.get()); + gpr_log(GPR_INFO, "[xdslb %p] Creating new %schild policy %s", this, + child_policy_ == nullptr ? "" : "pending ", child_policy_name); + } + auto new_policy = CreateChildPolicyLocked(child_policy_name, args); + auto& lb_policy = + child_policy_ == nullptr ? child_policy_ : pending_child_policy_; + { + MutexLock lock(&child_policy_mu_); + lb_policy = std::move(new_policy); } + policy_to_update = lb_policy.get(); + } else { + // Cases 2a and 3a: update an existing policy. + // If we have a pending child policy, send the update to the pending + // policy (case 3a), else send it to the current policy (case 2a). + policy_to_update = pending_child_policy_ != nullptr + ? pending_child_policy_.get() + : child_policy_.get(); } + GPR_ASSERT(policy_to_update != nullptr); + // Update the policy. if (grpc_lb_xds_trace.enabled()) { - gpr_log(GPR_INFO, "[xdslb %p] Updating child policy %p", this, - child_policy_.get()); + gpr_log(GPR_INFO, "[xdslb %p] Updating %schild policy %p", this, + policy_to_update == pending_child_policy_.get() ? "pending " : "", + policy_to_update); } - child_policy_->UpdateLocked(*args, child_policy_config_); + policy_to_update->UpdateLocked(*args, child_policy_config_); + // Clean up. grpc_channel_args_destroy(args); } diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index a02a7e8acdb..0dd51e8bc4c 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -47,6 +47,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -77,12 +78,14 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper Subchannel* CreateSubchannel(const grpc_channel_args& args) override { if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. + if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; return parent_->channel_control_helper()->CreateSubchannel(args); } grpc_channel* CreateChannel(const char* target, const grpc_channel_args& args) override { if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. + if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; return parent_->channel_control_helper()->CreateChannel(target, args); } @@ -93,11 +96,37 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper GRPC_ERROR_UNREF(state_error); return; } + // If this request is from the pending child policy, ignore it until + // it reports READY, at which point we swap it into place. + if (CalledByPendingChild()) { + if (parent_->tracer_->enabled()) { + gpr_log(GPR_INFO, + "resolving_lb=%p helper=%p: pending child policy %p reports " + "state=%s", + parent_.get(), this, child_, + grpc_connectivity_state_name(state)); + } + if (state != GRPC_CHANNEL_READY) { + GRPC_ERROR_UNREF(state_error); + return; + } + MutexLock lock(&parent_->lb_policy_mu_); + parent_->lb_policy_ = std::move(parent_->pending_lb_policy_); + } else if (!CalledByCurrentChild()) { + // This request is from an outdated child, so ignore it. + GRPC_ERROR_UNREF(state_error); + return; + } parent_->channel_control_helper()->UpdateState(state, state_error, std::move(picker)); } void RequestReresolution() override { + // If there is a pending child policy, ignore re-resolution requests + // from the current child policy (or any outdated child). + if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) { + return; + } if (parent_->tracer_->enabled()) { gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving", parent_.get()); @@ -107,8 +136,21 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper } } + void set_child(LoadBalancingPolicy* child) { child_ = child; } + private: + bool CalledByPendingChild() const { + GPR_ASSERT(child_ != nullptr); + return child_ == parent_->pending_lb_policy_.get(); + } + + bool CalledByCurrentChild() const { + GPR_ASSERT(child_ != nullptr); + return child_ == parent_->lb_policy_.get(); + }; + RefCountedPtr parent_; + LoadBalancingPolicy* child_ = nullptr; }; // @@ -146,6 +188,7 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( process_resolver_result_(process_resolver_result), process_resolver_result_user_data_(process_resolver_result_user_data) { GPR_ASSERT(process_resolver_result != nullptr); + gpr_mu_init(&lb_policy_mu_); *error = Init(*args.args); } @@ -169,22 +212,38 @@ grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) { ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() { GPR_ASSERT(resolver_ == nullptr); GPR_ASSERT(lb_policy_ == nullptr); + gpr_mu_destroy(&lb_policy_mu_); } void ResolvingLoadBalancingPolicy::ShutdownLocked() { if (resolver_ != nullptr) { resolver_.reset(); + MutexLock lock(&lb_policy_mu_); if (lb_policy_ != nullptr) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, + lb_policy_.get()); + } grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), interested_parties()); lb_policy_.reset(); } + if (pending_lb_policy_ != nullptr) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p", + this, pending_lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(), + interested_parties()); + pending_lb_policy_.reset(); + } } } void ResolvingLoadBalancingPolicy::ExitIdleLocked() { if (lb_policy_ != nullptr) { lb_policy_->ExitIdleLocked(); + if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked(); } else { if (!started_resolving_ && resolver_ != nullptr) { StartResolvingLocked(); @@ -197,17 +256,24 @@ void ResolvingLoadBalancingPolicy::ResetBackoffLocked() { resolver_->ResetBackoffLocked(); resolver_->RequestReresolutionLocked(); } - if (lb_policy_ != nullptr) { - lb_policy_->ResetBackoffLocked(); - } + if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked(); + if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked(); } void ResolvingLoadBalancingPolicy::FillChildRefsForChannelz( channelz::ChildRefsList* child_subchannels, channelz::ChildRefsList* child_channels) { + // Delegate to the lb_policy_ to fill the children subchannels. + // This must be done holding lb_policy_mu_, since this method does not + // run in the combiner. + MutexLock lock(&lb_policy_mu_); if (lb_policy_ != nullptr) { lb_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); } + if (pending_lb_policy_ != nullptr) { + pending_lb_policy_->FillChildRefsForChannelz(child_subchannels, + child_channels); + } } void ResolvingLoadBalancingPolicy::StartResolvingLocked() { @@ -229,14 +295,26 @@ void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) { if (tracer_->enabled()) { gpr_log(GPR_INFO, "resolving_lb=%p: shutting down", this); } - if (lb_policy_ != nullptr) { - if (tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, - lb_policy_.get()); + { + MutexLock lock(&lb_policy_mu_); + if (lb_policy_ != nullptr) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, + lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), + interested_parties()); + lb_policy_.reset(); + } + if (pending_lb_policy_ != nullptr) { + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p", + this, pending_lb_policy_.get()); + } + grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(), + interested_parties()); + pending_lb_policy_.reset(); } - grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), - interested_parties()); - lb_policy_.reset(); } if (resolver_ != nullptr) { // This should never happen; it can only be triggered by a resolver @@ -260,53 +338,142 @@ void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) { Unref(); } -// Creates a new LB policy, replacing any previous one. +void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( + const char* lb_policy_name, RefCountedPtr lb_policy_config, + TraceStringVector* trace_strings) { + // If the child policy name changes, we need to create a new child + // policy. When this happens, we leave child_policy_ as-is and store + // the new child policy in pending_child_policy_. Once the new child + // policy transitions into state READY, we swap it into child_policy_, + // replacing the original child policy. So pending_child_policy_ is + // non-null only between when we apply an update that changes the child + // policy name and when the new child reports state READY. + // + // Updates can arrive at any point during this transition. We always + // apply updates relative to the most recently created child policy, + // even if the most recent one is still in pending_child_policy_. This + // is true both when applying the updates to an existing child policy + // and when determining whether we need to create a new policy. + // + // As a result of this, there are several cases to consider here: + // + // 1. We have no existing child policy (i.e., we have started up but + // have not yet received a serverlist from the balancer or gone + // into fallback mode; in this case, both child_policy_ and + // pending_child_policy_ are null). In this case, we create a + // new child policy and store it in child_policy_. + // + // 2. We have an existing child policy and have no pending child policy + // from a previous update (i.e., either there has not been a + // previous update that changed the policy name, or we have already + // finished swapping in the new policy; in this case, child_policy_ + // is non-null but pending_child_policy_ is null). In this case: + // a. If child_policy_->name() equals child_policy_name, then we + // update the existing child policy. + // b. If child_policy_->name() does not equal child_policy_name, + // we create a new policy. The policy will be stored in + // pending_child_policy_ and will later be swapped into + // child_policy_ by the helper when the new child transitions + // into state READY. + // + // 3. We have an existing child policy and have a pending child policy + // from a previous update (i.e., a previous update set + // pending_child_policy_ as per case 2b above and that policy has + // not yet transitioned into state READY and been swapped into + // child_policy_; in this case, both child_policy_ and + // pending_child_policy_ are non-null). In this case: + // a. If pending_child_policy_->name() equals child_policy_name, + // then we update the existing pending child policy. + // b. If pending_child_policy->name() does not equal + // child_policy_name, then we create a new policy. The new + // policy is stored in pending_child_policy_ (replacing the one + // that was there before, which will be immediately shut down) + // and will later be swapped into child_policy_ by the helper + // when the new child transitions into state READY. + const bool create_policy = + // case 1 + lb_policy_ == nullptr || + // case 2b + (pending_lb_policy_ == nullptr && + strcmp(lb_policy_->name(), lb_policy_name) != 0) || + // case 3b + (pending_lb_policy_ != nullptr && + strcmp(pending_lb_policy_->name(), lb_policy_name) != 0); + LoadBalancingPolicy* policy_to_update = nullptr; + if (create_policy) { + // Cases 1, 2b, and 3b: create a new child policy. + // If lb_policy_ is null, we set it (case 1), else we set + // pending_lb_policy_ (cases 2b and 3b). + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this, + lb_policy_ == nullptr ? "" : "pending ", lb_policy_name); + } + auto new_policy = CreateLbPolicyLocked(lb_policy_name, trace_strings); + auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_; + { + MutexLock lock(&lb_policy_mu_); + lb_policy = std::move(new_policy); + } + policy_to_update = lb_policy.get(); + } else { + // Cases 2a and 3a: update an existing policy. + // If we have a pending child policy, send the update to the pending + // policy (case 3a), else send it to the current policy (case 2a). + policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get() + : lb_policy_.get(); + } + GPR_ASSERT(policy_to_update != nullptr); + // Update the policy. + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this, + policy_to_update == pending_lb_policy_.get() ? "pending " : "", + policy_to_update); + } + policy_to_update->UpdateLocked(*resolver_result_, + std::move(lb_policy_config)); +} + +// Creates a new LB policy. // Updates trace_strings to indicate what was done. -void ResolvingLoadBalancingPolicy::CreateNewLbPolicyLocked( +OrphanablePtr +ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( const char* lb_policy_name, TraceStringVector* trace_strings) { + ResolvingControlHelper* helper = New(Ref()); LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.channel_control_helper = - UniquePtr(New(Ref())); + UniquePtr(helper); lb_policy_args.args = resolver_result_; - OrphanablePtr new_lb_policy = + OrphanablePtr lb_policy = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( lb_policy_name, std::move(lb_policy_args)); - if (GPR_UNLIKELY(new_lb_policy == nullptr)) { + if (GPR_UNLIKELY(lb_policy == nullptr)) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); if (channelz_node() != nullptr) { char* str; gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name); trace_strings->push_back(str); } - } else { - if (tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)", - this, lb_policy_name, new_lb_policy.get()); - } - if (channelz_node() != nullptr) { - char* str; - gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name); - trace_strings->push_back(str); - } - // Propagate channelz node. - auto* channelz = channelz_node(); - if (channelz != nullptr) { - new_lb_policy->set_channelz_node(channelz->Ref()); - } - // Swap out the LB policy and update the fds in interested_parties_. - if (lb_policy_ != nullptr) { - if (tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, - lb_policy_.get()); - } - grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), - interested_parties()); - } - lb_policy_ = std::move(new_lb_policy); - grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(), - interested_parties()); + return nullptr; + } + helper->set_child(lb_policy.get()); + if (tracer_->enabled()) { + gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)", + this, lb_policy_name, lb_policy.get()); + } + if (channelz_node() != nullptr) { + char* str; + gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name); + trace_strings->push_back(str); + } + // Propagate channelz node. + auto* channelz = channelz_node(); + if (channelz != nullptr) { + lb_policy->set_channelz_node(channelz->Ref()); } + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; } void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked( @@ -415,23 +582,8 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( lb_policy_config = self->child_lb_config_; } GPR_ASSERT(lb_policy_name != nullptr); - // If we're not already using the right LB policy name, instantiate - // a new one. - if (self->lb_policy_ == nullptr || - strcmp(self->lb_policy_->name(), lb_policy_name) != 0) { - if (self->tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: creating new LB policy \"%s\"", - self, lb_policy_name); - } - self->CreateNewLbPolicyLocked(lb_policy_name, &trace_strings); - } - // Update the LB policy with the new addresses and config. - if (self->tracer_->enabled()) { - gpr_log(GPR_INFO, "resolving_lb=%p: updating LB policy \"%s\" (%p)", self, - lb_policy_name, self->lb_policy_.get()); - } - self->lb_policy_->UpdateLocked(*self->resolver_result_, - std::move(lb_policy_config)); + self->CreateOrUpdateLbPolicyLocked( + lb_policy_name, std::move(lb_policy_config), &trace_strings); // Add channel trace event. if (self->channelz_node() != nullptr) { if (service_config_changed) { diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index d068a41f96f..b8f406da1b6 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -102,8 +102,11 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { void StartResolvingLocked(); void OnResolverShutdownLocked(grpc_error* error); - void CreateNewLbPolicyLocked(const char* lb_policy_name, - TraceStringVector* trace_strings); + void CreateOrUpdateLbPolicyLocked(const char* lb_policy_name, + RefCountedPtr, + TraceStringVector* trace_strings); + OrphanablePtr CreateLbPolicyLocked( + const char* lb_policy_name, TraceStringVector* trace_strings); void MaybeAddTraceMessagesForAddressChangesLocked( TraceStringVector* trace_strings); void ConcatenateAndAddChannelTraceLocked( @@ -125,8 +128,12 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { bool previous_resolution_contained_addresses_ = false; grpc_closure on_resolver_result_changed_; - // Child LB policy and associated state. + // Child LB policy. OrphanablePtr lb_policy_; + OrphanablePtr pending_lb_policy_; + // Lock held when modifying the value of child_policy_ or + // pending_child_policy_. + gpr_mu lb_policy_mu_; }; } // namespace grpc_core