Merge pull request #22101 from markdroth/lb_graceful_switcher

Refactor code for gracefully switching child LB policies.
pull/22187/head
Mark D. Roth 5 years ago committed by GitHub
commit 5e98ad4077
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      BUILD.gn
  3. 2
      CMakeLists.txt
  4. 2
      Makefile
  5. 4
      build_autogenerated.yaml
  6. 2
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 280
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
  14. 66
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
  15. 215
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  16. 430
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  17. 192
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  18. 6
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  19. 1
      src/python/grpcio/grpc_core_dependencies.py
  20. 2
      tools/doxygen/Doxyfile.c++.internal
  21. 2
      tools/doxygen/Doxyfile.core.internal

@ -1053,6 +1053,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/http_connect_handshaker.cc",
"src/core/ext/filters/client_channel/http_proxy.cc",
"src/core/ext/filters/client_channel/lb_policy.cc",
"src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc",
"src/core/ext/filters/client_channel/lb_policy_registry.cc",
"src/core/ext/filters/client_channel/local_subchannel_pool.cc",
"src/core/ext/filters/client_channel/parse_address.cc",
@ -1079,6 +1080,7 @@ grpc_cc_library(
"src/core/ext/filters/client_channel/http_connect_handshaker.h",
"src/core/ext/filters/client_channel/http_proxy.h",
"src/core/ext/filters/client_channel/lb_policy.h",
"src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h",
"src/core/ext/filters/client_channel/lb_policy_factory.h",
"src/core/ext/filters/client_channel/lb_policy_registry.h",
"src/core/ext/filters/client_channel/local_subchannel_pool.h",

@ -223,6 +223,8 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/http_proxy.h",
"src/core/ext/filters/client_channel/lb_policy.cc",
"src/core/ext/filters/client_channel/lb_policy.h",
"src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc",
"src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc",

@ -1310,6 +1310,7 @@ add_library(grpc
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
src/core/ext/filters/client_channel/lb_policy.cc
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
@ -1961,6 +1962,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/http_connect_handshaker.cc
src/core/ext/filters/client_channel/http_proxy.cc
src/core/ext/filters/client_channel/lb_policy.cc
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc

@ -3651,6 +3651,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \
@ -4277,6 +4278,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc \

@ -382,6 +382,7 @@ libs:
- src/core/ext/filters/client_channel/http_connect_handshaker.h
- src/core/ext/filters/client_channel/http_proxy.h
- src/core/ext/filters/client_channel/lb_policy.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@ -738,6 +739,7 @@ libs:
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
- src/core/ext/filters/client_channel/http_proxy.cc
- src/core/ext/filters/client_channel/lb_policy.cc
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc
@ -1273,6 +1275,7 @@ libs:
- src/core/ext/filters/client_channel/http_connect_handshaker.h
- src/core/ext/filters/client_channel/http_proxy.h
- src/core/ext/filters/client_channel/lb_policy.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
@ -1564,6 +1567,7 @@ libs:
- src/core/ext/filters/client_channel/http_connect_handshaker.cc
- src/core/ext/filters/client_channel/http_proxy.cc
- src/core/ext/filters/client_channel/lb_policy.cc
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc

@ -50,6 +50,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/http_connect_handshaker.cc \
src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \
@ -815,6 +816,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/census)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/health)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/pick_first)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/round_robin)

@ -19,6 +19,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " +
"src\\core\\ext\\filters\\client_channel\\http_proxy.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\child_policy_handler.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\client_load_reporting_filter.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb_channel_secure.cc " +

@ -233,6 +233,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
@ -681,6 +682,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',

@ -206,6 +206,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.cc',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
@ -1027,6 +1029,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/http_connect_handshaker.h',
'src/core/ext/filters/client_channel/http_proxy.h',
'src/core/ext/filters/client_channel/lb_policy.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',

@ -128,6 +128,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/http_proxy.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc )

@ -442,6 +442,7 @@
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
'src/core/ext/filters/client_channel/lb_policy.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc',
@ -929,6 +930,7 @@
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
'src/core/ext/filters/client_channel/lb_policy.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc',

@ -108,6 +108,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/http_proxy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc" role="src" />

@ -0,0 +1,280 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "absl/strings/str_cat.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
namespace grpc_core {
//
// ChildPolicyHandler::Helper
//
class ChildPolicyHandler::Helper
: public LoadBalancingPolicy::ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<ChildPolicyHandler> parent)
: parent_(std::move(parent)) {}
~Helper() { parent_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
if (parent_->shutting_down_) return nullptr;
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args);
}
void UpdateState(grpc_connectivity_state state,
std::unique_ptr<SubchannelPicker> picker) override {
if (parent_->shutting_down_) return;
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO,
"[child_policy_handler %p] helper %p: pending child policy %p "
"reports state=%s",
parent_.get(), this, child_, ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
parent_->child_policy_->interested_parties(),
parent_->interested_parties());
parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
return;
}
parent_->channel_control_helper()->UpdateState(state, std::move(picker));
}
void RequestReresolution() override {
if (parent_->shutting_down_) return;
// Only forward re-resolution requests from the most recent child,
// since that's the one that will be receiving any update we receive
// from the resolver.
const LoadBalancingPolicy* latest_child_policy =
parent_->pending_child_policy_ != nullptr
? parent_->pending_child_policy_.get()
: parent_->child_policy_.get();
if (child_ != latest_child_policy) return;
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO, "[child_policy_handler %p] started name re-resolving",
parent_.get());
}
parent_->channel_control_helper()->RequestReresolution();
}
void AddTraceEvent(TraceSeverity severity, StringView message) override {
if (parent_->shutting_down_) return;
if (!CalledByPendingChild() && !CalledByCurrentChild()) return;
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_child_policy_.get();
}
bool CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->child_policy_.get();
};
RefCountedPtr<ChildPolicyHandler> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
//
// ChildPolicyHandler
//
void ChildPolicyHandler::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down", this);
}
shutting_down_ = true;
if (child_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down lb_policy %p",
this, child_policy_.get());
}
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
if (pending_child_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[child_policy_handler %p] shutting down pending lb_policy %p",
this, pending_child_policy_.get());
}
grpc_pollset_set_del_pollset_set(
pending_child_policy_->interested_parties(), interested_parties());
pending_child_policy_.reset();
}
}
void ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
// The name of the policy that this update wants us to use.
const char* child_policy_name = args.config->name();
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., this is the first update
// we receive after being created; in this case, both child_policy_
// and pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[child_policy_handler %p] creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicy(child_policy_name, *args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "[child_policy_handler %p] updating %schild policy %p",
this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(args));
}
void ChildPolicyHandler::ExitIdleLocked() {
if (child_policy_ != nullptr) {
child_policy_->ExitIdleLocked();
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ExitIdleLocked();
}
}
}
void ChildPolicyHandler::ResetBackoffLocked() {
if (child_policy_ != nullptr) {
child_policy_->ResetBackoffLocked();
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
}
}
}
OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy(
const char* child_policy_name, const grpc_channel_args& args) {
Helper* helper = new Helper(Ref(DEBUG_LOCATION, "Helper"));
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper);
lb_policy_args.args = &args;
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
child_policy_name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", child_policy_name);
return nullptr;
}
helper->set_child(lb_policy.get());
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO,
"[child_policy_handler %p] created new LB policy \"%s\" (%p)", this,
child_policy_name, lb_policy.get());
}
channel_control_helper()->AddTraceEvent(
ChannelControlHelper::TRACE_INFO,
absl::StrCat("Created new LB policy \"", child_policy_name, "\""));
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
} // namespace grpc_core

@ -0,0 +1,66 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_CHILD_POLICY_HANDLER_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_CHILD_POLICY_HANDLER_H
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/orphanable.h"
namespace grpc_core {
// A class that makes it easy to gracefully switch child policies.
//
// Callers should instantiate this instead of using
// LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(). Once
// instantiated, this object will automatically take care of
// constructing the child policy as needed upon receiving an update.
class ChildPolicyHandler : public LoadBalancingPolicy {
public:
ChildPolicyHandler(Args args, TraceFlag* tracer)
: LoadBalancingPolicy(std::move(args)), tracer_(tracer) {}
virtual const char* name() const override { return "child_policy_handler"; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
class Helper;
void ShutdownLocked() override;
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicy(
const char* child_policy_name, const grpc_channel_args& args);
// Passed in from caller at construction time.
TraceFlag* tracer_;
bool shutting_down_ = false;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_CHILD_POLICY_HANDLER_H \
*/

@ -71,6 +71,7 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
@ -296,14 +297,8 @@ class GrpcLb : public LoadBalancingPolicy {
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, StringView message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const;
bool CalledByCurrentChild() const;
RefCountedPtr<GrpcLb> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
~GrpcLb();
@ -334,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_channel_args* CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer);
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args);
const grpc_channel_args* args);
void CreateOrUpdateChildPolicyLocked();
// Who the client is trying to communicate with.
@ -385,9 +380,6 @@ class GrpcLb : public LoadBalancingPolicy {
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// When switching child policies, the new policy will be stored here
// until it reports READY, at which point it will be moved to child_policy_.
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// The child policy config.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
// Child policy in state READY.
@ -629,46 +621,15 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// GrpcLb::Helper
//
bool GrpcLb::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_child_policy_.get();
}
bool GrpcLb::Helper::CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->child_policy_.get();
}
RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args);
}
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
std::unique_ptr<SubchannelPicker> picker) {
if (parent_->shutting_down_) return;
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] pending child policy %p reports state=%s",
parent_.get(), this, parent_->pending_child_policy_.get(),
ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
parent_->child_policy_->interested_parties(),
parent_->interested_parties());
parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
return;
}
// Record whether child policy reports READY.
parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
// Enter fallback mode if needed.
@ -721,16 +682,6 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
void GrpcLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
const LoadBalancingPolicy* latest_child_policy =
parent_->pending_child_policy_ != nullptr
? parent_->pending_child_policy_.get()
: parent_->child_policy_.get();
if (child_ != latest_child_policy) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] Re-resolution requested from %schild policy (%p).",
parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
}
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the
@ -742,10 +693,7 @@ void GrpcLb::Helper::RequestReresolution() {
}
void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return;
}
if (parent_->shutting_down_) return;
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
@ -1411,13 +1359,8 @@ void GrpcLb::ShutdownLocked() {
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
if (pending_child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_child_policy_->interested_parties(), interested_parties());
}
child_policy_.reset();
pending_child_policy_.reset();
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
@ -1439,9 +1382,6 @@ void GrpcLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) {
child_policy_->ResetBackoffLocked();
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
}
}
void GrpcLb::UpdateLocked(UpdateArgs args) {
@ -1727,25 +1667,17 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
}
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = new Helper(Ref());
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper);
lb_policy_args.channel_control_helper = absl::make_unique<Helper>(Ref());
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_glb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
name, lb_policy.get());
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy handler (%p)", this,
lb_policy.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
@ -1776,97 +1708,16 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = child_policy_config_;
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* child_policy_name = child_policy_config_ == nullptr
? "round_robin"
: child_policy_config_->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
// Swap the policy into place.
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(update_args.args);
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
child_policy_.get());
}
policy_to_update->UpdateLocked(std::move(update_args));
child_policy_->UpdateLocked(std::move(update_args));
}
//
@ -1889,21 +1740,29 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
return MakeRefCounted<GrpcLbConfig>(nullptr);
}
std::vector<grpc_error*> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
auto it = json.object_value().find("childPolicy");
if (it != json.object_value().end()) {
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
if (it == json.object_value().end()) {
child_policy_config_json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
}};
child_policy_config_json = &child_policy_config_json_tmp;
} else {
child_policy_config_json = &it->second;
}
grpc_error* parse_error = GRPC_ERROR_NONE;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
*child_policy_config_json, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
if (error_list.empty()) {
return MakeRefCounted<GrpcLbConfig>(std::move(child_policy));
return MakeRefCounted<GrpcLbConfig>(std::move(child_policy_config));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;

@ -32,6 +32,7 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
@ -198,14 +199,8 @@ class XdsLb : public LoadBalancingPolicy {
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, StringView message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingFallback() const;
bool CalledByCurrentFallback() const;
RefCountedPtr<XdsLb> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
// Each LocalityMap holds a ref to the XdsLb.
@ -262,19 +257,14 @@ class XdsLb : public LoadBalancingPolicy {
// client, which is a watch-based API.
void RequestReresolution() override {}
void AddTraceEvent(TraceSeverity severity, StringView message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const;
bool CalledByCurrentChild() const;
RefCountedPtr<Locality> locality_;
LoadBalancingPolicy* child_ = nullptr;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args);
const grpc_channel_args* args);
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args);
@ -291,7 +281,6 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsLocalityName> name_;
RefCountedPtr<XdsClusterLocalityStats> stats_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
RefCountedPtr<RefCountedEndpointPicker> picker_wrapper_;
RefCountedPtr<LoadReportingPicker> load_reporting_picker_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
@ -403,7 +392,7 @@ class XdsLb : public LoadBalancingPolicy {
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void UpdateFallbackPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked(
const char* name, const grpc_channel_args* args);
const grpc_channel_args* args);
void MaybeExitFallbackMode();
// Server name from target URI.
@ -445,7 +434,6 @@ class XdsLb : public LoadBalancingPolicy {
// Non-null iff we are in fallback mode.
OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
const grpc_millis locality_retention_interval_ms_;
const grpc_millis locality_map_failover_timeout_ms_;
@ -539,71 +527,26 @@ XdsLb::PickResult XdsLb::LocalityPicker::PickFromLocality(const uint32_t key,
// XdsLb::FallbackHelper
//
bool XdsLb::FallbackHelper::CalledByPendingFallback() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_fallback_policy_.get();
}
bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->fallback_policy_.get();
}
RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return nullptr;
}
if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args);
}
void XdsLb::FallbackHelper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (parent_->shutting_down_) return;
// If this request is from the pending fallback policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingFallback()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(
GPR_INFO,
"[xdslb %p helper %p] pending fallback policy %p reports state=%s",
parent_.get(), this, parent_->pending_fallback_policy_.get(),
ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
parent_->fallback_policy_->interested_parties(),
parent_->interested_parties());
parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_);
} else if (!CalledByCurrentFallback()) {
// This request is from an outdated fallback policy, so ignore it.
return;
}
parent_->channel_control_helper()->UpdateState(state, std::move(picker));
}
void XdsLb::FallbackHelper::RequestReresolution() {
if (parent_->shutting_down_) return;
const LoadBalancingPolicy* latest_fallback_policy =
parent_->pending_fallback_policy_ != nullptr
? parent_->pending_fallback_policy_.get()
: parent_->fallback_policy_.get();
if (child_ != latest_fallback_policy) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Re-resolution requested from the fallback policy (%p).",
parent_.get(), child_);
}
parent_->channel_control_helper()->RequestReresolution();
}
void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
StringView message) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return;
}
if (parent_->shutting_down_) return;
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
@ -737,13 +680,8 @@ void XdsLb::ShutdownLocked() {
if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
interested_parties());
fallback_policy_.reset();
}
if (pending_fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_fallback_policy_->interested_parties(), interested_parties());
}
fallback_policy_.reset();
pending_fallback_policy_.reset();
// Cancel the endpoint watch here instead of in our dtor if we are using the
// XdsResolver, because the watcher holds a ref to us and we might not be
// destroying the Xds client leading to a situation where the Xds lb policy is
@ -771,9 +709,6 @@ void XdsLb::ResetBackoffLocked() {
if (fallback_policy_ != nullptr) {
fallback_policy_->ResetBackoffLocked();
}
if (pending_fallback_policy_ != nullptr) {
pending_fallback_policy_->ResetBackoffLocked();
}
}
void XdsLb::UpdateLocked(UpdateArgs args) {
@ -896,127 +831,37 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
void XdsLb::UpdateFallbackPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
// Create policy if needed.
if (fallback_policy_ == nullptr) {
fallback_policy_ = CreateFallbackPolicyLocked(args_);
GPR_ASSERT(fallback_policy_ != nullptr);
}
// Perform update.
UpdateArgs update_args;
update_args.addresses = fallback_backend_addresses_;
update_args.config = config_->fallback_policy();
update_args.args = grpc_channel_args_copy(args_);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* fallback_policy_name = update_args.config == nullptr
? "round_robin"
: update_args.config->name();
const bool create_policy =
// case 1
fallback_policy_ == nullptr ||
// case 2b
(pending_fallback_policy_ == nullptr &&
strcmp(fallback_policy_->name(), fallback_policy_name) != 0) ||
// case 3b
(pending_fallback_policy_ != nullptr &&
strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this,
fallback_policy_ == nullptr ? "" : "pending ",
fallback_policy_name);
}
auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_
: pending_fallback_policy_;
lb_policy =
CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_fallback_policy_ != nullptr
? pending_fallback_policy_.get()
: fallback_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(
GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this,
policy_to_update == pending_fallback_policy_.get() ? "pending " : "",
policy_to_update);
gpr_log(GPR_INFO, "[xdslb %p] Updating fallback child policy handler %p",
this, fallback_policy_.get());
}
policy_to_update->UpdateLocked(std::move(update_args));
fallback_policy_->UpdateLocked(std::move(update_args));
}
OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked(
const char* name, const grpc_channel_args* args) {
FallbackHelper* helper =
new FallbackHelper(Ref(DEBUG_LOCATION, "FallbackHelper"));
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper);
absl::make_unique<FallbackHelper>(Ref(DEBUG_LOCATION, "FallbackHelper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating fallback policy %s", this,
name);
return nullptr;
}
helper->set_child(lb_policy.get());
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_xds_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Created new fallback policy %s (%p)", this,
name, lb_policy.get());
gpr_log(GPR_INFO,
"[xdslb %p] Created new fallback child policy handler (%p)", this,
lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on xDS
@ -1030,7 +875,6 @@ void XdsLb::MaybeExitFallbackMode() {
if (fallback_policy_ == nullptr) return;
gpr_log(GPR_INFO, "[xdslb %p] Exiting fallback mode", this);
fallback_policy_.reset();
pending_fallback_policy_.reset();
}
//
@ -1513,27 +1357,19 @@ grpc_channel_args* XdsLb::LocalityMap::Locality::CreateChildPolicyArgsLocked(
OrphanablePtr<LoadBalancingPolicy>
XdsLb::LocalityMap::Locality::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) {
Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper"));
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = xds_policy()->combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper);
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR,
"[xdslb %p] Locality %p %s: failure creating child policy %s",
xds_policy(), this, name_->AsHumanReadableString(), name);
return nullptr;
}
helper->set_child(lb_policy.get());
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_xds_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Locality %p %s: Created new child policy %s (%p)",
xds_policy(), this, name_->AsHumanReadableString(), name,
"[xdslb %p] Locality %p %s: Created new child policy handler (%p)",
xds_policy(), this, name_->AsHumanReadableString(),
lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
@ -1560,101 +1396,19 @@ void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight,
update_args.addresses = std::move(serverlist);
update_args.config = xds_policy()->config_->child_policy();
update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
const char* child_policy_name = update_args.config == nullptr
? "round_robin"
: update_args.config->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
// case 2b
(pending_child_policy_ == nullptr &&
strcmp(child_policy_->name(), child_policy_name) != 0) ||
// case 3b
(pending_child_policy_ != nullptr &&
strcmp(pending_child_policy_->name(), child_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Locality %p %s: Creating new %schild policy %s",
xds_policy(), this, name_->AsHumanReadableString(),
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_child_policy_ != nullptr
? pending_child_policy_.get()
: child_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(update_args.args);
GPR_ASSERT(child_policy_ != nullptr);
}
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: Updating %schild policy %p",
gpr_log(GPR_INFO,
"[xdslb %p] Locality %p %s: Updating child policy handler %p",
xds_policy(), this, name_->AsHumanReadableString(),
policy_to_update == pending_child_policy_.get() ? "pending " : "",
policy_to_update);
child_policy_.get());
}
policy_to_update->UpdateLocked(std::move(update_args));
child_policy_->UpdateLocked(std::move(update_args));
}
void XdsLb::LocalityMap::Locality::ShutdownLocked() {
@ -1668,12 +1422,6 @@ void XdsLb::LocalityMap::Locality::ShutdownLocked() {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
xds_policy()->interested_parties());
child_policy_.reset();
if (pending_child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_child_policy_->interested_parties(),
xds_policy()->interested_parties());
pending_child_policy_.reset();
}
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
load_reporting_picker_.reset();
@ -1686,9 +1434,6 @@ void XdsLb::LocalityMap::Locality::ShutdownLocked() {
void XdsLb::LocalityMap::Locality::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
}
}
void XdsLb::LocalityMap::Locality::Orphan() {
@ -1736,23 +1481,10 @@ void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimerLocked(
// XdsLb::LocalityMap::Locality::Helper
//
bool XdsLb::LocalityMap::Locality::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == locality_->pending_child_policy_.get();
}
bool XdsLb::LocalityMap::Locality::Helper::CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == locality_->child_policy_.get();
}
RefCountedPtr<SubchannelInterface>
XdsLb::LocalityMap::Locality::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (locality_->xds_policy()->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
if (locality_->xds_policy()->shutting_down_) return nullptr;
return locality_->xds_policy()->channel_control_helper()->CreateSubchannel(
args);
}
@ -1760,25 +1492,6 @@ XdsLb::LocalityMap::Locality::Helper::CreateSubchannel(
void XdsLb::LocalityMap::Locality::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (locality_->xds_policy()->shutting_down_) return;
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p helper %p] pending child policy %p reports state=%s",
locality_->xds_policy(), this,
locality_->pending_child_policy_.get(),
ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
locality_->child_policy_->interested_parties(),
locality_->xds_policy()->interested_parties());
locality_->child_policy_ = std::move(locality_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
return;
}
// Cache the state and picker in the locality.
locality_->connectivity_state_ = state;
locality_->picker_wrapper_ =
@ -1789,10 +1502,7 @@ void XdsLb::LocalityMap::Locality::Helper::UpdateState(
void XdsLb::LocalityMap::Locality::Helper::AddTraceEvent(TraceSeverity severity,
StringView message) {
if (locality_->xds_policy()->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return;
}
if (locality_->xds_policy()->shutting_down_) return;
locality_->xds_policy()->channel_control_helper()->AddTraceEvent(severity,
message);
}
@ -1823,34 +1533,48 @@ class XdsFactory : public LoadBalancingPolicyFactory {
}
std::vector<grpc_error*> error_list;
// Child policy.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
Json json_tmp;
const Json* child_policy_json;
auto it = json.object_value().find("childPolicy");
if (it != json.object_value().end()) {
grpc_error* parse_error = GRPC_ERROR_NONE;
child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (child_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
if (it == json.object_value().end()) {
json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
}};
child_policy_json = &json_tmp;
} else {
child_policy_json = &it->second;
}
grpc_error* parse_error = GRPC_ERROR_NONE;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
*child_policy_json, &parse_error);
if (child_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
// Fallback policy.
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy;
const Json* fallback_policy_json;
it = json.object_value().find("fallbackPolicy");
if (it != json.object_value().end()) {
grpc_error* parse_error = GRPC_ERROR_NONE;
fallback_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (fallback_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
"field:fallbackPolicy", &child_errors));
}
if (it == json.object_value().end()) {
json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
}};
fallback_policy_json = &json_tmp;
} else {
fallback_policy_json = &it->second;
}
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
*fallback_policy_json, &parse_error);
if (fallback_policy == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:fallbackPolicy", &child_errors));
}
// EDS service name.
const char* eds_service_name = nullptr;

