From 19e28b1353fce7af3a78f80af72732359ac6e0e8 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 28 Feb 2020 08:17:54 -0800 Subject: [PATCH] Refactor code for gracefully switching child LB policies. --- BUILD | 2 + BUILD.gn | 2 + CMakeLists.txt | 2 + Makefile | 2 + build_autogenerated.yaml | 4 + config.m4 | 2 + config.w32 | 1 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 2 + package.xml | 2 + .../lb_policy/child_policy_handler.cc | 280 ++++++++++++ .../lb_policy/child_policy_handler.h | 66 +++ .../client_channel/lb_policy/grpclb/grpclb.cc | 215 ++------- .../client_channel/lb_policy/xds/xds.cc | 430 ++++-------------- .../client_channel/resolving_lb_policy.cc | 192 ++------ .../client_channel/resolving_lb_policy.h | 6 +- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 21 files changed, 520 insertions(+), 700 deletions(-) create mode 100644 src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc create mode 100644 src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h diff --git a/BUILD b/BUILD index 8d96fda0eac..75c1fd61543 100644 --- a/BUILD +++ b/BUILD @@ -1053,6 +1053,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/http_connect_handshaker.cc", "src/core/ext/filters/client_channel/http_proxy.cc", "src/core/ext/filters/client_channel/lb_policy.cc", + "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc", "src/core/ext/filters/client_channel/lb_policy_registry.cc", "src/core/ext/filters/client_channel/local_subchannel_pool.cc", "src/core/ext/filters/client_channel/parse_address.cc", @@ -1079,6 +1080,7 @@ grpc_cc_library( "src/core/ext/filters/client_channel/http_connect_handshaker.h", "src/core/ext/filters/client_channel/http_proxy.h", "src/core/ext/filters/client_channel/lb_policy.h", + "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h", "src/core/ext/filters/client_channel/lb_policy_factory.h", "src/core/ext/filters/client_channel/lb_policy_registry.h", "src/core/ext/filters/client_channel/local_subchannel_pool.h", diff --git a/BUILD.gn b/BUILD.gn index 8b1ef20bb94..1110b9a0b34 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -223,6 +223,8 @@ config("grpc_config") { "src/core/ext/filters/client_channel/http_proxy.h", "src/core/ext/filters/client_channel/lb_policy.cc", "src/core/ext/filters/client_channel/lb_policy.h", + "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc", + "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc", "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f2c77cacad..e6030870b4f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1310,6 +1310,7 @@ add_library(grpc src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc src/core/ext/filters/client_channel/lb_policy.cc + src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -1961,6 +1962,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/http_connect_handshaker.cc src/core/ext/filters/client_channel/http_proxy.cc src/core/ext/filters/client_channel/lb_policy.cc + src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc diff --git a/Makefile b/Makefile index 36d369404ae..f0bd9c056c6 100644 --- a/Makefile +++ b/Makefile @@ -3651,6 +3651,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ src/core/ext/filters/client_channel/lb_policy.cc \ + src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \ @@ -4277,6 +4278,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ src/core/ext/filters/client_channel/lb_policy.cc \ + src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 9f9ba0b53c5..23934d6fdb1 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -382,6 +382,7 @@ libs: - src/core/ext/filters/client_channel/http_connect_handshaker.h - src/core/ext/filters/client_channel/http_proxy.h - src/core/ext/filters/client_channel/lb_policy.h + - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -738,6 +739,7 @@ libs: - src/core/ext/filters/client_channel/http_connect_handshaker.cc - src/core/ext/filters/client_channel/http_proxy.cc - src/core/ext/filters/client_channel/lb_policy.cc + - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc @@ -1273,6 +1275,7 @@ libs: - src/core/ext/filters/client_channel/http_connect_handshaker.h - src/core/ext/filters/client_channel/http_proxy.h - src/core/ext/filters/client_channel/lb_policy.h + - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h @@ -1564,6 +1567,7 @@ libs: - src/core/ext/filters/client_channel/http_connect_handshaker.cc - src/core/ext/filters/client_channel/http_proxy.cc - src/core/ext/filters/client_channel/lb_policy.cc + - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc diff --git a/config.m4 b/config.m4 index 19c6b7ca332..dc8592b9d1f 100644 --- a/config.m4 +++ b/config.m4 @@ -50,6 +50,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/http_connect_handshaker.cc \ src/core/ext/filters/client_channel/http_proxy.cc \ src/core/ext/filters/client_channel/lb_policy.cc \ + src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \ @@ -815,6 +816,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/census) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/health) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/pick_first) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/round_robin) diff --git a/config.w32 b/config.w32 index 8c7e03d6cb3..518ebb87e67 100644 --- a/config.w32 +++ b/config.w32 @@ -19,6 +19,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\http_connect_handshaker.cc " + "src\\core\\ext\\filters\\client_channel\\http_proxy.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy.cc " + + "src\\core\\ext\\filters\\client_channel\\lb_policy\\child_policy_handler.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\client_load_reporting_filter.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb_channel_secure.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index aa71ac8a628..06a5a2762b9 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -233,6 +233,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_connect_handshaker.h', 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.h', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', @@ -681,6 +682,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_connect_handshaker.h', 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.h', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index f4b630bf278..c6f26956635 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -206,6 +206,8 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.cc', 'src/core/ext/filters/client_channel/lb_policy.h', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', @@ -1027,6 +1029,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/http_connect_handshaker.h', 'src/core/ext/filters/client_channel/http_proxy.h', 'src/core/ext/filters/client_channel/lb_policy.h', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', diff --git a/grpc.gemspec b/grpc.gemspec index 9601d877211..243c8352652 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -128,6 +128,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/http_proxy.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy.h ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc ) diff --git a/grpc.gyp b/grpc.gyp index 3c5911f830c..6f7ba7fe658 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -442,6 +442,7 @@ 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', 'src/core/ext/filters/client_channel/lb_policy.cc', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc', @@ -929,6 +930,7 @@ 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', 'src/core/ext/filters/client_channel/lb_policy.cc', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc', diff --git a/package.xml b/package.xml index 2f359bb8a0b..951ad89d023 100644 --- a/package.xml +++ b/package.xml @@ -108,6 +108,8 @@ + + diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc new file mode 100644 index 00000000000..14b26404b44 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc @@ -0,0 +1,280 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" + +#include "absl/strings/str_cat.h" + +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" + +namespace grpc_core { + +// +// ChildPolicyHandler::Helper +// + +class ChildPolicyHandler::Helper + : public LoadBalancingPolicy::ChannelControlHelper { + public: + explicit Helper(RefCountedPtr parent) + : parent_(std::move(parent)) {} + + ~Helper() { parent_.reset(DEBUG_LOCATION, "Helper"); } + + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override { + if (parent_->shutting_down_) return nullptr; + if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; + return parent_->channel_control_helper()->CreateSubchannel(args); + } + + void UpdateState(grpc_connectivity_state state, + std::unique_ptr picker) override { + if (parent_->shutting_down_) return; + // If this request is from the pending child policy, ignore it until + // it reports READY, at which point we swap it into place. + if (CalledByPendingChild()) { + if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { + gpr_log(GPR_INFO, + "[child_policy_handler %p] helper %p: pending child policy %p " + "reports state=%s", + parent_.get(), this, child_, ConnectivityStateName(state)); + } + if (state != GRPC_CHANNEL_READY) return; + grpc_pollset_set_del_pollset_set( + parent_->child_policy_->interested_parties(), + parent_->interested_parties()); + parent_->child_policy_ = std::move(parent_->pending_child_policy_); + } else if (!CalledByCurrentChild()) { + // This request is from an outdated child, so ignore it. + return; + } + parent_->channel_control_helper()->UpdateState(state, std::move(picker)); + } + + void RequestReresolution() override { + if (parent_->shutting_down_) return; + // Only forward re-resolution requests from the most recent child, + // since that's the one that will be receiving any update we receive + // from the resolver. + const LoadBalancingPolicy* latest_child_policy = + parent_->pending_child_policy_ != nullptr + ? parent_->pending_child_policy_.get() + : parent_->child_policy_.get(); + if (child_ != latest_child_policy) return; + if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { + gpr_log(GPR_INFO, "[child_policy_handler %p] started name re-resolving", + parent_.get()); + } + parent_->channel_control_helper()->RequestReresolution(); + } + + void AddTraceEvent(TraceSeverity severity, StringView message) override { + if (parent_->shutting_down_) return; + if (!CalledByPendingChild() && !CalledByCurrentChild()) return; + parent_->channel_control_helper()->AddTraceEvent(severity, message); + } + + void set_child(LoadBalancingPolicy* child) { child_ = child; } + + private: + bool CalledByPendingChild() const { + GPR_ASSERT(child_ != nullptr); + return child_ == parent_->pending_child_policy_.get(); + } + + bool CalledByCurrentChild() const { + GPR_ASSERT(child_ != nullptr); + return child_ == parent_->child_policy_.get(); + }; + + RefCountedPtr parent_; + LoadBalancingPolicy* child_ = nullptr; +}; + +// +// ChildPolicyHandler +// + +void ChildPolicyHandler::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down", this); + } + shutting_down_ = true; + if (child_policy_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down lb_policy %p", + this, child_policy_.get()); + } + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + interested_parties()); + child_policy_.reset(); + } + if (pending_child_policy_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, + "[child_policy_handler %p] shutting down pending lb_policy %p", + this, pending_child_policy_.get()); + } + grpc_pollset_set_del_pollset_set( + pending_child_policy_->interested_parties(), interested_parties()); + pending_child_policy_.reset(); + } +} + +void ChildPolicyHandler::UpdateLocked(UpdateArgs args) { + // The name of the policy that this update wants us to use. + const char* child_policy_name = args.config->name(); + // If the child policy name changes, we need to create a new child + // policy. When this happens, we leave child_policy_ as-is and store + // the new child policy in pending_child_policy_. Once the new child + // policy transitions into state READY, we swap it into child_policy_, + // replacing the original child policy. So pending_child_policy_ is + // non-null only between when we apply an update that changes the child + // policy name and when the new child reports state READY. + // + // Updates can arrive at any point during this transition. We always + // apply updates relative to the most recently created child policy, + // even if the most recent one is still in pending_child_policy_. This + // is true both when applying the updates to an existing child policy + // and when determining whether we need to create a new policy. + // + // As a result of this, there are several cases to consider here: + // + // 1. We have no existing child policy (i.e., this is the first update + // we receive after being created; in this case, both child_policy_ + // and pending_child_policy_ are null). In this case, we create a + // new child policy and store it in child_policy_. + // + // 2. We have an existing child policy and have no pending child policy + // from a previous update (i.e., either there has not been a + // previous update that changed the policy name, or we have already + // finished swapping in the new policy; in this case, child_policy_ + // is non-null but pending_child_policy_ is null). In this case: + // a. If child_policy_->name() equals child_policy_name, then we + // update the existing child policy. + // b. If child_policy_->name() does not equal child_policy_name, + // we create a new policy. The policy will be stored in + // pending_child_policy_ and will later be swapped into + // child_policy_ by the helper when the new child transitions + // into state READY. + // + // 3. We have an existing child policy and have a pending child policy + // from a previous update (i.e., a previous update set + // pending_child_policy_ as per case 2b above and that policy has + // not yet transitioned into state READY and been swapped into + // child_policy_; in this case, both child_policy_ and + // pending_child_policy_ are non-null). In this case: + // a. If pending_child_policy_->name() equals child_policy_name, + // then we update the existing pending child policy. + // b. If pending_child_policy->name() does not equal + // child_policy_name, then we create a new policy. The new + // policy is stored in pending_child_policy_ (replacing the one + // that was there before, which will be immediately shut down) + // and will later be swapped into child_policy_ by the helper + // when the new child transitions into state READY. + const bool create_policy = + // case 1 + child_policy_ == nullptr || + // case 2b + (pending_child_policy_ == nullptr && + strcmp(child_policy_->name(), child_policy_name) != 0) || + // case 3b + (pending_child_policy_ != nullptr && + strcmp(pending_child_policy_->name(), child_policy_name) != 0); + LoadBalancingPolicy* policy_to_update = nullptr; + if (create_policy) { + // Cases 1, 2b, and 3b: create a new child policy. + // If child_policy_ is null, we set it (case 1), else we set + // pending_child_policy_ (cases 2b and 3b). + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, + "[child_policy_handler %p] creating new %schild policy %s", this, + child_policy_ == nullptr ? "" : "pending ", child_policy_name); + } + auto& lb_policy = + child_policy_ == nullptr ? child_policy_ : pending_child_policy_; + lb_policy = CreateChildPolicy(child_policy_name, *args.args); + policy_to_update = lb_policy.get(); + } else { + // Cases 2a and 3a: update an existing policy. + // If we have a pending child policy, send the update to the pending + // policy (case 3a), else send it to the current policy (case 2a). + policy_to_update = pending_child_policy_ != nullptr + ? pending_child_policy_.get() + : child_policy_.get(); + } + GPR_ASSERT(policy_to_update != nullptr); + // Update the policy. + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, "[child_policy_handler %p] updating %schild policy %p", + this, + policy_to_update == pending_child_policy_.get() ? "pending " : "", + policy_to_update); + } + policy_to_update->UpdateLocked(std::move(args)); +} + +void ChildPolicyHandler::ExitIdleLocked() { + if (child_policy_ != nullptr) { + child_policy_->ExitIdleLocked(); + if (pending_child_policy_ != nullptr) { + pending_child_policy_->ExitIdleLocked(); + } + } +} + +void ChildPolicyHandler::ResetBackoffLocked() { + if (child_policy_ != nullptr) { + child_policy_->ResetBackoffLocked(); + if (pending_child_policy_ != nullptr) { + pending_child_policy_->ResetBackoffLocked(); + } + } +} + +OrphanablePtr ChildPolicyHandler::CreateChildPolicy( + const char* child_policy_name, const grpc_channel_args& args) { + Helper* helper = new Helper(Ref(DEBUG_LOCATION, "Helper")); + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = combiner(); + lb_policy_args.channel_control_helper = + std::unique_ptr(helper); + lb_policy_args.args = &args; + OrphanablePtr lb_policy = + LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + child_policy_name, std::move(lb_policy_args)); + if (GPR_UNLIKELY(lb_policy == nullptr)) { + gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", child_policy_name); + return nullptr; + } + helper->set_child(lb_policy.get()); + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, + "[child_policy_handler %p] created new LB policy \"%s\" (%p)", this, + child_policy_name, lb_policy.get()); + } + channel_control_helper()->AddTraceEvent( + ChannelControlHelper::TRACE_INFO, + absl::StrCat("Created new LB policy \"", child_policy_name, "\"")); + grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), + interested_parties()); + return lb_policy; +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h new file mode 100644 index 00000000000..7a32722d367 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h @@ -0,0 +1,66 @@ +// +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_CHILD_POLICY_HANDLER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_CHILD_POLICY_HANDLER_H + +#include + +#include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/orphanable.h" + +namespace grpc_core { + +// A class that makes it easy to gracefully switch child policies. +// +// Callers should instantiate this instead of using +// LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(). Once +// instantiated, this object will automatically take care of +// constructing the child policy as needed upon receiving an update. +class ChildPolicyHandler : public LoadBalancingPolicy { + public: + ChildPolicyHandler(Args args, TraceFlag* tracer) + : LoadBalancingPolicy(std::move(args)), tracer_(tracer) {} + + virtual const char* name() const override { return "child_policy_handler"; } + + void UpdateLocked(UpdateArgs args) override; + void ExitIdleLocked() override; + void ResetBackoffLocked() override; + + private: + class Helper; + + void ShutdownLocked() override; + + OrphanablePtr CreateChildPolicy( + const char* child_policy_name, const grpc_channel_args& args); + + // Passed in from caller at construction time. + TraceFlag* tracer_; + + bool shutting_down_ = false; + + // Child LB policy. + OrphanablePtr child_policy_; + OrphanablePtr pending_child_policy_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_CHILD_POLICY_HANDLER_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 0e2d6cc7fb5..7896be28687 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -71,6 +71,7 @@ #include #include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" @@ -296,14 +297,8 @@ class GrpcLb : public LoadBalancingPolicy { void RequestReresolution() override; void AddTraceEvent(TraceSeverity severity, StringView message) override; - void set_child(LoadBalancingPolicy* child) { child_ = child; } - private: - bool CalledByPendingChild() const; - bool CalledByCurrentChild() const; - RefCountedPtr parent_; - LoadBalancingPolicy* child_ = nullptr; }; ~GrpcLb(); @@ -334,7 +329,7 @@ class GrpcLb : public LoadBalancingPolicy { grpc_channel_args* CreateChildPolicyArgsLocked( bool is_backend_from_grpclb_load_balancer); OrphanablePtr CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args); + const grpc_channel_args* args); void CreateOrUpdateChildPolicyLocked(); // Who the client is trying to communicate with. @@ -385,9 +380,6 @@ class GrpcLb : public LoadBalancingPolicy { // The child policy to use for the backends. OrphanablePtr child_policy_; - // When switching child policies, the new policy will be stored here - // until it reports READY, at which point it will be moved to child_policy_. - OrphanablePtr pending_child_policy_; // The child policy config. RefCountedPtr child_policy_config_; // Child policy in state READY. @@ -629,46 +621,15 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // GrpcLb::Helper // -bool GrpcLb::Helper::CalledByPendingChild() const { - GPR_ASSERT(child_ != nullptr); - return child_ == parent_->pending_child_policy_.get(); -} - -bool GrpcLb::Helper::CalledByCurrentChild() const { - GPR_ASSERT(child_ != nullptr); - return child_ == parent_->child_policy_.get(); -} - RefCountedPtr GrpcLb::Helper::CreateSubchannel( const grpc_channel_args& args) { - if (parent_->shutting_down_ || - (!CalledByPendingChild() && !CalledByCurrentChild())) { - return nullptr; - } + if (parent_->shutting_down_) return nullptr; return parent_->channel_control_helper()->CreateSubchannel(args); } void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, std::unique_ptr picker) { if (parent_->shutting_down_) return; - // If this request is from the pending child policy, ignore it until - // it reports READY, at which point we swap it into place. - if (CalledByPendingChild()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "[grpclb %p helper %p] pending child policy %p reports state=%s", - parent_.get(), this, parent_->pending_child_policy_.get(), - ConnectivityStateName(state)); - } - if (state != GRPC_CHANNEL_READY) return; - grpc_pollset_set_del_pollset_set( - parent_->child_policy_->interested_parties(), - parent_->interested_parties()); - parent_->child_policy_ = std::move(parent_->pending_child_policy_); - } else if (!CalledByCurrentChild()) { - // This request is from an outdated child, so ignore it. - return; - } // Record whether child policy reports READY. parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY; // Enter fallback mode if needed. @@ -721,16 +682,6 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, void GrpcLb::Helper::RequestReresolution() { if (parent_->shutting_down_) return; - const LoadBalancingPolicy* latest_child_policy = - parent_->pending_child_policy_ != nullptr - ? parent_->pending_child_policy_.get() - : parent_->child_policy_.get(); - if (child_ != latest_child_policy) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "[grpclb %p] Re-resolution requested from %schild policy (%p).", - parent_.get(), CalledByPendingChild() ? "pending " : "", child_); - } // If we are talking to a balancer, we expect to get updated addresses // from the balancer, so we can ignore the re-resolution request from // the child policy. Otherwise, pass the re-resolution request up to the @@ -742,10 +693,7 @@ void GrpcLb::Helper::RequestReresolution() { } void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) { - if (parent_->shutting_down_ || - (!CalledByPendingChild() && !CalledByCurrentChild())) { - return; - } + if (parent_->shutting_down_) return; parent_->channel_control_helper()->AddTraceEvent(severity, message); } @@ -1411,13 +1359,8 @@ void GrpcLb::ShutdownLocked() { if (child_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), interested_parties()); + child_policy_.reset(); } - if (pending_child_policy_ != nullptr) { - grpc_pollset_set_del_pollset_set( - pending_child_policy_->interested_parties(), interested_parties()); - } - child_policy_.reset(); - pending_child_policy_.reset(); // We destroy the LB channel here instead of in our destructor because // destroying the channel triggers a last callback to // OnBalancerChannelConnectivityChangedLocked(), and we need to be @@ -1439,9 +1382,6 @@ void GrpcLb::ResetBackoffLocked() { if (child_policy_ != nullptr) { child_policy_->ResetBackoffLocked(); } - if (pending_child_policy_ != nullptr) { - pending_child_policy_->ResetBackoffLocked(); - } } void GrpcLb::UpdateLocked(UpdateArgs args) { @@ -1727,25 +1667,17 @@ grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked( } OrphanablePtr GrpcLb::CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args) { - Helper* helper = new Helper(Ref()); + const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.args = args; - lb_policy_args.channel_control_helper = - std::unique_ptr(helper); + lb_policy_args.channel_control_helper = absl::make_unique(Ref()); OrphanablePtr lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - name, std::move(lb_policy_args)); - if (GPR_UNLIKELY(lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this, - name); - return nullptr; - } - helper->set_child(lb_policy.get()); + MakeOrphanable(std::move(lb_policy_args), + &grpc_lb_glb_trace); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this, - name, lb_policy.get()); + gpr_log(GPR_INFO, "[grpclb %p] Created new child policy handler (%p)", this, + lb_policy.get()); } // Add the gRPC LB's interested_parties pollset_set to that of the newly // created child policy. This will make the child policy progress upon @@ -1776,97 +1708,16 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer); GPR_ASSERT(update_args.args != nullptr); update_args.config = child_policy_config_; - // If the child policy name changes, we need to create a new child - // policy. When this happens, we leave child_policy_ as-is and store - // the new child policy in pending_child_policy_. Once the new child - // policy transitions into state READY, we swap it into child_policy_, - // replacing the original child policy. So pending_child_policy_ is - // non-null only between when we apply an update that changes the child - // policy name and when the new child reports state READY. - // - // Updates can arrive at any point during this transition. We always - // apply updates relative to the most recently created child policy, - // even if the most recent one is still in pending_child_policy_. This - // is true both when applying the updates to an existing child policy - // and when determining whether we need to create a new policy. - // - // As a result of this, there are several cases to consider here: - // - // 1. We have no existing child policy (i.e., we have started up but - // have not yet received a serverlist from the balancer or gone - // into fallback mode; in this case, both child_policy_ and - // pending_child_policy_ are null). In this case, we create a - // new child policy and store it in child_policy_. - // - // 2. We have an existing child policy and have no pending child policy - // from a previous update (i.e., either there has not been a - // previous update that changed the policy name, or we have already - // finished swapping in the new policy; in this case, child_policy_ - // is non-null but pending_child_policy_ is null). In this case: - // a. If child_policy_->name() equals child_policy_name, then we - // update the existing child policy. - // b. If child_policy_->name() does not equal child_policy_name, - // we create a new policy. The policy will be stored in - // pending_child_policy_ and will later be swapped into - // child_policy_ by the helper when the new child transitions - // into state READY. - // - // 3. We have an existing child policy and have a pending child policy - // from a previous update (i.e., a previous update set - // pending_child_policy_ as per case 2b above and that policy has - // not yet transitioned into state READY and been swapped into - // child_policy_; in this case, both child_policy_ and - // pending_child_policy_ are non-null). In this case: - // a. If pending_child_policy_->name() equals child_policy_name, - // then we update the existing pending child policy. - // b. If pending_child_policy->name() does not equal - // child_policy_name, then we create a new policy. The new - // policy is stored in pending_child_policy_ (replacing the one - // that was there before, which will be immediately shut down) - // and will later be swapped into child_policy_ by the helper - // when the new child transitions into state READY. - const char* child_policy_name = child_policy_config_ == nullptr - ? "round_robin" - : child_policy_config_->name(); - const bool create_policy = - // case 1 - child_policy_ == nullptr || - // case 2b - (pending_child_policy_ == nullptr && - strcmp(child_policy_->name(), child_policy_name) != 0) || - // case 3b - (pending_child_policy_ != nullptr && - strcmp(pending_child_policy_->name(), child_policy_name) != 0); - LoadBalancingPolicy* policy_to_update = nullptr; - if (create_policy) { - // Cases 1, 2b, and 3b: create a new child policy. - // If child_policy_ is null, we set it (case 1), else we set - // pending_child_policy_ (cases 2b and 3b). - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this, - child_policy_ == nullptr ? "" : "pending ", child_policy_name); - } - // Swap the policy into place. - auto& lb_policy = - child_policy_ == nullptr ? child_policy_ : pending_child_policy_; - lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args); - policy_to_update = lb_policy.get(); - } else { - // Cases 2a and 3a: update an existing policy. - // If we have a pending child policy, send the update to the pending - // policy (case 3a), else send it to the current policy (case 2a). - policy_to_update = pending_child_policy_ != nullptr - ? pending_child_policy_.get() - : child_policy_.get(); + // Create child policy if needed. + if (child_policy_ == nullptr) { + child_policy_ = CreateChildPolicyLocked(update_args.args); } - GPR_ASSERT(policy_to_update != nullptr); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this, - policy_to_update == pending_child_policy_.get() ? "pending " : "", - policy_to_update); + gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this, + child_policy_.get()); } - policy_to_update->UpdateLocked(std::move(update_args)); + child_policy_->UpdateLocked(std::move(update_args)); } // @@ -1889,21 +1740,29 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory { return MakeRefCounted(nullptr); } std::vector error_list; - RefCountedPtr child_policy; + Json child_policy_config_json_tmp; + const Json* child_policy_config_json; auto it = json.object_value().find("childPolicy"); - if (it != json.object_value().end()) { - grpc_error* parse_error = GRPC_ERROR_NONE; - child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - it->second, &parse_error); - if (parse_error != GRPC_ERROR_NONE) { - std::vector child_errors; - child_errors.push_back(parse_error); - error_list.push_back( - GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); - } + if (it == json.object_value().end()) { + child_policy_config_json_tmp = Json::Array{Json::Object{ + {"round_robin", Json::Object()}, + }}; + child_policy_config_json = &child_policy_config_json_tmp; + } else { + child_policy_config_json = &it->second; + } + grpc_error* parse_error = GRPC_ERROR_NONE; + RefCountedPtr child_policy_config = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + *child_policy_config_json, &parse_error); + if (parse_error != GRPC_ERROR_NONE) { + std::vector child_errors; + child_errors.push_back(parse_error); + error_list.push_back( + GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); } if (error_list.empty()) { - return MakeRefCounted(std::move(child_policy)); + return MakeRefCounted(std::move(child_policy_config)); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list); return nullptr; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 788a90bf1f6..b8164e97cc8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -32,6 +32,7 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/lb_policy.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" @@ -198,14 +199,8 @@ class XdsLb : public LoadBalancingPolicy { void RequestReresolution() override; void AddTraceEvent(TraceSeverity severity, StringView message) override; - void set_child(LoadBalancingPolicy* child) { child_ = child; } - private: - bool CalledByPendingFallback() const; - bool CalledByCurrentFallback() const; - RefCountedPtr parent_; - LoadBalancingPolicy* child_ = nullptr; }; // Each LocalityMap holds a ref to the XdsLb. @@ -262,19 +257,14 @@ class XdsLb : public LoadBalancingPolicy { // client, which is a watch-based API. void RequestReresolution() override {} void AddTraceEvent(TraceSeverity severity, StringView message) override; - void set_child(LoadBalancingPolicy* child) { child_ = child; } private: - bool CalledByPendingChild() const; - bool CalledByCurrentChild() const; - RefCountedPtr locality_; - LoadBalancingPolicy* child_ = nullptr; }; // Methods for dealing with the child policy. OrphanablePtr CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args); + const grpc_channel_args* args); grpc_channel_args* CreateChildPolicyArgsLocked( const grpc_channel_args* args); @@ -291,7 +281,6 @@ class XdsLb : public LoadBalancingPolicy { RefCountedPtr name_; RefCountedPtr stats_; OrphanablePtr child_policy_; - OrphanablePtr pending_child_policy_; RefCountedPtr picker_wrapper_; RefCountedPtr load_reporting_picker_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; @@ -403,7 +392,7 @@ class XdsLb : public LoadBalancingPolicy { static void OnFallbackTimerLocked(void* arg, grpc_error* error); void UpdateFallbackPolicyLocked(); OrphanablePtr CreateFallbackPolicyLocked( - const char* name, const grpc_channel_args* args); + const grpc_channel_args* args); void MaybeExitFallbackMode(); // Server name from target URI. @@ -445,7 +434,6 @@ class XdsLb : public LoadBalancingPolicy { // Non-null iff we are in fallback mode. OrphanablePtr fallback_policy_; - OrphanablePtr pending_fallback_policy_; const grpc_millis locality_retention_interval_ms_; const grpc_millis locality_map_failover_timeout_ms_; @@ -539,71 +527,26 @@ XdsLb::PickResult XdsLb::LocalityPicker::PickFromLocality(const uint32_t key, // XdsLb::FallbackHelper // -bool XdsLb::FallbackHelper::CalledByPendingFallback() const { - GPR_ASSERT(child_ != nullptr); - return child_ == parent_->pending_fallback_policy_.get(); -} - -bool XdsLb::FallbackHelper::CalledByCurrentFallback() const { - GPR_ASSERT(child_ != nullptr); - return child_ == parent_->fallback_policy_.get(); -} - RefCountedPtr XdsLb::FallbackHelper::CreateSubchannel( const grpc_channel_args& args) { - if (parent_->shutting_down_ || - (!CalledByPendingFallback() && !CalledByCurrentFallback())) { - return nullptr; - } + if (parent_->shutting_down_) return nullptr; return parent_->channel_control_helper()->CreateSubchannel(args); } void XdsLb::FallbackHelper::UpdateState( grpc_connectivity_state state, std::unique_ptr picker) { if (parent_->shutting_down_) return; - // If this request is from the pending fallback policy, ignore it until - // it reports READY, at which point we swap it into place. - if (CalledByPendingFallback()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log( - GPR_INFO, - "[xdslb %p helper %p] pending fallback policy %p reports state=%s", - parent_.get(), this, parent_->pending_fallback_policy_.get(), - ConnectivityStateName(state)); - } - if (state != GRPC_CHANNEL_READY) return; - grpc_pollset_set_del_pollset_set( - parent_->fallback_policy_->interested_parties(), - parent_->interested_parties()); - parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_); - } else if (!CalledByCurrentFallback()) { - // This request is from an outdated fallback policy, so ignore it. - return; - } parent_->channel_control_helper()->UpdateState(state, std::move(picker)); } void XdsLb::FallbackHelper::RequestReresolution() { if (parent_->shutting_down_) return; - const LoadBalancingPolicy* latest_fallback_policy = - parent_->pending_fallback_policy_ != nullptr - ? parent_->pending_fallback_policy_.get() - : parent_->fallback_policy_.get(); - if (child_ != latest_fallback_policy) return; - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p] Re-resolution requested from the fallback policy (%p).", - parent_.get(), child_); - } parent_->channel_control_helper()->RequestReresolution(); } void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity, StringView message) { - if (parent_->shutting_down_ || - (!CalledByPendingFallback() && !CalledByCurrentFallback())) { - return; - } + if (parent_->shutting_down_) return; parent_->channel_control_helper()->AddTraceEvent(severity, message); } @@ -737,13 +680,8 @@ void XdsLb::ShutdownLocked() { if (fallback_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), interested_parties()); + fallback_policy_.reset(); } - if (pending_fallback_policy_ != nullptr) { - grpc_pollset_set_del_pollset_set( - pending_fallback_policy_->interested_parties(), interested_parties()); - } - fallback_policy_.reset(); - pending_fallback_policy_.reset(); // Cancel the endpoint watch here instead of in our dtor if we are using the // XdsResolver, because the watcher holds a ref to us and we might not be // destroying the Xds client leading to a situation where the Xds lb policy is @@ -771,9 +709,6 @@ void XdsLb::ResetBackoffLocked() { if (fallback_policy_ != nullptr) { fallback_policy_->ResetBackoffLocked(); } - if (pending_fallback_policy_ != nullptr) { - pending_fallback_policy_->ResetBackoffLocked(); - } } void XdsLb::UpdateLocked(UpdateArgs args) { @@ -896,127 +831,37 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { void XdsLb::UpdateFallbackPolicyLocked() { if (shutting_down_) return; - // Construct update args. + // Create policy if needed. + if (fallback_policy_ == nullptr) { + fallback_policy_ = CreateFallbackPolicyLocked(args_); + GPR_ASSERT(fallback_policy_ != nullptr); + } + // Perform update. UpdateArgs update_args; update_args.addresses = fallback_backend_addresses_; update_args.config = config_->fallback_policy(); update_args.args = grpc_channel_args_copy(args_); - // If the child policy name changes, we need to create a new child - // policy. When this happens, we leave child_policy_ as-is and store - // the new child policy in pending_child_policy_. Once the new child - // policy transitions into state READY, we swap it into child_policy_, - // replacing the original child policy. So pending_child_policy_ is - // non-null only between when we apply an update that changes the child - // policy name and when the new child reports state READY. - // - // Updates can arrive at any point during this transition. We always - // apply updates relative to the most recently created child policy, - // even if the most recent one is still in pending_child_policy_. This - // is true both when applying the updates to an existing child policy - // and when determining whether we need to create a new policy. - // - // As a result of this, there are several cases to consider here: - // - // 1. We have no existing child policy (i.e., we have started up but - // have not yet received a serverlist from the balancer or gone - // into fallback mode; in this case, both child_policy_ and - // pending_child_policy_ are null). In this case, we create a - // new child policy and store it in child_policy_. - // - // 2. We have an existing child policy and have no pending child policy - // from a previous update (i.e., either there has not been a - // previous update that changed the policy name, or we have already - // finished swapping in the new policy; in this case, child_policy_ - // is non-null but pending_child_policy_ is null). In this case: - // a. If child_policy_->name() equals child_policy_name, then we - // update the existing child policy. - // b. If child_policy_->name() does not equal child_policy_name, - // we create a new policy. The policy will be stored in - // pending_child_policy_ and will later be swapped into - // child_policy_ by the helper when the new child transitions - // into state READY. - // - // 3. We have an existing child policy and have a pending child policy - // from a previous update (i.e., a previous update set - // pending_child_policy_ as per case 2b above and that policy has - // not yet transitioned into state READY and been swapped into - // child_policy_; in this case, both child_policy_ and - // pending_child_policy_ are non-null). In this case: - // a. If pending_child_policy_->name() equals child_policy_name, - // then we update the existing pending child policy. - // b. If pending_child_policy->name() does not equal - // child_policy_name, then we create a new policy. The new - // policy is stored in pending_child_policy_ (replacing the one - // that was there before, which will be immediately shut down) - // and will later be swapped into child_policy_ by the helper - // when the new child transitions into state READY. - const char* fallback_policy_name = update_args.config == nullptr - ? "round_robin" - : update_args.config->name(); - const bool create_policy = - // case 1 - fallback_policy_ == nullptr || - // case 2b - (pending_fallback_policy_ == nullptr && - strcmp(fallback_policy_->name(), fallback_policy_name) != 0) || - // case 3b - (pending_fallback_policy_ != nullptr && - strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0); - LoadBalancingPolicy* policy_to_update = nullptr; - if (create_policy) { - // Cases 1, 2b, and 3b: create a new child policy. - // If child_policy_ is null, we set it (case 1), else we set - // pending_child_policy_ (cases 2b and 3b). - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this, - fallback_policy_ == nullptr ? "" : "pending ", - fallback_policy_name); - } - auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_ - : pending_fallback_policy_; - lb_policy = - CreateFallbackPolicyLocked(fallback_policy_name, update_args.args); - policy_to_update = lb_policy.get(); - } else { - // Cases 2a and 3a: update an existing policy. - // If we have a pending child policy, send the update to the pending - // policy (case 3a), else send it to the current policy (case 2a). - policy_to_update = pending_fallback_policy_ != nullptr - ? pending_fallback_policy_.get() - : fallback_policy_.get(); - } - GPR_ASSERT(policy_to_update != nullptr); - // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log( - GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this, - policy_to_update == pending_fallback_policy_.get() ? "pending " : "", - policy_to_update); + gpr_log(GPR_INFO, "[xdslb %p] Updating fallback child policy handler %p", + this, fallback_policy_.get()); } - policy_to_update->UpdateLocked(std::move(update_args)); + fallback_policy_->UpdateLocked(std::move(update_args)); } OrphanablePtr XdsLb::CreateFallbackPolicyLocked( - const char* name, const grpc_channel_args* args) { - FallbackHelper* helper = - new FallbackHelper(Ref(DEBUG_LOCATION, "FallbackHelper")); + const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = - std::unique_ptr(helper); + absl::make_unique(Ref(DEBUG_LOCATION, "FallbackHelper")); OrphanablePtr lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - name, std::move(lb_policy_args)); - if (GPR_UNLIKELY(lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "[xdslb %p] Failure creating fallback policy %s", this, - name); - return nullptr; - } - helper->set_child(lb_policy.get()); + MakeOrphanable(std::move(lb_policy_args), + &grpc_lb_xds_trace); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] Created new fallback policy %s (%p)", this, - name, lb_policy.get()); + gpr_log(GPR_INFO, + "[xdslb %p] Created new fallback child policy handler (%p)", this, + lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created // child policy. This will make the child policy progress upon activity on xDS @@ -1030,7 +875,6 @@ void XdsLb::MaybeExitFallbackMode() { if (fallback_policy_ == nullptr) return; gpr_log(GPR_INFO, "[xdslb %p] Exiting fallback mode", this); fallback_policy_.reset(); - pending_fallback_policy_.reset(); } // @@ -1513,27 +1357,19 @@ grpc_channel_args* XdsLb::LocalityMap::Locality::CreateChildPolicyArgsLocked( OrphanablePtr XdsLb::LocalityMap::Locality::CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args) { - Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper")); + const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = xds_policy()->combiner(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = - std::unique_ptr(helper); + absl::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - name, std::move(lb_policy_args)); - if (GPR_UNLIKELY(lb_policy == nullptr)) { - gpr_log(GPR_ERROR, - "[xdslb %p] Locality %p %s: failure creating child policy %s", - xds_policy(), this, name_->AsHumanReadableString(), name); - return nullptr; - } - helper->set_child(lb_policy.get()); + MakeOrphanable(std::move(lb_policy_args), + &grpc_lb_xds_trace); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, - "[xdslb %p] Locality %p %s: Created new child policy %s (%p)", - xds_policy(), this, name_->AsHumanReadableString(), name, + "[xdslb %p] Locality %p %s: Created new child policy handler (%p)", + xds_policy(), this, name_->AsHumanReadableString(), lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created @@ -1560,101 +1396,19 @@ void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight, update_args.addresses = std::move(serverlist); update_args.config = xds_policy()->config_->child_policy(); update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_); - // If the child policy name changes, we need to create a new child - // policy. When this happens, we leave child_policy_ as-is and store - // the new child policy in pending_child_policy_. Once the new child - // policy transitions into state READY, we swap it into child_policy_, - // replacing the original child policy. So pending_child_policy_ is - // non-null only between when we apply an update that changes the child - // policy name and when the new child reports state READY. - // - // Updates can arrive at any point during this transition. We always - // apply updates relative to the most recently created child policy, - // even if the most recent one is still in pending_child_policy_. This - // is true both when applying the updates to an existing child policy - // and when determining whether we need to create a new policy. - // - // As a result of this, there are several cases to consider here: - // - // 1. We have no existing child policy (i.e., we have started up but - // have not yet received a serverlist from the balancer or gone - // into fallback mode; in this case, both child_policy_ and - // pending_child_policy_ are null). In this case, we create a - // new child policy and store it in child_policy_. - // - // 2. We have an existing child policy and have no pending child policy - // from a previous update (i.e., either there has not been a - // previous update that changed the policy name, or we have already - // finished swapping in the new policy; in this case, child_policy_ - // is non-null but pending_child_policy_ is null). In this case: - // a. If child_policy_->name() equals child_policy_name, then we - // update the existing child policy. - // b. If child_policy_->name() does not equal child_policy_name, - // we create a new policy. The policy will be stored in - // pending_child_policy_ and will later be swapped into - // child_policy_ by the helper when the new child transitions - // into state READY. - // - // 3. We have an existing child policy and have a pending child policy - // from a previous update (i.e., a previous update set - // pending_child_policy_ as per case 2b above and that policy has - // not yet transitioned into state READY and been swapped into - // child_policy_; in this case, both child_policy_ and - // pending_child_policy_ are non-null). In this case: - // a. If pending_child_policy_->name() equals child_policy_name, - // then we update the existing pending child policy. - // b. If pending_child_policy->name() does not equal - // child_policy_name, then we create a new policy. The new - // policy is stored in pending_child_policy_ (replacing the one - // that was there before, which will be immediately shut down) - // and will later be swapped into child_policy_ by the helper - // when the new child transitions into state READY. - // TODO(juanlishen): If the child policy is not configured via service config, - // use whatever algorithm is specified by the balancer. - const char* child_policy_name = update_args.config == nullptr - ? "round_robin" - : update_args.config->name(); - const bool create_policy = - // case 1 - child_policy_ == nullptr || - // case 2b - (pending_child_policy_ == nullptr && - strcmp(child_policy_->name(), child_policy_name) != 0) || - // case 3b - (pending_child_policy_ != nullptr && - strcmp(pending_child_policy_->name(), child_policy_name) != 0); - LoadBalancingPolicy* policy_to_update = nullptr; - if (create_policy) { - // Cases 1, 2b, and 3b: create a new child policy. - // If child_policy_ is null, we set it (case 1), else we set - // pending_child_policy_ (cases 2b and 3b). - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p] Locality %p %s: Creating new %schild policy %s", - xds_policy(), this, name_->AsHumanReadableString(), - child_policy_ == nullptr ? "" : "pending ", child_policy_name); - } - auto& lb_policy = - child_policy_ == nullptr ? child_policy_ : pending_child_policy_; - lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args); - policy_to_update = lb_policy.get(); - } else { - // Cases 2a and 3a: update an existing policy. - // If we have a pending child policy, send the update to the pending - // policy (case 3a), else send it to the current policy (case 2a). - policy_to_update = pending_child_policy_ != nullptr - ? pending_child_policy_.get() - : child_policy_.get(); - } - GPR_ASSERT(policy_to_update != nullptr); + // Create child policy if needed. + if (child_policy_ == nullptr) { + child_policy_ = CreateChildPolicyLocked(update_args.args); + GPR_ASSERT(child_policy_ != nullptr); + } // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: Updating %schild policy %p", + gpr_log(GPR_INFO, + "[xdslb %p] Locality %p %s: Updating child policy handler %p", xds_policy(), this, name_->AsHumanReadableString(), - policy_to_update == pending_child_policy_.get() ? "pending " : "", - policy_to_update); + child_policy_.get()); } - policy_to_update->UpdateLocked(std::move(update_args)); + child_policy_->UpdateLocked(std::move(update_args)); } void XdsLb::LocalityMap::Locality::ShutdownLocked() { @@ -1668,12 +1422,6 @@ void XdsLb::LocalityMap::Locality::ShutdownLocked() { grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), xds_policy()->interested_parties()); child_policy_.reset(); - if (pending_child_policy_ != nullptr) { - grpc_pollset_set_del_pollset_set( - pending_child_policy_->interested_parties(), - xds_policy()->interested_parties()); - pending_child_policy_.reset(); - } // Drop our ref to the child's picker, in case it's holding a ref to // the child. load_reporting_picker_.reset(); @@ -1686,9 +1434,6 @@ void XdsLb::LocalityMap::Locality::ShutdownLocked() { void XdsLb::LocalityMap::Locality::ResetBackoffLocked() { child_policy_->ResetBackoffLocked(); - if (pending_child_policy_ != nullptr) { - pending_child_policy_->ResetBackoffLocked(); - } } void XdsLb::LocalityMap::Locality::Orphan() { @@ -1736,23 +1481,10 @@ void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimerLocked( // XdsLb::LocalityMap::Locality::Helper // -bool XdsLb::LocalityMap::Locality::Helper::CalledByPendingChild() const { - GPR_ASSERT(child_ != nullptr); - return child_ == locality_->pending_child_policy_.get(); -} - -bool XdsLb::LocalityMap::Locality::Helper::CalledByCurrentChild() const { - GPR_ASSERT(child_ != nullptr); - return child_ == locality_->child_policy_.get(); -} - RefCountedPtr XdsLb::LocalityMap::Locality::Helper::CreateSubchannel( const grpc_channel_args& args) { - if (locality_->xds_policy()->shutting_down_ || - (!CalledByPendingChild() && !CalledByCurrentChild())) { - return nullptr; - } + if (locality_->xds_policy()->shutting_down_) return nullptr; return locality_->xds_policy()->channel_control_helper()->CreateSubchannel( args); } @@ -1760,25 +1492,6 @@ XdsLb::LocalityMap::Locality::Helper::CreateSubchannel( void XdsLb::LocalityMap::Locality::Helper::UpdateState( grpc_connectivity_state state, std::unique_ptr picker) { if (locality_->xds_policy()->shutting_down_) return; - // If this request is from the pending child policy, ignore it until - // it reports READY, at which point we swap it into place. - if (CalledByPendingChild()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p helper %p] pending child policy %p reports state=%s", - locality_->xds_policy(), this, - locality_->pending_child_policy_.get(), - ConnectivityStateName(state)); - } - if (state != GRPC_CHANNEL_READY) return; - grpc_pollset_set_del_pollset_set( - locality_->child_policy_->interested_parties(), - locality_->xds_policy()->interested_parties()); - locality_->child_policy_ = std::move(locality_->pending_child_policy_); - } else if (!CalledByCurrentChild()) { - // This request is from an outdated child, so ignore it. - return; - } // Cache the state and picker in the locality. locality_->connectivity_state_ = state; locality_->picker_wrapper_ = @@ -1789,10 +1502,7 @@ void XdsLb::LocalityMap::Locality::Helper::UpdateState( void XdsLb::LocalityMap::Locality::Helper::AddTraceEvent(TraceSeverity severity, StringView message) { - if (locality_->xds_policy()->shutting_down_ || - (!CalledByPendingChild() && !CalledByCurrentChild())) { - return; - } + if (locality_->xds_policy()->shutting_down_) return; locality_->xds_policy()->channel_control_helper()->AddTraceEvent(severity, message); } @@ -1823,34 +1533,48 @@ class XdsFactory : public LoadBalancingPolicyFactory { } std::vector error_list; // Child policy. - RefCountedPtr child_policy; + Json json_tmp; + const Json* child_policy_json; auto it = json.object_value().find("childPolicy"); - if (it != json.object_value().end()) { - grpc_error* parse_error = GRPC_ERROR_NONE; - child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - it->second, &parse_error); - if (child_policy == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - std::vector child_errors; - child_errors.push_back(parse_error); - error_list.push_back( - GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); - } + if (it == json.object_value().end()) { + json_tmp = Json::Array{Json::Object{ + {"round_robin", Json::Object()}, + }}; + child_policy_json = &json_tmp; + } else { + child_policy_json = &it->second; + } + grpc_error* parse_error = GRPC_ERROR_NONE; + RefCountedPtr child_policy = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + *child_policy_json, &parse_error); + if (child_policy == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + std::vector child_errors; + child_errors.push_back(parse_error); + error_list.push_back( + GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); } // Fallback policy. - RefCountedPtr fallback_policy; + const Json* fallback_policy_json; it = json.object_value().find("fallbackPolicy"); - if (it != json.object_value().end()) { - grpc_error* parse_error = GRPC_ERROR_NONE; - fallback_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( - it->second, &parse_error); - if (fallback_policy == nullptr) { - GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); - std::vector child_errors; - child_errors.push_back(parse_error); - error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR( - "field:fallbackPolicy", &child_errors)); - } + if (it == json.object_value().end()) { + json_tmp = Json::Array{Json::Object{ + {"round_robin", Json::Object()}, + }}; + fallback_policy_json = &json_tmp; + } else { + fallback_policy_json = &it->second; + } + RefCountedPtr fallback_policy = + LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( + *fallback_policy_json, &parse_error); + if (fallback_policy == nullptr) { + GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); + std::vector child_errors; + child_errors.push_back(parse_error); + error_list.push_back( + GRPC_ERROR_CREATE_FROM_VECTOR("field:fallbackPolicy", &child_errors)); } // EDS service name. const char* eds_service_name = nullptr; diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index 32ccf60b30c..b05590da45e 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -33,6 +33,7 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" +#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" @@ -109,67 +110,31 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper RefCountedPtr CreateSubchannel( const grpc_channel_args& args) override { if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. - if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; return parent_->channel_control_helper()->CreateSubchannel(args); } void UpdateState(grpc_connectivity_state state, std::unique_ptr picker) override { if (parent_->resolver_ == nullptr) return; // Shutting down. - // If this request is from the pending child policy, ignore it until - // it reports READY, at which point we swap it into place. - if (CalledByPendingChild()) { - if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { - gpr_log(GPR_INFO, - "resolving_lb=%p helper=%p: pending child policy %p reports " - "state=%s", - parent_.get(), this, child_, ConnectivityStateName(state)); - } - if (state != GRPC_CHANNEL_READY) return; - grpc_pollset_set_del_pollset_set( - parent_->lb_policy_->interested_parties(), - parent_->interested_parties()); - parent_->lb_policy_ = std::move(parent_->pending_lb_policy_); - } else if (!CalledByCurrentChild()) { - // This request is from an outdated child, so ignore it. - return; - } parent_->channel_control_helper()->UpdateState(state, std::move(picker)); } void RequestReresolution() override { - // If there is a pending child policy, ignore re-resolution requests - // from the current child policy (or any outdated child). - if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) { - return; - } + if (parent_->resolver_ == nullptr) return; // Shutting down. if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving", parent_.get()); } - if (parent_->resolver_ != nullptr) { - parent_->resolver_->RequestReresolutionLocked(); - } + parent_->resolver_->RequestReresolutionLocked(); } - void AddTraceEvent(TraceSeverity /*severity*/, - StringView /*message*/) override {} - - void set_child(LoadBalancingPolicy* child) { child_ = child; } - - private: - bool CalledByPendingChild() const { - GPR_ASSERT(child_ != nullptr); - return child_ == parent_->pending_lb_policy_.get(); + void AddTraceEvent(TraceSeverity severity, StringView message) override { + if (parent_->resolver_ == nullptr) return; // Shutting down. + parent_->channel_control_helper()->AddTraceEvent(severity, message); } - bool CalledByCurrentChild() const { - GPR_ASSERT(child_ != nullptr); - return child_ == parent_->lb_policy_.get(); - }; - + private: RefCountedPtr parent_; - LoadBalancingPolicy* child_ = nullptr; }; // @@ -217,23 +182,11 @@ void ResolvingLoadBalancingPolicy::ShutdownLocked() { interested_parties()); lb_policy_.reset(); } - if (pending_lb_policy_ != nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p", - this, pending_lb_policy_.get()); - } - grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(), - interested_parties()); - pending_lb_policy_.reset(); - } } } void ResolvingLoadBalancingPolicy::ExitIdleLocked() { - if (lb_policy_ != nullptr) { - lb_policy_->ExitIdleLocked(); - if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked(); - } + if (lb_policy_ != nullptr) lb_policy_->ExitIdleLocked(); } void ResolvingLoadBalancingPolicy::ResetBackoffLocked() { @@ -242,7 +195,6 @@ void ResolvingLoadBalancingPolicy::ResetBackoffLocked() { resolver_->RequestReresolutionLocked(); } if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked(); - if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked(); } void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { @@ -269,132 +221,42 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( RefCountedPtr lb_policy_config, - Resolver::Result result, TraceStringVector* trace_strings) { - // If the child policy name changes, we need to create a new child - // policy. When this happens, we leave child_policy_ as-is and store - // the new child policy in pending_child_policy_. Once the new child - // policy transitions into state READY, we swap it into child_policy_, - // replacing the original child policy. So pending_child_policy_ is - // non-null only between when we apply an update that changes the child - // policy name and when the new child reports state READY. - // - // Updates can arrive at any point during this transition. We always - // apply updates relative to the most recently created child policy, - // even if the most recent one is still in pending_child_policy_. This - // is true both when applying the updates to an existing child policy - // and when determining whether we need to create a new policy. - // - // As a result of this, there are several cases to consider here: - // - // 1. We have no existing child policy (i.e., we have started up but - // have not yet received a serverlist from the balancer or gone - // into fallback mode; in this case, both child_policy_ and - // pending_child_policy_ are null). In this case, we create a - // new child policy and store it in child_policy_. - // - // 2. We have an existing child policy and have no pending child policy - // from a previous update (i.e., either there has not been a - // previous update that changed the policy name, or we have already - // finished swapping in the new policy; in this case, child_policy_ - // is non-null but pending_child_policy_ is null). In this case: - // a. If child_policy_->name() equals child_policy_name, then we - // update the existing child policy. - // b. If child_policy_->name() does not equal child_policy_name, - // we create a new policy. The policy will be stored in - // pending_child_policy_ and will later be swapped into - // child_policy_ by the helper when the new child transitions - // into state READY. - // - // 3. We have an existing child policy and have a pending child policy - // from a previous update (i.e., a previous update set - // pending_child_policy_ as per case 2b above and that policy has - // not yet transitioned into state READY and been swapped into - // child_policy_; in this case, both child_policy_ and - // pending_child_policy_ are non-null). In this case: - // a. If pending_child_policy_->name() equals child_policy_name, - // then we update the existing pending child policy. - // b. If pending_child_policy->name() does not equal - // child_policy_name, then we create a new policy. The new - // policy is stored in pending_child_policy_ (replacing the one - // that was there before, which will be immediately shut down) - // and will later be swapped into child_policy_ by the helper - // when the new child transitions into state READY. - const char* lb_policy_name = lb_policy_config->name(); - const bool create_policy = - // case 1 - lb_policy_ == nullptr || - // case 2b - (pending_lb_policy_ == nullptr && - strcmp(lb_policy_->name(), lb_policy_name) != 0) || - // case 3b - (pending_lb_policy_ != nullptr && - strcmp(pending_lb_policy_->name(), lb_policy_name) != 0); - LoadBalancingPolicy* policy_to_update = nullptr; - if (create_policy) { - // Cases 1, 2b, and 3b: create a new child policy. - // If lb_policy_ is null, we set it (case 1), else we set - // pending_lb_policy_ (cases 2b and 3b). - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this, - lb_policy_ == nullptr ? "" : "pending ", lb_policy_name); - } - auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_; - lb_policy = - CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings); - policy_to_update = lb_policy.get(); - } else { - // Cases 2a and 3a: update an existing policy. - // If we have a pending child policy, send the update to the pending - // policy (case 3a), else send it to the current policy (case 2a). - policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get() - : lb_policy_.get(); - } - GPR_ASSERT(policy_to_update != nullptr); - // Update the policy. - if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this, - policy_to_update == pending_lb_policy_.get() ? "pending " : "", - policy_to_update); - } + Resolver::Result result) { + // Construct update. UpdateArgs update_args; update_args.addresses = std::move(result.addresses); update_args.config = std::move(lb_policy_config); // TODO(roth): Once channel args is converted to C++, use std::move() here. update_args.args = result.args; result.args = nullptr; - policy_to_update->UpdateLocked(std::move(update_args)); + // Create policy if needed. + if (lb_policy_ == nullptr) { + lb_policy_ = CreateLbPolicyLocked(*update_args.args); + } + // Update the policy. + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, "resolving_lb=%p: Updating child policy %p", this, + lb_policy_.get()); + } + lb_policy_->UpdateLocked(std::move(update_args)); } // Creates a new LB policy. // Updates trace_strings to indicate what was done. OrphanablePtr ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( - const char* lb_policy_name, const grpc_channel_args& args, - TraceStringVector* trace_strings) { - ResolvingControlHelper* helper = new ResolvingControlHelper(Ref()); + const grpc_channel_args& args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = combiner(); lb_policy_args.channel_control_helper = - std::unique_ptr(helper); + absl::make_unique(Ref()); lb_policy_args.args = &args; OrphanablePtr lb_policy = - LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( - lb_policy_name, std::move(lb_policy_args)); - if (GPR_UNLIKELY(lb_policy == nullptr)) { - gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); - char* str; - gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name); - trace_strings->push_back(str); - return nullptr; - } - helper->set_child(lb_policy.get()); + MakeOrphanable(std::move(lb_policy_args), tracer_); if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { - gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)", - this, lb_policy_name, lb_policy.get()); + gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy %p", this, + lb_policy.get()); } - char* str; - gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name); - trace_strings->push_back(str); grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), interested_parties()); return lb_policy; @@ -476,8 +338,8 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( } if (lb_policy_config != nullptr) { // Create or update LB policy, as needed. - CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config), std::move(result), - &trace_strings); + CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config), + std::move(result)); } // Add channel trace event. if (service_config_changed) { diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index 04c0d2d7265..ba53368f219 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -92,10 +92,9 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { void OnResolverError(grpc_error* error); void CreateOrUpdateLbPolicyLocked( RefCountedPtr lb_policy_config, - Resolver::Result result, TraceStringVector* trace_strings); + Resolver::Result result); OrphanablePtr CreateLbPolicyLocked( - const char* lb_policy_name, const grpc_channel_args& args, - TraceStringVector* trace_strings); + const grpc_channel_args& args); void MaybeAddTraceMessagesForAddressChangesLocked( bool resolution_contains_addresses, TraceStringVector* trace_strings); void ConcatenateAndAddChannelTraceLocked( @@ -116,7 +115,6 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { // Child LB policy. OrphanablePtr lb_policy_; - OrphanablePtr pending_lb_policy_; }; } // namespace grpc_core diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 4643b7462a5..06477148ca9 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -28,6 +28,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/http_connect_handshaker.cc', 'src/core/ext/filters/client_channel/http_proxy.cc', 'src/core/ext/filters/client_channel/lb_policy.cc', + 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 65e96d493c3..2cdd451df64 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1091,6 +1091,8 @@ src/core/ext/filters/client_channel/http_proxy.cc \ src/core/ext/filters/client_channel/http_proxy.h \ src/core/ext/filters/client_channel/lb_policy.cc \ src/core/ext/filters/client_channel/lb_policy.h \ +src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ +src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 9ece6e610ed..5a643358339 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -888,6 +888,8 @@ src/core/ext/filters/client_channel/http_proxy.cc \ src/core/ext/filters/client_channel/http_proxy.h \ src/core/ext/filters/client_channel/lb_policy.cc \ src/core/ext/filters/client_channel/lb_policy.h \ +src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ +src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \