Swap in new LB policy when it's ready

pull/18101/head
Juanli Shen 6 years ago
parent 2779322af7
commit d15605c0e5
  1. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 225
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 270
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  4. 13
      src/core/ext/filters/client_channel/resolving_lb_policy.h

@ -307,7 +307,7 @@ class GrpcLb : public LoadBalancingPolicy {
// Methods for dealing with the child policy.
grpc_channel_args* CreateChildPolicyArgsLocked();
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, grpc_channel_args* args);
const char* name, const grpc_channel_args* args);
void CreateOrUpdateChildPolicyLocked();
// Who the client is trying to communicate with.
@ -685,7 +685,7 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
void GrpcLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated pending child).
// from the current child policy (or any outdated child).
if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
@ -1608,7 +1608,7 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked() {
}
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, grpc_channel_args* args) {
const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();

@ -278,8 +278,14 @@ class XdsLb : public LoadBalancingPolicy {
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const;
bool CalledByCurrentChild() const;
RefCountedPtr<XdsLb> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
~XdsLb();
@ -306,7 +312,8 @@ class XdsLb : public LoadBalancingPolicy {
// Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked();
grpc_channel_args* CreateChildPolicyArgsLocked();
void CreateChildPolicyLocked(const char* name, Args args);
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args);
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
@ -349,6 +356,10 @@ class XdsLb : public LoadBalancingPolicy {
// The policy to use for the backends.
RefCountedPtr<Config> child_policy_config_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu child_policy_mu_;
};
//
@ -372,14 +383,30 @@ XdsLb::Picker::PickResult XdsLb::Picker::Pick(PickState* pick,
// XdsLb::Helper
//
bool XdsLb::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_child_policy_.get();
}
bool XdsLb::Helper::CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->child_policy_.get();
}
Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr;
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* XdsLb::Helper::CreateChannel(const char* target,
const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr;
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateChannel(target, args);
}
@ -390,6 +417,26 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p helper %p] pending child policy %p reports state=%s",
parent_.get(), this, parent_->pending_child_policy_.get(),
grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) {
GRPC_ERROR_UNREF(state_error);
return;
}
MutexLock lock(&parent_->child_policy_mu_);
parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
@ -406,6 +453,11 @@ void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
void XdsLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Re-resolution requested from the internal RR policy "
@ -1064,6 +1116,7 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
gpr_mu_init(&lb_chand_mu_);
gpr_mu_init(&child_policy_mu_);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1093,6 +1146,7 @@ XdsLb::~XdsLb() {
if (serverlist_ != nullptr) {
xds_grpclb_destroy_serverlist(serverlist_);
}
gpr_mu_destroy(&child_policy_mu_);
}
void XdsLb::ShutdownLocked() {
@ -1100,7 +1154,11 @@ void XdsLb::ShutdownLocked() {
if (fallback_timer_callback_pending_) {
grpc_timer_cancel(&lb_fallback_timer_);
}
child_policy_.reset();
{
MutexLock lock(&child_policy_mu_);
child_policy_.reset();
pending_child_policy_.reset();
}
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
@ -1126,12 +1184,27 @@ void XdsLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) {
child_policy_->ResetBackoffLocked();
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
}
}
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
// Delegate to the child_policy_ to fill the children subchannels.
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
{
// Delegate to the child_policy_ to fill the children subchannels.
// This must be done holding child_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock(&child_policy_mu_);
if (child_policy_ != nullptr) {
child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
MutexLock lock(&lb_chand_mu_);
if (lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
@ -1312,48 +1385,136 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
GPR_ARRAY_SIZE(args_to_add));
}
void XdsLb::CreateChildPolicyLocked(const char* name, Args args) {
GPR_ASSERT(child_policy_ == nullptr);
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(args));
if (GPR_UNLIKELY(child_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
return;
OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper);
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created new child policy %s (%p)", this, name,
lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
// child policy. This will make the child policy progress upon activity on xDS
// LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void XdsLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
grpc_channel_args* args = CreateChildPolicyArgsLocked();
GPR_ASSERT(args != nullptr);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
// TODO(juanlishen): Switch policy according to child_policy_config_->name().
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
CreateChildPolicyLocked(child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name(),
std::move(lb_policy_args));
const char* child_policy_name = child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
child_policy_.get());
gpr_log(GPR_INFO, "[xdslb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
auto new_policy = CreateChildPolicyLocked(child_policy_name, args);
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
{
MutexLock lock(&child_policy_mu_);
lb_policy = std::move(new_policy);
}
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Updating child policy %p", this,
child_policy_.get());
gpr_log(GPR_INFO, "[xdslb %p] Updating %schild policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
child_policy_->UpdateLocked(*args, child_policy_config_);
policy_to_update->UpdateLocked(*args, child_policy_config_);
// Clean up.
grpc_channel_args_destroy(args);
}

@ -47,6 +47,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
@ -77,12 +78,14 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateChannel(target, args);
}
@ -93,11 +96,37 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (parent_->tracer_->enabled()) {
gpr_log(GPR_INFO,
"resolving_lb=%p helper=%p: pending child policy %p reports "
"state=%s",
parent_.get(), this, child_,
grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) {
GRPC_ERROR_UNREF(state_error);
return;
}
MutexLock lock(&parent_->lb_policy_mu_);
parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return;
}
parent_->channel_control_helper()->UpdateState(state, state_error,
std::move(picker));
}
void RequestReresolution() override {
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
if (parent_->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
parent_.get());
@ -107,8 +136,21 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
}
}
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_lb_policy_.get();
}
bool CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->lb_policy_.get();
};
RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
//
@ -146,6 +188,7 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
process_resolver_result_(process_resolver_result),
process_resolver_result_user_data_(process_resolver_result_user_data) {
GPR_ASSERT(process_resolver_result != nullptr);
gpr_mu_init(&lb_policy_mu_);
*error = Init(*args.args);
}
@ -169,22 +212,38 @@ grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
GPR_ASSERT(resolver_ == nullptr);
GPR_ASSERT(lb_policy_ == nullptr);
gpr_mu_destroy(&lb_policy_mu_);
}
void ResolvingLoadBalancingPolicy::ShutdownLocked() {
if (resolver_ != nullptr) {
resolver_.reset();
MutexLock lock(&lb_policy_mu_);
if (lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties());
lb_policy_.reset();
}
if (pending_lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
this, pending_lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
interested_parties());
pending_lb_policy_.reset();
}
}
}
void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
} else {
if (!started_resolving_ && resolver_ != nullptr) {
StartResolvingLocked();
@ -197,17 +256,24 @@ void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
resolver_->ResetBackoffLocked();
resolver_->RequestReresolutionLocked();
}
if (lb_policy_ != nullptr) {
lb_policy_->ResetBackoffLocked();
}
if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
}
void ResolvingLoadBalancingPolicy::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
// Delegate to the lb_policy_ to fill the children subchannels.
// This must be done holding lb_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock(&lb_policy_mu_);
if (lb_policy_ != nullptr) {
lb_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
if (pending_lb_policy_ != nullptr) {
pending_lb_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
@ -229,14 +295,26 @@ void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down", this);
}
if (lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
{
MutexLock lock(&lb_policy_mu_);
if (lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties());
lb_policy_.reset();
}
if (pending_lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
this, pending_lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
interested_parties());
pending_lb_policy_.reset();
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties());
lb_policy_.reset();
}
if (resolver_ != nullptr) {
// This should never happen; it can only be triggered by a resolver
@ -260,53 +338,142 @@ void ResolvingLoadBalancingPolicy::OnResolverShutdownLocked(grpc_error* error) {
Unref();
}
// Creates a new LB policy, replacing any previous one.
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name, RefCountedPtr<Config> lb_policy_config,
TraceStringVector* trace_strings) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const bool create_policy =
// case 1
lb_policy_ == nullptr ||
// case 2b
(pending_lb_policy_ == nullptr &&
strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
// case 3b
(pending_lb_policy_ != nullptr &&
strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If lb_policy_ is null, we set it (case 1), else we set
// pending_lb_policy_ (cases 2b and 3b).
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
}
auto new_policy = CreateLbPolicyLocked(lb_policy_name, trace_strings);
auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
{
MutexLock lock(&lb_policy_mu_);
lb_policy = std::move(new_policy);
}
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
: lb_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
policy_to_update == pending_lb_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(*resolver_result_,
std::move(lb_policy_config));
}
// Creates a new LB policy.
// Updates trace_strings to indicate what was done.
void ResolvingLoadBalancingPolicy::CreateNewLbPolicyLocked(
OrphanablePtr<LoadBalancingPolicy>
ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
const char* lb_policy_name, TraceStringVector* trace_strings) {
ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(New<ResolvingControlHelper>(Ref()));
UniquePtr<ChannelControlHelper>(helper);
lb_policy_args.args = resolver_result_;
OrphanablePtr<LoadBalancingPolicy> new_lb_policy =
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, std::move(lb_policy_args));
if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
if (channelz_node() != nullptr) {
char* str;
gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
}
} else {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
this, lb_policy_name, new_lb_policy.get());
}
if (channelz_node() != nullptr) {
char* str;
gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
}
// Propagate channelz node.
auto* channelz = channelz_node();
if (channelz != nullptr) {
new_lb_policy->set_channelz_node(channelz->Ref());
}
// Swap out the LB policy and update the fds in interested_parties_.
if (lb_policy_ != nullptr) {
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
interested_parties());
}
lb_policy_ = std::move(new_lb_policy);
grpc_pollset_set_add_pollset_set(lb_policy_->interested_parties(),
interested_parties());
return nullptr;
}
helper->set_child(lb_policy.get());
if (tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
this, lb_policy_name, lb_policy.get());
}
if (channelz_node() != nullptr) {
char* str;
gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
}
// Propagate channelz node.
auto* channelz = channelz_node();
if (channelz != nullptr) {
lb_policy->set_channelz_node(channelz->Ref());
}
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
@ -415,23 +582,8 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
lb_policy_config = self->child_lb_config_;
}
GPR_ASSERT(lb_policy_name != nullptr);
// If we're not already using the right LB policy name, instantiate
// a new one.
if (self->lb_policy_ == nullptr ||
strcmp(self->lb_policy_->name(), lb_policy_name) != 0) {
if (self->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: creating new LB policy \"%s\"",
self, lb_policy_name);
}
self->CreateNewLbPolicyLocked(lb_policy_name, &trace_strings);
}
// Update the LB policy with the new addresses and config.
if (self->tracer_->enabled()) {
gpr_log(GPR_INFO, "resolving_lb=%p: updating LB policy \"%s\" (%p)", self,
lb_policy_name, self->lb_policy_.get());
}
self->lb_policy_->UpdateLocked(*self->resolver_result_,
std::move(lb_policy_config));
self->CreateOrUpdateLbPolicyLocked(
lb_policy_name, std::move(lb_policy_config), &trace_strings);
// Add channel trace event.
if (self->channelz_node() != nullptr) {
if (service_config_changed) {

@ -102,8 +102,11 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void StartResolvingLocked();
void OnResolverShutdownLocked(grpc_error* error);
void CreateNewLbPolicyLocked(const char* lb_policy_name,
TraceStringVector* trace_strings);
void CreateOrUpdateLbPolicyLocked(const char* lb_policy_name,
RefCountedPtr<Config>,
TraceStringVector* trace_strings);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const char* lb_policy_name, TraceStringVector* trace_strings);
void MaybeAddTraceMessagesForAddressChangesLocked(
TraceStringVector* trace_strings);
void ConcatenateAndAddChannelTraceLocked(
@ -125,8 +128,12 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
bool previous_resolution_contained_addresses_ = false;
grpc_closure on_resolver_result_changed_;
// Child LB policy and associated state.
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_lb_policy_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu lb_policy_mu_;
};
} // namespace grpc_core

Loading…
Cancel
Save