@ -33,6 +33,7 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
@ -109,67 +110,31 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args);
}
void UpdateState(grpc_connectivity_state state,
std::unique_ptr<SubchannelPicker> picker) override {
if (parent_->resolver_ == nullptr) return; // Shutting down.
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO,
"resolving_lb=%p helper=%p: pending child policy %p reports "
"state=%s",
parent_.get(), this, child_, ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
parent_->lb_policy_->interested_parties(),
parent_->interested_parties());
parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
return;
}
parent_->channel_control_helper()->UpdateState(state, std::move(picker));
}
void RequestReresolution() override {
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
if (parent_->resolver_ == nullptr) return; // Shutting down.
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
parent_.get());
}
if (parent_->resolver_ != nullptr) {
parent_->resolver_->RequestReresolutionLocked();
}
parent_->resolver_->RequestReresolutionLocked();
}
void AddTraceEvent(TraceSeverity /*severity*/,
StringView /*message*/) override {}
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_lb_policy_.get();
void AddTraceEvent(TraceSeverity severity, StringView message) override {
if (parent_->resolver_ == nullptr) return; // Shutting down.
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
bool CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->lb_policy_.get();
};
private:
RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
//
@ -217,23 +182,11 @@ void ResolvingLoadBalancingPolicy::ShutdownLocked() {
interested_parties());
lb_policy_.reset();
}
if (pending_lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
this, pending_lb_policy_.get());
}
grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
interested_parties());
pending_lb_policy_.reset();
}
}
}
void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
}
if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked();
}
void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
@ -242,7 +195,6 @@ void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
resolver_->RequestReresolutionLocked();
}
if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
}
void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
@ -269,132 +221,42 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* lb_policy_name = lb_policy_config->name();
const bool create_policy =
// case 1
lb_policy_ == nullptr ||
// case 2b
(pending_lb_policy_ == nullptr &&
strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
// case 3b
(pending_lb_policy_ != nullptr &&
strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If lb_policy_ is null, we set it (case 1), else we set
// pending_lb_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
}
auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
lb_policy =
CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
: lb_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
policy_to_update == pending_lb_policy_.get() ? "pending " : "",
policy_to_update);
}
Resolver::Result result) {
// Construct update.
UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
// TODO(roth): Once channel args is converted to C++, use std::move() here.
update_args.args = result.args;
result.args = nullptr;
policy_to_update->UpdateLocked(std::move(update_args));
// Create policy if needed.
if (lb_policy_ == nullptr) {
lb_policy_ = CreateLbPolicyLocked(*update_args.args);
}
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this,
lb_policy_.get());
}
lb_policy_->UpdateLocked(std::move(update_args));
}
// Creates a new LB policy.
// Updates trace_strings to indicate what was done.
OrphanablePtr<LoadBalancingPolicy>
ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
const char* lb_policy_name, const grpc_channel_args& args,
TraceStringVector* trace_strings) {
ResolvingControlHelper* helper = new ResolvingControlHelper(Ref());
const grpc_channel_args& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.channel_control_helper =
std::unique_ptr<ChannelControlHelper>(helper);
absl::make_unique<ResolvingControlHelper>(Ref());
lb_policy_args.args = &args;
OrphanablePtr<LoadBalancingPolicy> lb_policy =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
char* str;
gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
return nullptr;
}
helper->set_child(lb_policy.get());
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), tracer_);
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
this, lb_policy_name, lb_policy.get());
gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this,
lb_policy.get());
}
char* str;
gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
@ -476,8 +338,8 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
}
if (lb_policy_config != nullptr) {
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config), std::move(result),
&trace_strings);
CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
std::move(result));
}
// Add channel trace event.
if (service_config_changed) {

@ -92,10 +92,9 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void OnResolverError(grpc_error* error);
void CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings);
Resolver::Result result);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const char* lb_policy_name, const grpc_channel_args& args,
TraceStringVector* trace_strings);
const grpc_channel_args& args);
void MaybeAddTraceMessagesForAddressChangesLocked(
bool resolution_contains_addresses, TraceStringVector* trace_strings);
void ConcatenateAndAddChannelTraceLocked(
@ -116,7 +115,6 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_lb_policy_;
};
} // namespace grpc_core

@ -28,6 +28,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/http_connect_handshaker.cc',
'src/core/ext/filters/client_channel/http_proxy.cc',
'src/core/ext/filters/client_channel/lb_policy.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc',

@ -1091,6 +1091,8 @@ src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/http_proxy.h \
src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy.h \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \

@ -888,6 +888,8 @@ src/core/ext/filters/client_channel/http_proxy.cc \
src/core/ext/filters/client_channel/http_proxy.h \
src/core/ext/filters/client_channel/lb_policy.cc \
src/core/ext/filters/client_channel/lb_policy.h \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \

Loading…
Cancel
Save