From e6bf7c12cf9e73d3da2ff189bed34d0cbac29c4b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 11 Sep 2023 19:42:54 -0700 Subject: [PATCH] Revert "[round_robin] delegate to pick_first as per dualstack design" (#34317) Reverts grpc/grpc#34241 --- CMakeLists.txt | 2 - Makefile | 2 - Package.swift | 2 - build_autogenerated.yaml | 4 - config.m4 | 1 - config.w32 | 1 - gRPC-C++.podspec | 2 - gRPC-Core.podspec | 3 - grpc.gemspec | 2 - grpc.gyp | 2 - package.xml | 2 - src/core/BUILD | 38 +- .../client_channel/lb_policy/endpoint_list.cc | 188 -------- .../client_channel/lb_policy/endpoint_list.h | 214 --------- .../lb_policy/round_robin/round_robin.cc | 438 ++++++++++-------- src/core/lib/experiments/experiments.cc | 18 - src/core/lib/experiments/experiments.h | 10 +- src/core/lib/experiments/experiments.yaml | 7 - src/core/lib/experiments/rollouts.yaml | 2 - src/python/grpcio/grpc_core_dependencies.py | 1 - .../lb_policy/outlier_detection_test.cc | 2 + .../lb_policy/round_robin_test.cc | 4 +- test/cpp/end2end/client_lb_end2end_test.cc | 6 +- tools/doxygen/Doxyfile.c++.internal | 2 - tools/doxygen/Doxyfile.core.internal | 2 - 25 files changed, 262 insertions(+), 693 deletions(-) delete mode 100644 src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc delete mode 100644 src/core/ext/filters/client_channel/lb_policy/endpoint_list.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5715a8fd3e4..569cf60401b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1711,7 +1711,6 @@ add_library(grpc src/core/ext/filters/client_channel/http_proxy_mapper.cc src/core/ext/filters/client_channel/lb_policy/address_filtering.cc src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc 
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc @@ -2754,7 +2753,6 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/http_proxy_mapper.cc src/core/ext/filters/client_channel/lb_policy/address_filtering.cc src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc diff --git a/Makefile b/Makefile index c4eda72e5bd..e9353eb334c 100644 --- a/Makefile +++ b/Makefile @@ -972,7 +972,6 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/http_proxy_mapper.cc \ src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc \ @@ -1876,7 +1875,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/http_proxy_mapper.cc \ src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc \ diff --git a/Package.swift b/Package.swift index 05576ebc0dc..c2cc9bfac6d 100644 --- a/Package.swift +++ b/Package.swift @@ -149,8 +149,6 @@ let package = Package( 
"src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h", "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc", "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h", - "src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc", - "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc", "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 3615cc46896..5ae25305e81 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -232,7 +232,6 @@ libs: - src/core/ext/filters/client_channel/lb_policy/address_filtering.h - src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h - - src/core/ext/filters/client_channel/lb_policy/endpoint_list.h - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h @@ -1048,7 +1047,6 @@ libs: - src/core/ext/filters/client_channel/http_proxy_mapper.cc - src/core/ext/filters/client_channel/lb_policy/address_filtering.cc - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc - - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc @@ -1980,7 +1978,6 @@ libs: - src/core/ext/filters/client_channel/lb_policy/address_filtering.h - src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h - 
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h - - src/core/ext/filters/client_channel/lb_policy/endpoint_list.h - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h @@ -2412,7 +2409,6 @@ libs: - src/core/ext/filters/client_channel/http_proxy_mapper.cc - src/core/ext/filters/client_channel/lb_policy/address_filtering.cc - src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc - - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc diff --git a/config.m4 b/config.m4 index 275f65b1401..50cfef0e9ce 100644 --- a/config.m4 +++ b/config.m4 @@ -59,7 +59,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/http_proxy_mapper.cc \ src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ - src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc \ diff --git a/config.w32 b/config.w32 index 32504b958a3..9b891dabde1 100644 --- a/config.w32 +++ b/config.w32 @@ -24,7 +24,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\http_proxy_mapper.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\address_filtering.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\child_policy_handler.cc " + - 
"src\\core\\ext\\filters\\client_channel\\lb_policy\\endpoint_list.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\client_load_reporting_filter.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb.cc " + "src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb_balancer_addresses.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index ca7d8ae57c1..66701102fc0 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -263,7 +263,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h', @@ -1349,7 +1348,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index b1b5ac2c4e5..cb23b297197 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -250,8 +250,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', - 
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', @@ -2122,7 +2120,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/address_filtering.h', 'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h', diff --git a/grpc.gemspec b/grpc.gemspec index 9d0846479a7..41ab905785a 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -155,8 +155,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/endpoint_list.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc ) diff --git a/grpc.gyp b/grpc.gyp index 2b418e26221..617cb48302e 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -288,7 +288,6 @@ 'src/core/ext/filters/client_channel/http_proxy_mapper.cc', 
'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc', @@ -1132,7 +1131,6 @@ 'src/core/ext/filters/client_channel/http_proxy_mapper.cc', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc', diff --git a/package.xml b/package.xml index ceda0c4b8f7..654ff0c29cb 100644 --- a/package.xml +++ b/package.xml @@ -137,8 +137,6 @@ - - diff --git a/src/core/BUILD b/src/core/BUILD index 3577f63855c..8e64a89e95a 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4775,41 +4775,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "lb_endpoint_list", - srcs = [ - "ext/filters/client_channel/lb_policy/endpoint_list.cc", - ], - hdrs = [ - "ext/filters/client_channel/lb_policy/endpoint_list.h", - ], - external_deps = [ - "absl/functional:any_invocable", - "absl/status", - "absl/status:statusor", - "absl/types:optional", - ], - language = "c++", - deps = [ - "channel_args", - "delegating_helper", - "grpc_lb_policy_pick_first", - "json", - "lb_policy", - "lb_policy_registry", - "pollset_set", - "subchannel_interface", - "//:config", - "//:debug_location", - "//:gpr", - "//:grpc_base", - "//:orphanable", - "//:ref_counted_ptr", - "//:server_address", - "//:work_serializer", - ], -) - grpc_cc_library( name = 
"grpc_lb_policy_pick_first", srcs = [ @@ -4914,10 +4879,11 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "grpc_lb_subchannel_list", "json", - "lb_endpoint_list", "lb_policy", "lb_policy_factory", + "subchannel_interface", "//:config", "//:debug_location", "//:gpr", diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc deleted file mode 100644 index 9269359d748..00000000000 --- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc +++ /dev/null @@ -1,188 +0,0 @@ -// -// Copyright 2015 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-// - -#include - -#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h" - -#include - -#include -#include -#include -#include - -#include "absl/status/status.h" -#include "absl/status/statusor.h" -#include "absl/types/optional.h" - -#include -#include -#include - -#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/json/json.h" -#include "src/core/lib/load_balancing/delegating_helper.h" -#include "src/core/lib/load_balancing/lb_policy.h" -#include "src/core/lib/load_balancing/lb_policy_registry.h" -#include "src/core/lib/resolver/server_address.h" - -namespace grpc_core { - -// -// EndpointList::Endpoint::Helper -// - -class EndpointList::Endpoint::Helper - : public LoadBalancingPolicy::DelegatingChannelControlHelper { - public: - explicit Helper(RefCountedPtr endpoint) - : endpoint_(std::move(endpoint)) {} - - ~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); } - - RefCountedPtr CreateSubchannel( - ServerAddress address, const ChannelArgs& args) override { - return endpoint_->CreateSubchannel(std::move(address), args); - } - - void UpdateState( - grpc_connectivity_state state, const absl::Status& status, - RefCountedPtr picker) override { - auto old_state = std::exchange(endpoint_->connectivity_state_, state); - endpoint_->picker_ = std::move(picker); - endpoint_->OnStateUpdate(old_state, state, status); - } - - private: - LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override { - return endpoint_->endpoint_list_->channel_control_helper(); - } - - RefCountedPtr endpoint_; -}; - -// -// EndpointList::Endpoint -// - -void EndpointList::Endpoint::Init( - const ServerAddress& 
address, const ChannelArgs& args, - std::shared_ptr work_serializer) { - ChannelArgs child_args = - args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true) - .Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true); - LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.work_serializer = std::move(work_serializer); - lb_policy_args.args = child_args; - lb_policy_args.channel_control_helper = - std::make_unique(Ref(DEBUG_LOCATION, "Helper")); - child_policy_ = - CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( - "pick_first", std::move(lb_policy_args)); - if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) { - gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p", - endpoint_list_->tracer_, endpoint_list_->policy_.get(), this, - child_policy_.get()); - } - // Add our interested_parties pollset_set to that of the newly created - // child policy. This will make the child policy progress upon activity on - // this policy, which in turn is tied to the application's call. - grpc_pollset_set_add_pollset_set( - child_policy_->interested_parties(), - endpoint_list_->policy_->interested_parties()); - // Construct pick_first config. - auto config = - CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( - Json::FromArray( - {Json::FromObject({{"pick_first", Json::FromObject({})}})})); - GPR_ASSERT(config.ok()); - // Update child policy. - LoadBalancingPolicy::UpdateArgs update_args; - update_args.addresses.emplace().emplace_back(address); - update_args.args = child_args; - update_args.config = std::move(*config); - // TODO(roth): If the child reports a non-OK status with the update, - // we need to propagate that back to the resolver somehow. - (void)child_policy_->UpdateLocked(std::move(update_args)); -} - -void EndpointList::Endpoint::Orphan() { - // Remove pollset_set linkage. 
- grpc_pollset_set_del_pollset_set( - child_policy_->interested_parties(), - endpoint_list_->policy_->interested_parties()); - child_policy_.reset(); - picker_.reset(); - Unref(); -} - -void EndpointList::Endpoint::ResetBackoffLocked() { - if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); -} - -void EndpointList::Endpoint::ExitIdleLocked() { - if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); -} - -size_t EndpointList::Endpoint::Index() const { - for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) { - if (endpoint_list_->endpoints_[i].get() == this) return i; - } - return -1; -} - -RefCountedPtr EndpointList::Endpoint::CreateSubchannel( - ServerAddress address, const ChannelArgs& args) { - return endpoint_list_->channel_control_helper()->CreateSubchannel( - std::move(address), args); -} - -// -// EndpointList -// - -void EndpointList::Init( - const ServerAddressList& addresses, const ChannelArgs& args, - absl::AnyInvocable( - RefCountedPtr, const ServerAddress&, const ChannelArgs&)> - create_endpoint) { - for (const ServerAddress& address : addresses) { - endpoints_.push_back( - create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), address, args)); - } -} - -void EndpointList::ResetBackoffLocked() { - for (const auto& endpoint : endpoints_) { - endpoint->ResetBackoffLocked(); - } -} - -bool EndpointList::AllEndpointsSeenInitialState() const { - for (const auto& endpoint : endpoints_) { - if (!endpoint->connectivity_state().has_value()) return false; - } - return true; -} - -} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h deleted file mode 100644 index 66fce2871e4..00000000000 --- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h +++ /dev/null @@ -1,214 +0,0 @@ -// -// Copyright 2015 gRPC authors. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H -#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H - -#include - -#include - -#include -#include -#include - -#include "absl/functional/any_invocable.h" -#include "absl/status/status.h" -#include "absl/types/optional.h" - -#include - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/load_balancing/lb_policy.h" -#include "src/core/lib/load_balancing/subchannel_interface.h" -#include "src/core/lib/resolver/server_address.h" - -namespace grpc_core { - -// A list of endpoints for use in a petiole LB policy. Each endpoint may -// have one or more addresses, which will be passed down to a pick_first -// child policy. -// -// To use this, a petiole policy must define its own subclass of both -// EndpointList and EndpointList::Endpoint, like so: -/* -class MyEndpointList : public EndpointList { - public: - MyEndpointList(RefCountedPtr lb_policy, - const ServerAddressList& addresses, const ChannelArgs& args) - : EndpointList(std::move(lb_policy), - GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer) - ? 
"MyEndpointList" - : nullptr) { - Init(addresses, args, - [&](RefCountedPtr endpoint_list, - const ServerAddress& address, const ChannelArgs& args) { - return MakeOrphanable( - std::move(endpoint_list), address, args, - policy()->work_serializer()); - }); - } - - private: - class MyEndpoint : public Endpoint { - public: - MyEndpoint(RefCountedPtr endpoint_list, - const ServerAddress& address, const ChannelArgs& args, - std::shared_ptr work_serializer) - : Endpoint(std::move(endpoint_list)) { - Init(address, args, std::move(work_serializer)); - } - - private: - void OnStateUpdate( - absl::optional old_state, - grpc_connectivity_state new_state, - const absl::Status& status) override { - // ...handle connectivity state change... - } - }; - - LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() - const override { - return policy()->channel_control_helper(); - } -}; -*/ -// TODO(roth): Consider wrapping this in an LB policy subclass for petiole -// policies to inherit from. -class EndpointList : public InternallyRefCounted { - public: - // An individual endpoint. - class Endpoint : public InternallyRefCounted { - public: - ~Endpoint() override { endpoint_list_.reset(DEBUG_LOCATION, "Endpoint"); } - - void Orphan() override; - - void ResetBackoffLocked(); - void ExitIdleLocked(); - - absl::optional connectivity_state() const { - return connectivity_state_; - } - RefCountedPtr picker() const { - return picker_; - } - - protected: - // We use two-phase initialization here to ensure that the vtable is - // initialized before we need to use it. Subclass must invoke Init() - // from inside its ctor. - explicit Endpoint(RefCountedPtr endpoint_list) - : endpoint_list_(std::move(endpoint_list)) {} - - void Init(const ServerAddress& address, const ChannelArgs& args, - std::shared_ptr work_serializer); - - // Templated for convenience, to provide a short-hand for - // down-casting in the caller. 
- template - T* endpoint_list() const { - return static_cast(endpoint_list_.get()); - } - - // Templated for convenience, to provide a short-hand for down-casting - // in the caller. - template - T* policy() const { - return endpoint_list_->policy(); - } - - // Returns the index of this endpoint within the EndpointList. - // Intended for trace logging. - size_t Index() const; - - private: - class Helper; - - // Called when the child policy reports a connectivity state update. - virtual void OnStateUpdate( - absl::optional old_state, - grpc_connectivity_state new_state, const absl::Status& status) = 0; - - // Called to create a subchannel. Subclasses may override. - virtual RefCountedPtr CreateSubchannel( - ServerAddress address, const ChannelArgs& args); - - RefCountedPtr endpoint_list_; - - OrphanablePtr child_policy_; - absl::optional connectivity_state_; - RefCountedPtr picker_; - }; - - ~EndpointList() override { policy_.reset(DEBUG_LOCATION, "EndpointList"); } - - void Orphan() override { - endpoints_.clear(); - Unref(); - } - - size_t size() const { return endpoints_.size(); } - - const std::vector>& endpoints() const { - return endpoints_; - } - - void ResetBackoffLocked(); - - protected: - // We use two-phase initialization here to ensure that the vtable is - // initialized before we need to use it. Subclass must invoke Init() - // from inside its ctor. - EndpointList(RefCountedPtr policy, const char* tracer) - : policy_(std::move(policy)), tracer_(tracer) {} - - void Init(const ServerAddressList& addresses, const ChannelArgs& args, - absl::AnyInvocable( - RefCountedPtr, const ServerAddress&, - const ChannelArgs&)> - create_endpoint); - - // Templated for convenience, to provide a short-hand for down-casting - // in the caller. - template - T* policy() const { - return static_cast(policy_.get()); - } - - // Returns true if all endpoints have seen their initial connectivity - // state notification. 
- bool AllEndpointsSeenInitialState() const; - - private: - // Returns the parent policy's helper. Needed because the accessor - // method is protected on LoadBalancingPolicy. - virtual LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() - const = 0; - - RefCountedPtr policy_; - const char* tracer_; - std::vector> endpoints_; -}; - -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index d883fe0c7cc..4cf71c9c951 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -37,7 +37,7 @@ #include #include -#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h" +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" @@ -48,6 +48,7 @@ #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy_factory.h" +#include "src/core/lib/load_balancing/subchannel_interface.h" #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" @@ -73,60 +74,93 @@ class RoundRobin : public LoadBalancingPolicy { void ResetBackoffLocked() override; private: - class RoundRobinEndpointList : public EndpointList { + ~RoundRobin() override; + + // Forward declaration. + class RoundRobinSubchannelList; + + // Data for a particular subchannel in a subchannel list. + // This subclass adds the following functionality: + // - Tracks the previous connectivity state of the subchannel, so that + // we know how many subchannels are in each state. 
+ class RoundRobinSubchannelData + : public SubchannelData { public: - RoundRobinEndpointList(RefCountedPtr round_robin, - const ServerAddressList& addresses, - const ChannelArgs& args) - : EndpointList(std::move(round_robin), - GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) - ? "RoundRobinEndpointList" - : nullptr) { - Init(addresses, args, - [&](RefCountedPtr endpoint_list, - const ServerAddress& address, const ChannelArgs& args) { - return MakeOrphanable( - std::move(endpoint_list), address, args, - policy()->work_serializer()); - }); + RoundRobinSubchannelData( + SubchannelList* + subchannel_list, + const ServerAddress& address, + RefCountedPtr subchannel) + : SubchannelData(subchannel_list, address, std::move(subchannel)) {} + + absl::optional connectivity_state() const { + return logical_connectivity_state_; } private: - class RoundRobinEndpoint : public Endpoint { - public: - RoundRobinEndpoint(RefCountedPtr endpoint_list, - const ServerAddress& address, const ChannelArgs& args, - std::shared_ptr work_serializer) - : Endpoint(std::move(endpoint_list)) { - Init(address, args, std::move(work_serializer)); - } - - private: - // Called when the child policy reports a connectivity state update. - void OnStateUpdate(absl::optional old_state, - grpc_connectivity_state new_state, - const absl::Status& status) override; - }; - - LoadBalancingPolicy::ChannelControlHelper* channel_control_helper() - const override { - return policy()->channel_control_helper(); + // Performs connectivity state updates that need to be done only + // after we have started watching. + void ProcessConnectivityChangeLocked( + absl::optional old_state, + grpc_connectivity_state new_state) override; + + // Updates the logical connectivity state. + void UpdateLogicalConnectivityStateLocked( + grpc_connectivity_state connectivity_state); + + // The logical connectivity state of the subchannel. 
+ // Note that the logical connectivity state may differ from the + // actual reported state in some cases (e.g., after we see + // TRANSIENT_FAILURE, we ignore any subsequent state changes until + // we see READY). + absl::optional logical_connectivity_state_; + }; + + // A list of subchannels. + class RoundRobinSubchannelList + : public SubchannelList { + public: + RoundRobinSubchannelList(RoundRobin* policy, ServerAddressList addresses, + const ChannelArgs& args) + : SubchannelList(policy, + (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) + ? "RoundRobinSubchannelList" + : nullptr), + std::move(addresses), policy->channel_control_helper(), + args) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); } - // Updates the counters of children in each state when a - // child transitions from old_state to new_state. + ~RoundRobinSubchannelList() override { + RoundRobin* p = static_cast(policy()); + p->Unref(DEBUG_LOCATION, "subchannel_list"); + } + + // Updates the counters of subchannels in each state when a + // subchannel transitions from old_state to new_state. void UpdateStateCountersLocked( absl::optional old_state, grpc_connectivity_state new_state); - // Ensures that the right child list is used and then updates - // the RR policy's connectivity state based on the child list's + // Ensures that the right subchannel list is used and then updates + // the RR policy's connectivity state based on the subchannel list's // state counters. 
void MaybeUpdateRoundRobinConnectivityStateLocked( absl::Status status_for_tf); + private: + std::shared_ptr work_serializer() const override { + return static_cast(policy())->work_serializer(); + } + std::string CountersString() const { - return absl::StrCat("num_children=", size(), " num_ready=", num_ready_, + return absl::StrCat("num_subchannels=", num_subchannels(), + " num_ready=", num_ready_, " num_connecting=", num_connecting_, " num_transient_failure=", num_transient_failure_); } @@ -140,9 +174,7 @@ class RoundRobin : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: - Picker(RoundRobin* parent, - std::vector> - pickers); + Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list); PickResult Pick(PickArgs args) override; @@ -151,20 +183,18 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobin* parent_; std::atomic last_picked_index_; - std::vector> pickers_; + std::vector> subchannels_; }; - ~RoundRobin() override; - void ShutdownLocked() override; - // Current child list. - OrphanablePtr endpoint_list_; - // Latest pending child list. - // When we get an updated address list, we create a new child list - // for it here, and we wait to swap it into endpoint_list_ until the new + // List of subchannels. + RefCountedPtr subchannel_list_; + // Latest pending subchannel list. + // When we get an updated address list, we create a new subchannel list + // for it here, and we wait to swap it into subchannel_list_ until the new // list becomes READY. 
- OrphanablePtr latest_pending_endpoint_list_; + RefCountedPtr latest_pending_subchannel_list_; bool shutdown_ = false; @@ -175,32 +205,38 @@ class RoundRobin : public LoadBalancingPolicy { // RoundRobin::Picker // -RoundRobin::Picker::Picker( - RoundRobin* parent, - std::vector> pickers) - : parent_(parent), pickers_(std::move(pickers)) { +RoundRobin::Picker::Picker(RoundRobin* parent, + RoundRobinSubchannelList* subchannel_list) + : parent_(parent) { + for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + RoundRobinSubchannelData* sd = subchannel_list->subchannel(i); + if (sd->connectivity_state().value_or(GRPC_CHANNEL_IDLE) == + GRPC_CHANNEL_READY) { + subchannels_.push_back(sd->subchannel()->Ref()); + } + } // For discussion on why we generate a random starting index for // the picker, see https://github.com/grpc/grpc-go/issues/2580. - size_t index = absl::Uniform(parent->bit_gen_, 0, pickers_.size()); + size_t index = + absl::Uniform(parent->bit_gen_, 0, subchannels_.size()); last_picked_index_.store(index, std::memory_order_relaxed); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR %p picker %p] created picker from endpoint_list=%p " - "with %" PRIuPTR " READY children; last_picked_index_=%" PRIuPTR, - parent_, this, parent_->endpoint_list_.get(), pickers_.size(), - index); + "[RR %p picker %p] created picker from subchannel_list=%p " + "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR, + parent_, this, subchannel_list, subchannels_.size(), index); } } -RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) { +RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) { size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) % - pickers_.size(); + subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR %p picker %p] using picker index %" PRIuPTR ", picker=%p", - parent_, this, index, pickers_[index].get()); + "[RR 
%p picker %p] returning index %" PRIuPTR ", subchannel=%p", + parent_, this, index, subchannels_[index].get()); } - return pickers_[index]->Pick(args); + return PickResult::Complete(subchannels_[index]); } // @@ -217,8 +253,8 @@ RoundRobin::~RoundRobin() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } - GPR_ASSERT(endpoint_list_ == nullptr); - GPR_ASSERT(latest_pending_endpoint_list_ == nullptr); + GPR_ASSERT(subchannel_list_ == nullptr); + GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); } void RoundRobin::ShutdownLocked() { @@ -226,14 +262,14 @@ void RoundRobin::ShutdownLocked() { gpr_log(GPR_INFO, "[RR %p] Shutting down", this); } shutdown_ = true; - endpoint_list_.reset(); - latest_pending_endpoint_list_.reset(); + subchannel_list_.reset(); + latest_pending_subchannel_list_.reset(); } void RoundRobin::ResetBackoffLocked() { - endpoint_list_->ResetBackoffLocked(); - if (latest_pending_endpoint_list_ != nullptr) { - latest_pending_endpoint_list_->ResetBackoffLocked(); + subchannel_list_->ResetBackoffLocked(); + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->ResetBackoffLocked(); } } @@ -250,31 +286,28 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, args.addresses.status().ToString().c_str()); } - // If we already have a child list, then keep using the existing + // If we already have a subchannel list, then keep using the existing // list, but still report back that the update was not accepted. - if (endpoint_list_ != nullptr) return args.addresses.status(); + if (subchannel_list_ != nullptr) return args.addresses.status(); } - // Create new child list, replacing the previous pending list, if any. + // Create new subchannel list, replacing the previous pending list, if any. 
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && - latest_pending_endpoint_list_ != nullptr) { - gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this, - latest_pending_endpoint_list_.get()); + latest_pending_subchannel_list_ != nullptr) { + gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p", + this, latest_pending_subchannel_list_.get()); } - latest_pending_endpoint_list_ = MakeOrphanable( - Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), std::move(addresses), - args.args); + latest_pending_subchannel_list_ = MakeRefCounted( + this, std::move(addresses), args.args); + latest_pending_subchannel_list_->StartWatchingLocked(args.args); // If the new list is empty, immediately promote it to - // endpoint_list_ and report TRANSIENT_FAILURE. - // TODO(roth): As part of adding dualstack backend support, we need to - // also handle the case where the list of addresses for a given - // endpoint is empty. - if (latest_pending_endpoint_list_->size() == 0) { + // subchannel_list_ and report TRANSIENT_FAILURE. + if (latest_pending_subchannel_list_->num_subchannels() == 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) && - endpoint_list_ != nullptr) { - gpr_log(GPR_INFO, "[RR %p] replacing previous child list %p", this, - endpoint_list_.get()); + subchannel_list_ != nullptr) { + gpr_log(GPR_INFO, "[RR %p] replacing previous subchannel list %p", this, + subchannel_list_.get()); } - endpoint_list_ = std::move(latest_pending_endpoint_list_); + subchannel_list_ = std::move(latest_pending_subchannel_list_); absl::Status status = args.addresses.ok() ? absl::UnavailableError(absl::StrCat( "empty address list: ", args.resolution_note)) @@ -285,64 +318,26 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { return status; } // Otherwise, if this is the initial update, immediately promote it to - // endpoint_list_. 
- if (endpoint_list_ == nullptr) { - endpoint_list_ = std::move(latest_pending_endpoint_list_); + // subchannel_list_. + if (subchannel_list_.get() == nullptr) { + subchannel_list_ = std::move(latest_pending_subchannel_list_); } return absl::OkStatus(); } // -// RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint -// - -void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate( - absl::optional old_state, - grpc_connectivity_state new_state, const absl::Status& status) { - auto* rr_endpoint_list = endpoint_list(); - auto* round_robin = policy(); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, - "[RR %p] connectivity changed for child %p, endpoint_list %p " - "(index %" PRIuPTR " of %" PRIuPTR - "): prev_state=%s new_state=%s " - "(%s)", - round_robin, this, rr_endpoint_list, Index(), - rr_endpoint_list->size(), - (old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"), - ConnectivityStateName(new_state), status.ToString().c_str()); - } - if (new_state == GRPC_CHANNEL_IDLE) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] child %p reported IDLE; requesting connection", - round_robin, this); - } - ExitIdleLocked(); - } - // If state changed, update state counters. - if (!old_state.has_value() || *old_state != new_state) { - rr_endpoint_list->UpdateStateCountersLocked(old_state, new_state); - } - // Update the policy state. - rr_endpoint_list->MaybeUpdateRoundRobinConnectivityStateLocked(status); -} - -// -// RoundRobin::RoundRobinEndpointList +// RoundRobinSubchannelList // -void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked( +void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( absl::optional old_state, grpc_connectivity_state new_state) { - // We treat IDLE the same as CONNECTING, since it will immediately - // transition into that state anyway. 
if (old_state.has_value()) { GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN); if (*old_state == GRPC_CHANNEL_READY) { GPR_ASSERT(num_ready_ > 0); --num_ready_; - } else if (*old_state == GRPC_CHANNEL_CONNECTING || - *old_state == GRPC_CHANNEL_IDLE) { + } else if (*old_state == GRPC_CHANNEL_CONNECTING) { GPR_ASSERT(num_connecting_ > 0); --num_connecting_; } else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -353,90 +348,161 @@ void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked( GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); if (new_state == GRPC_CHANNEL_READY) { ++num_ready_; - } else if (new_state == GRPC_CHANNEL_CONNECTING || - new_state == GRPC_CHANNEL_IDLE) { + } else if (new_state == GRPC_CHANNEL_CONNECTING) { ++num_connecting_; } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++num_transient_failure_; } } -void RoundRobin::RoundRobinEndpointList:: +void RoundRobin::RoundRobinSubchannelList:: MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) { - auto* round_robin = policy(); - // If this is latest_pending_endpoint_list_, then swap it into - // endpoint_list_ in the following cases: - // - endpoint_list_ has no READY children. - // - This list has at least one READY child and we have seen the - // initial connectivity state notification for all children. - // - All of the children in this list are in TRANSIENT_FAILURE. + RoundRobin* p = static_cast(policy()); + // If this is latest_pending_subchannel_list_, then swap it into + // subchannel_list_ in the following cases: + // - subchannel_list_ has no READY subchannels. + // - This list has at least one READY subchannel and we have seen the + // initial connectivity state notification for all subchannels. + // - All of the subchannels in this list are in TRANSIENT_FAILURE. // (This may cause the channel to go from READY to TRANSIENT_FAILURE, // but we're doing what the control plane told us to do.) 
- if (round_robin->latest_pending_endpoint_list_.get() == this && - (round_robin->endpoint_list_->num_ready_ == 0 || - (num_ready_ > 0 && AllEndpointsSeenInitialState()) || - num_transient_failure_ == size())) { + if (p->latest_pending_subchannel_list_.get() == this && + (p->subchannel_list_->num_ready_ == 0 || + (num_ready_ > 0 && AllSubchannelsSeenInitialState()) || + num_transient_failure_ == num_subchannels())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { const std::string old_counters_string = - round_robin->endpoint_list_ != nullptr - ? round_robin->endpoint_list_->CountersString() - : ""; - gpr_log(GPR_INFO, - "[RR %p] swapping out child list %p (%s) in favor of %p (%s)", - round_robin, round_robin->endpoint_list_.get(), - old_counters_string.c_str(), this, CountersString().c_str()); + p->subchannel_list_ != nullptr ? p->subchannel_list_->CountersString() + : ""; + gpr_log( + GPR_INFO, + "[RR %p] swapping out subchannel list %p (%s) in favor of %p (%s)", p, + p->subchannel_list_.get(), old_counters_string.c_str(), this, + CountersString().c_str()); } - round_robin->endpoint_list_ = - std::move(round_robin->latest_pending_endpoint_list_); + p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } - // Only set connectivity state if this is the current child list. - if (round_robin->endpoint_list_.get() != this) return; - // FIXME: scan children each time instead of keeping counters? + // Only set connectivity state if this is the current subchannel list. + if (p->subchannel_list_.get() != this) return; // First matching rule wins: - // 1) ANY child is READY => policy is READY. - // 2) ANY child is CONNECTING => policy is CONNECTING. - // 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE. + // 1) ANY subchannel is READY => policy is READY. + // 2) ANY subchannel is CONNECTING => policy is CONNECTING. + // 3) ALL subchannels are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE. 
if (num_ready_ > 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] reporting READY with child list %p", - round_robin, this); + gpr_log(GPR_INFO, "[RR %p] reporting READY with subchannel list %p", p, + this); } - std::vector> pickers; - for (const auto& endpoint : endpoints()) { - auto state = endpoint->connectivity_state(); - if (state.has_value() && *state == GRPC_CHANNEL_READY) { - pickers.push_back(endpoint->picker()); - } - } - GPR_ASSERT(!pickers.empty()); - round_robin->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, absl::OkStatus(), - MakeRefCounted(round_robin, std::move(pickers))); + p->channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(), + MakeRefCounted(p, this)); } else if (num_connecting_ > 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with child list %p", - round_robin, this); + gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with subchannel list %p", + p, this); } - round_robin->channel_control_helper()->UpdateState( + p->channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), - MakeRefCounted(nullptr)); - } else if (num_transient_failure_ == size()) { + MakeRefCounted(p->Ref(DEBUG_LOCATION, "QueuePicker"))); + } else if (num_transient_failure_ == num_subchannels()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, - "[RR %p] reporting TRANSIENT_FAILURE with child list %p: %s", - round_robin, this, status_for_tf.ToString().c_str()); + "[RR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s", + p, this, status_for_tf.ToString().c_str()); } if (!status_for_tf.ok()) { last_failure_ = absl::UnavailableError( absl::StrCat("connections to all backends failing; last error: ", - status_for_tf.message())); + status_for_tf.ToString())); } - round_robin->channel_control_helper()->UpdateState( + p->channel_control_helper()->UpdateState( 
GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_, MakeRefCounted(last_failure_)); } } +// +// RoundRobinSubchannelData +// + +void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( + absl::optional old_state, + grpc_connectivity_state new_state) { + RoundRobin* p = static_cast(subchannel_list()->policy()); + GPR_ASSERT(subchannel() != nullptr); + // If this is not the initial state notification and the new state is + // TRANSIENT_FAILURE or IDLE, re-resolve. + // Note that we don't want to do this on the initial state notification, + // because that would result in an endless loop of re-resolution. + if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + new_state == GRPC_CHANNEL_IDLE)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR %p] Subchannel %p reported %s; requesting re-resolution", p, + subchannel(), ConnectivityStateName(new_state)); + } + p->channel_control_helper()->RequestReresolution(); + } + if (new_state == GRPC_CHANNEL_IDLE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR %p] Subchannel %p reported IDLE; requesting connection", p, + subchannel()); + } + subchannel()->RequestConnection(); + } + // Update logical connectivity state. + UpdateLogicalConnectivityStateLocked(new_state); + // Update the policy state. 
+ subchannel_list()->MaybeUpdateRoundRobinConnectivityStateLocked( + connectivity_status()); +} + +void RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked( + grpc_connectivity_state connectivity_state) { + RoundRobin* p = static_cast(subchannel_list()->policy()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_INFO, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " + "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", + p, subchannel(), subchannel_list(), Index(), + subchannel_list()->num_subchannels(), + (logical_connectivity_state_.has_value() + ? ConnectivityStateName(*logical_connectivity_state_) + : "N/A"), + ConnectivityStateName(connectivity_state)); + } + // Decide what state to report for aggregation purposes. + // If the last logical state was TRANSIENT_FAILURE, then ignore the + // state change unless the new state is READY. + if (logical_connectivity_state_.has_value() && + *logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && + connectivity_state != GRPC_CHANNEL_READY) { + return; + } + // If the new state is IDLE, treat it as CONNECTING, since it will + // immediately transition into CONNECTING anyway. + if (connectivity_state == GRPC_CHANNEL_IDLE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR %p] subchannel %p, subchannel_list %p (index %" PRIuPTR + " of %" PRIuPTR "): treating IDLE as CONNECTING", + p, subchannel(), subchannel_list(), Index(), + subchannel_list()->num_subchannels()); + } + connectivity_state = GRPC_CHANNEL_CONNECTING; + } + // If no change, there is nothing to do. + if (logical_connectivity_state_.has_value() && + *logical_connectivity_state_ == connectivity_state) { + return; + } + // Otherwise, update counters and logical state.
+ subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_, + connectivity_state); + logical_connectivity_state_ = connectivity_state; +} + // // factory // diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 7baa8b1fcae..cad482774fe 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -105,10 +105,6 @@ const char* const description_jitter_max_idle = "only on max connection age, but it seems like this could smooth out some " "herding problems."; const char* const additional_constraints_jitter_max_idle = "{}"; -const char* const description_round_robin_dualstack = - "Change round_robin code to delegate to pick_first as per dualstack " - "backend design."; -const char* const additional_constraints_round_robin_dualstack = "{}"; } // namespace namespace grpc_core { @@ -162,8 +158,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_lazier_stream_updates, true, true}, {"jitter_max_idle", description_jitter_max_idle, additional_constraints_jitter_max_idle, true, true}, - {"round_robin_dualstack", description_round_robin_dualstack, - additional_constraints_round_robin_dualstack, true, true}, }; } // namespace grpc_core @@ -253,10 +247,6 @@ const char* const description_jitter_max_idle = "only on max connection age, but it seems like this could smooth out some " "herding problems."; const char* const additional_constraints_jitter_max_idle = "{}"; -const char* const description_round_robin_dualstack = - "Change round_robin code to delegate to pick_first as per dualstack " - "backend design."; -const char* const additional_constraints_round_robin_dualstack = "{}"; } // namespace namespace grpc_core { @@ -310,8 +300,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_lazier_stream_updates, true, true}, {"jitter_max_idle", description_jitter_max_idle, additional_constraints_jitter_max_idle, true, true}, - 
{"round_robin_dualstack", description_round_robin_dualstack, - additional_constraints_round_robin_dualstack, true, true}, }; } // namespace grpc_core @@ -401,10 +389,6 @@ const char* const description_jitter_max_idle = "only on max connection age, but it seems like this could smooth out some " "herding problems."; const char* const additional_constraints_jitter_max_idle = "{}"; -const char* const description_round_robin_dualstack = - "Change round_robin code to delegate to pick_first as per dualstack " - "backend design."; -const char* const additional_constraints_round_robin_dualstack = "{}"; } // namespace namespace grpc_core { @@ -458,8 +442,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_lazier_stream_updates, true, true}, {"jitter_max_idle", description_jitter_max_idle, additional_constraints_jitter_max_idle, true, true}, - {"round_robin_dualstack", description_round_robin_dualstack, - additional_constraints_round_robin_dualstack, true, true}, }; } // namespace grpc_core diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 1aee6347e8f..56251427db9 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -87,8 +87,6 @@ inline bool IsKeepaliveServerFixEnabled() { return false; } inline bool IsLazierStreamUpdatesEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE inline bool IsJitterMaxIdleEnabled() { return true; } -#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DUALSTACK -inline bool IsRoundRobinDualstackEnabled() { return true; } #elif defined(GPR_WINDOWS) inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -119,8 +117,6 @@ inline bool IsKeepaliveServerFixEnabled() { return false; } inline bool IsLazierStreamUpdatesEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE inline bool IsJitterMaxIdleEnabled() { return true; } -#define 
GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DUALSTACK -inline bool IsRoundRobinDualstackEnabled() { return true; } #else inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -151,8 +147,6 @@ inline bool IsKeepaliveServerFixEnabled() { return false; } inline bool IsLazierStreamUpdatesEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE inline bool IsJitterMaxIdleEnabled() { return true; } -#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DUALSTACK -inline bool IsRoundRobinDualstackEnabled() { return true; } #endif #else @@ -208,10 +202,8 @@ inline bool IsKeepaliveServerFixEnabled() { return IsExperimentEnabled(20); } inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(21); } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); } -#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DUALSTACK -inline bool IsRoundRobinDualstackEnabled() { return IsExperimentEnabled(23); } -constexpr const size_t kNumExperiments = 24; +constexpr const size_t kNumExperiments = 23; extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; #endif diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 45330706aa8..af6e4b4a6db 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -185,10 +185,3 @@ owner: ctiller@google.com test_tags: [] allow_in_fuzzing_config: true -- name: round_robin_dualstack - description: - Change round_robin code to delegate to pick_first as per dualstack - backend design. 
- expiry: 2023/11/15 - owner: roth@google.com - test_tags: [] diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index 7cac2d61850..77df5ad282e 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -94,5 +94,3 @@ default: true - name: jitter_max_idle default: true -- name: round_robin_dualstack - default: true diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 80f1628e13e..59259c58850 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -33,7 +33,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/http_proxy_mapper.cc', 'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc', 'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc', - 'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc', diff --git a/test/core/client_channel/lb_policy/outlier_detection_test.cc b/test/core/client_channel/lb_policy/outlier_detection_test.cc index 597bfabb8e6..ea3c0a477c2 100644 --- a/test/core/client_channel/lb_policy/outlier_detection_test.cc +++ b/test/core/client_channel/lb_policy/outlier_detection_test.cc @@ -229,6 +229,8 @@ TEST_F(OutlierDetectionTest, FailurePercentage) { time_cache_.IncrementBy(Duration::Seconds(10)); RunTimerCallback(); gpr_log(GPR_INFO, "### ejection complete"); + // Expect a re-resolution request. + ExpectReresolutionRequest(); // Expect a picker update. 
std::vector remaining_addresses; for (const auto& addr : kAddresses) { diff --git a/test/core/client_channel/lb_policy/round_robin_test.cc b/test/core/client_channel/lb_policy/round_robin_test.cc index 092242e66f3..ef82ceadebf 100644 --- a/test/core/client_channel/lb_policy/round_robin_test.cc +++ b/test/core/client_channel/lb_policy/round_robin_test.cc @@ -42,6 +42,8 @@ class RoundRobinTest : public LoadBalancingPolicyTest { void ExpectStartup(absl::Span addresses) { EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, nullptr), lb_policy_.get()), absl::OkStatus()); + // Expect the initial CONNECTING update with a picker that queues. + ExpectConnectingUpdate(); // RR should have created a subchannel for each address. for (size_t i = 0; i < addresses.size(); ++i) { auto* subchannel = FindSubchannel(addresses[i]); @@ -50,8 +52,6 @@ class RoundRobinTest : public LoadBalancingPolicyTest { EXPECT_TRUE(subchannel->ConnectionRequested()); // The subchannel will connect successfully. subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - // Expect the initial CONNECTNG update with a picker that queues. - if (i == 0) ExpectConnectingUpdate(); subchannel->SetConnectivityState(GRPC_CHANNEL_READY); // As each subchannel becomes READY, we should get a new picker that // includes the behavior. Note that there may be any number of diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 2aa1dcdc73d..3dd450b673c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -2057,7 +2057,7 @@ TEST_F(RoundRobinTest, HealthChecking) { EXPECT_TRUE(WaitForChannelNotReady(channel.get())); CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE, "connections to all backends failing; last error: " - "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " + "UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " "backend unhealthy"); // Clean up.
EnableDefaultHealthCheckService(false); @@ -2116,7 +2116,7 @@ TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) { EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE, "connections to all backends failing; last error: " - "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " + "UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " "backend unhealthy"); // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); @@ -2162,7 +2162,7 @@ TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) { EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE, "connections to all backends failing; last error: " - "(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " + "UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: " "backend unhealthy"); // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 771a3471091..c27cf5780a0 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1114,8 +1114,6 @@ src/core/ext/filters/client_channel/lb_policy/address_filtering.h \ src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \ -src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \ -src/core/ext/filters/client_channel/lb_policy/endpoint_list.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 038c92baf2b..76f0add5f73 100644 --- 
a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -920,8 +920,6 @@ src/core/ext/filters/client_channel/lb_policy/address_filtering.h \ src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \ src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \ -src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \ -src/core/ext/filters/client_channel/lb_policy/endpoint_list.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \