Revert "[round_robin] third attempt: delegate to pick_first as per dualstack design" (#34335)

Reverts grpc/grpc#34320
pull/34339/head
Mark D. Roth 1 year ago committed by GitHub
parent 8dcdd7a7e9
commit 6534f0a6bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CMakeLists.txt
  2. 2
      Makefile
  3. 2
      Package.swift
  4. 18
      bazel/experiments.bzl
  5. 4
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 37
      src/core/BUILD
  14. 188
      src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
  15. 214
      src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
  16. 436
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  17. 24
      src/core/lib/experiments/experiments.cc
  18. 12
      src/core/lib/experiments/experiments.h
  19. 7
      src/core/lib/experiments/experiments.yaml
  20. 2
      src/core/lib/experiments/rollouts.yaml
  21. 1
      src/python/grpcio/grpc_core_dependencies.py
  22. 2
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  23. 4
      test/core/client_channel/lb_policy/round_robin_test.cc
  24. 2
      test/cpp/end2end/BUILD
  25. 40
      test/cpp/end2end/client_lb_end2end_test.cc
  26. 2
      tools/doxygen/Doxyfile.c++.internal
  27. 2
      tools/doxygen/Doxyfile.core.internal

2
CMakeLists.txt generated

@ -1739,7 +1739,6 @@ add_library(grpc
src/core/ext/filters/client_channel/http_proxy_mapper.cc
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc
@ -2789,7 +2788,6 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/http_proxy_mapper.cc
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc

2
Makefile generated

@ -972,7 +972,6 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/http_proxy_mapper.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc \
@ -1876,7 +1875,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/http_proxy_mapper.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc \

2
Package.swift generated

@ -149,8 +149,6 @@ let package = Package(
"src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h",
"src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc",
"src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h",
"src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc",
"src/core/ext/filters/client_channel/lb_policy/endpoint_list.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc",

@ -60,15 +60,9 @@ EXPERIMENTS = {
"core_end2end_test": [
"work_stealing",
],
"cpp_lb_end2end_test": [
"round_robin_delegate_to_pick_first",
],
"flow_control_test": [
"lazier_stream_updates",
],
"xds_end2end_test": [
"round_robin_delegate_to_pick_first",
],
},
},
"ios": {
@ -114,15 +108,9 @@ EXPERIMENTS = {
"core_end2end_test": [
"work_stealing",
],
"cpp_lb_end2end_test": [
"round_robin_delegate_to_pick_first",
],
"flow_control_test": [
"lazier_stream_updates",
],
"xds_end2end_test": [
"round_robin_delegate_to_pick_first",
],
},
},
"posix": {
@ -178,15 +166,9 @@ EXPERIMENTS = {
"core_end2end_test": [
"work_stealing",
],
"cpp_lb_end2end_test": [
"round_robin_delegate_to_pick_first",
],
"flow_control_test": [
"lazier_stream_updates",
],
"xds_end2end_test": [
"round_robin_delegate_to_pick_first",
],
},
},
}

@ -232,7 +232,6 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/address_filtering.h
- src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
- src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h
@ -1049,7 +1048,6 @@ libs:
- src/core/ext/filters/client_channel/http_proxy_mapper.cc
- src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
- src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc
@ -1981,7 +1979,6 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/address_filtering.h
- src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h
- src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h
@ -2414,7 +2411,6 @@ libs:
- src/core/ext/filters/client_channel/http_proxy_mapper.cc
- src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
- src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
- src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc

1
config.m4 generated

@ -59,7 +59,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/http_proxy_mapper.cc \
src/core/ext/filters/client_channel/lb_policy/address_filtering.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc \

1
config.w32 generated

@ -24,7 +24,6 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\http_proxy_mapper.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\address_filtering.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\child_policy_handler.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\endpoint_list.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\client_load_reporting_filter.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\grpclb\\grpclb_balancer_addresses.cc " +

2
gRPC-C++.podspec generated

@ -263,7 +263,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h',
@ -1350,7 +1349,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h',

3
gRPC-Core.podspec generated

@ -250,8 +250,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
@ -2123,7 +2121,6 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/address_filtering.h',
'src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h',

2
grpc.gemspec generated

@ -155,8 +155,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/endpoint_list.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc )

2
grpc.gyp generated

@ -288,7 +288,6 @@
'src/core/ext/filters/client_channel/http_proxy_mapper.cc',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc',
@ -1132,7 +1131,6 @@
'src/core/ext/filters/client_channel/http_proxy_mapper.cc',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc',

2
package.xml generated

@ -137,8 +137,6 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/endpoint_list.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc" role="src" />

@ -4776,41 +4776,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "lb_endpoint_list",
srcs = [
"ext/filters/client_channel/lb_policy/endpoint_list.cc",
],
hdrs = [
"ext/filters/client_channel/lb_policy/endpoint_list.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/types:optional",
],
language = "c++",
deps = [
"channel_args",
"delegating_helper",
"grpc_lb_policy_pick_first",
"json",
"lb_policy",
"lb_policy_registry",
"pollset_set",
"subchannel_interface",
"//:config",
"//:debug_location",
"//:gpr",
"//:grpc_base",
"//:orphanable",
"//:ref_counted_ptr",
"//:server_address",
"//:work_serializer",
],
)
grpc_cc_library(
name = "grpc_lb_policy_pick_first",
srcs = [
@ -4915,10 +4880,8 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"experiments",
"grpc_lb_subchannel_list",
"json",
"lb_endpoint_list",
"lb_policy",
"lb_policy_factory",
"subchannel_interface",

@ -1,188 +0,0 @@
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h"
#include <stdlib.h>
#include <algorithm>
#include <memory>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/delegating_helper.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/resolver/server_address.h"
namespace grpc_core {
//
// EndpointList::Endpoint::Helper
//
class EndpointList::Endpoint::Helper
: public LoadBalancingPolicy::DelegatingChannelControlHelper {
public:
explicit Helper(RefCountedPtr<Endpoint> endpoint)
: endpoint_(std::move(endpoint)) {}
~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override {
return endpoint_->CreateSubchannel(std::move(address), args);
}
void UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
auto old_state = std::exchange(endpoint_->connectivity_state_, state);
endpoint_->picker_ = std::move(picker);
endpoint_->OnStateUpdate(old_state, state, status);
}
private:
LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override {
return endpoint_->endpoint_list_->channel_control_helper();
}
RefCountedPtr<Endpoint> endpoint_;
};
//
// EndpointList::Endpoint
//
void EndpointList::Endpoint::Init(
const ServerAddress& address, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer) {
ChannelArgs child_args =
args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
.Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true);
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = std::move(work_serializer);
lb_policy_args.args = child_args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
child_policy_ =
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
"pick_first", std::move(lb_policy_args));
if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) {
gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p",
endpoint_list_->tracer_, endpoint_list_->policy_.get(), this,
child_policy_.get());
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(
child_policy_->interested_parties(),
endpoint_list_->policy_->interested_parties());
// Construct pick_first config.
auto config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
Json::FromArray(
{Json::FromObject({{"pick_first", Json::FromObject({})}})}));
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses.emplace().emplace_back(address);
update_args.args = child_args;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
// we need to propagate that back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args));
}
void EndpointList::Endpoint::Orphan() {
// Remove pollset_set linkage.
grpc_pollset_set_del_pollset_set(
child_policy_->interested_parties(),
endpoint_list_->policy_->interested_parties());
child_policy_.reset();
picker_.reset();
Unref();
}
void EndpointList::Endpoint::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
void EndpointList::Endpoint::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
size_t EndpointList::Endpoint::Index() const {
for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) {
if (endpoint_list_->endpoints_[i].get() == this) return i;
}
return -1;
}
RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
return endpoint_list_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
//
// EndpointList
//
void EndpointList::Init(
const ServerAddressList& addresses, const ChannelArgs& args,
absl::AnyInvocable<OrphanablePtr<Endpoint>(
RefCountedPtr<EndpointList>, const ServerAddress&, const ChannelArgs&)>
create_endpoint) {
for (const ServerAddress& address : addresses) {
endpoints_.push_back(
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), address, args));
}
}
void EndpointList::ResetBackoffLocked() {
for (const auto& endpoint : endpoints_) {
endpoint->ResetBackoffLocked();
}
}
bool EndpointList::AllEndpointsSeenInitialState() const {
for (const auto& endpoint : endpoints_) {
if (!endpoint->connectivity_state().has_value()) return false;
}
return true;
}
} // namespace grpc_core

@ -1,214 +0,0 @@
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include <memory>
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/impl/connectivity_state.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
namespace grpc_core {
// A list of endpoints for use in a petiole LB policy. Each endpoint may
// have one or more addresses, which will be passed down to a pick_first
// child policy.
//
// To use this, a petiole policy must define its own subclass of both
// EndpointList and EndpointList::Endpoint, like so:
/*
class MyEndpointList : public EndpointList {
public:
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy,
const ServerAddressList& addresses, const ChannelArgs& args)
: EndpointList(std::move(lb_policy),
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer)
? "MyEndpointList"
: nullptr) {
Init(addresses, args,
[&](RefCountedPtr<MyEndpointList> endpoint_list,
const ServerAddress& address, const ChannelArgs& args) {
return MakeOrphanable<MyEndpoint>(
std::move(endpoint_list), address, args,
policy<MyLbPolicy>()->work_serializer());
});
}
private:
class MyEndpoint : public Endpoint {
public:
MyEndpoint(RefCountedPtr<MyEndpointList> endpoint_list,
const ServerAddress& address, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
: Endpoint(std::move(endpoint_list)) {
Init(address, args, std::move(work_serializer));
}
private:
void OnStateUpdate(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state,
const absl::Status& status) override {
// ...handle connectivity state change...
}
};
LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
const override {
return policy<MyLbPolicy>()->channel_control_helper();
}
};
*/
// TODO(roth): Consider wrapping this in an LB policy subclass for petiole
// policies to inherit from.
class EndpointList : public InternallyRefCounted<EndpointList> {
public:
// An individual endpoint.
class Endpoint : public InternallyRefCounted<Endpoint> {
public:
~Endpoint() override { endpoint_list_.reset(DEBUG_LOCATION, "Endpoint"); }
void Orphan() override;
void ResetBackoffLocked();
void ExitIdleLocked();
absl::optional<grpc_connectivity_state> connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker() const {
return picker_;
}
protected:
// We use two-phase initialization here to ensure that the vtable is
// initialized before we need to use it. Subclass must invoke Init()
// from inside its ctor.
explicit Endpoint(RefCountedPtr<EndpointList> endpoint_list)
: endpoint_list_(std::move(endpoint_list)) {}
void Init(const ServerAddress& address, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer);
// Templated for convenience, to provide a short-hand for
// down-casting in the caller.
template <typename T>
T* endpoint_list() const {
return static_cast<T*>(endpoint_list_.get());
}
// Templated for convenience, to provide a short-hand for down-casting
// in the caller.
template <typename T>
T* policy() const {
return endpoint_list_->policy<T>();
}
// Returns the index of this endpoint within the EndpointList.
// Intended for trace logging.
size_t Index() const;
private:
class Helper;
// Called when the child policy reports a connectivity state update.
virtual void OnStateUpdate(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state, const absl::Status& status) = 0;
// Called to create a subchannel. Subclasses may override.
virtual RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args);
RefCountedPtr<EndpointList> endpoint_list_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
absl::optional<grpc_connectivity_state> connectivity_state_;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_;
};
~EndpointList() override { policy_.reset(DEBUG_LOCATION, "EndpointList"); }
void Orphan() override {
endpoints_.clear();
Unref();
}
size_t size() const { return endpoints_.size(); }
const std::vector<OrphanablePtr<Endpoint>>& endpoints() const {
return endpoints_;
}
void ResetBackoffLocked();
protected:
// We use two-phase initialization here to ensure that the vtable is
// initialized before we need to use it. Subclass must invoke Init()
// from inside its ctor.
EndpointList(RefCountedPtr<LoadBalancingPolicy> policy, const char* tracer)
: policy_(std::move(policy)), tracer_(tracer) {}
void Init(const ServerAddressList& addresses, const ChannelArgs& args,
absl::AnyInvocable<OrphanablePtr<Endpoint>(
RefCountedPtr<EndpointList>, const ServerAddress&,
const ChannelArgs&)>
create_endpoint);
// Templated for convenience, to provide a short-hand for down-casting
// in the caller.
template <typename T>
T* policy() const {
return static_cast<T*>(policy_.get());
}
// Returns true if all endpoints have seen their initial connectivity
// state notification.
bool AllEndpointsSeenInitialState() const;
private:
// Returns the parent policy's helper. Needed because the accessor
// method is protected on LoadBalancingPolicy.
virtual LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
const = 0;
RefCountedPtr<LoadBalancingPolicy> policy_;
const char* tracer_;
std::vector<OrphanablePtr<Endpoint>> endpoints_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_ENDPOINT_LIST_H

@ -37,12 +37,10 @@
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/endpoint_list.h"
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -61,14 +59,14 @@ TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
namespace {
//
// legacy round_robin LB policy (before dualstack support)
// round_robin LB policy
//
constexpr absl::string_view kRoundRobin = "round_robin";
class OldRoundRobin : public LoadBalancingPolicy {
class RoundRobin : public LoadBalancingPolicy {
public:
explicit OldRoundRobin(Args args);
explicit RoundRobin(Args args);
absl::string_view name() const override { return kRoundRobin; }
@ -76,7 +74,7 @@ class OldRoundRobin : public LoadBalancingPolicy {
void ResetBackoffLocked() override;
private:
~OldRoundRobin() override;
~RoundRobin() override;
// Forward declaration.
class RoundRobinSubchannelList;
@ -124,7 +122,7 @@ class OldRoundRobin : public LoadBalancingPolicy {
: public SubchannelList<RoundRobinSubchannelList,
RoundRobinSubchannelData> {
public:
RoundRobinSubchannelList(OldRoundRobin* policy, ServerAddressList addresses,
RoundRobinSubchannelList(RoundRobin* policy, ServerAddressList addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
@ -139,7 +137,7 @@ class OldRoundRobin : public LoadBalancingPolicy {
}
~RoundRobinSubchannelList() override {
OldRoundRobin* p = static_cast<OldRoundRobin*>(policy());
RoundRobin* p = static_cast<RoundRobin*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
@ -157,7 +155,7 @@ class OldRoundRobin : public LoadBalancingPolicy {
private:
std::shared_ptr<WorkSerializer> work_serializer() const override {
return static_cast<OldRoundRobin*>(policy())->work_serializer();
return static_cast<RoundRobin*>(policy())->work_serializer();
}
std::string CountersString() const {
@ -176,13 +174,13 @@ class OldRoundRobin : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
Picker(OldRoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
PickResult Pick(PickArgs args) override;
private:
// Using pointer value only, no ref held -- do not dereference!
OldRoundRobin* parent_;
RoundRobin* parent_;
std::atomic<size_t> last_picked_index_;
std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
@ -204,11 +202,11 @@ class OldRoundRobin : public LoadBalancingPolicy {
};
//
// OldRoundRobin::Picker
// RoundRobin::Picker
//
OldRoundRobin::Picker::Picker(OldRoundRobin* parent,
RoundRobinSubchannelList* subchannel_list)
RoundRobin::Picker::Picker(RoundRobin* parent,
RoundRobinSubchannelList* subchannel_list)
: parent_(parent) {
for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
RoundRobinSubchannelData* sd = subchannel_list->subchannel(i);
@ -230,7 +228,7 @@ OldRoundRobin::Picker::Picker(OldRoundRobin* parent,
}
}
OldRoundRobin::PickResult OldRoundRobin::Picker::Pick(PickArgs /*args*/) {
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) {
size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
subchannels_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
@ -245,13 +243,13 @@ OldRoundRobin::PickResult OldRoundRobin::Picker::Pick(PickArgs /*args*/) {
// RoundRobin
//
OldRoundRobin::OldRoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Created", this);
}
}
OldRoundRobin::~OldRoundRobin() {
RoundRobin::~RoundRobin() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
@ -259,7 +257,7 @@ OldRoundRobin::~OldRoundRobin() {
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}
void OldRoundRobin::ShutdownLocked() {
void RoundRobin::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
}
@ -268,14 +266,14 @@ void OldRoundRobin::ShutdownLocked() {
latest_pending_subchannel_list_.reset();
}
void OldRoundRobin::ResetBackoffLocked() {
void RoundRobin::ResetBackoffLocked() {
subchannel_list_->ResetBackoffLocked();
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->ResetBackoffLocked();
}
}
absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) {
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
@ -331,7 +329,7 @@ absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) {
// RoundRobinSubchannelList
//
void OldRoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
if (old_state.has_value()) {
@ -357,9 +355,9 @@ void OldRoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
}
}
void OldRoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
OldRoundRobin* p = static_cast<OldRoundRobin*>(policy());
RoundRobin* p = static_cast<RoundRobin*>(policy());
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ in the following cases:
// - subchannel_list_ has no READY subchannels.
@ -426,10 +424,10 @@ void OldRoundRobin::RoundRobinSubchannelList::
// RoundRobinSubchannelData
//
void OldRoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
OldRoundRobin* p = static_cast<OldRoundRobin*>(subchannel_list()->policy());
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If this is not the initial state notification and the new state is
// TRANSIENT_FAILURE or IDLE, re-resolve.
@ -459,10 +457,9 @@ void OldRoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
connectivity_status());
}
void OldRoundRobin::RoundRobinSubchannelData::
UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
OldRoundRobin* p = static_cast<OldRoundRobin*>(subchannel_list()->policy());
void RoundRobin::RoundRobinSubchannelData::UpdateLogicalConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(
GPR_INFO,
@ -506,384 +503,6 @@ void OldRoundRobin::RoundRobinSubchannelData::
logical_connectivity_state_ = connectivity_state;
}
//
// round_robin LB policy (with dualstack changes)
//
class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(Args args);
absl::string_view name() const override { return kRoundRobin; }
absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
private:
class RoundRobinEndpointList : public EndpointList {
public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
const ServerAddressList& addresses,
const ChannelArgs& args)
: EndpointList(std::move(round_robin),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
? "RoundRobinEndpointList"
: nullptr) {
Init(addresses, args,
[&](RefCountedPtr<RoundRobinEndpointList> endpoint_list,
const ServerAddress& address, const ChannelArgs& args) {
return MakeOrphanable<RoundRobinEndpoint>(
std::move(endpoint_list), address, args,
policy<RoundRobin>()->work_serializer());
});
}
private:
class RoundRobinEndpoint : public Endpoint {
public:
RoundRobinEndpoint(RefCountedPtr<RoundRobinEndpointList> endpoint_list,
const ServerAddress& address, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
: Endpoint(std::move(endpoint_list)) {
Init(address, args, std::move(work_serializer));
}
private:
// Called when the child policy reports a connectivity state update.
void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state,
const absl::Status& status) override;
};
LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
const override {
return policy<RoundRobin>()->channel_control_helper();
}
// Updates the counters of children in each state when a
// child transitions from old_state to new_state.
void UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state);
// Ensures that the right child list is used and then updates
// the RR policy's connectivity state based on the child list's
// state counters.
void MaybeUpdateRoundRobinConnectivityStateLocked(
absl::Status status_for_tf);
std::string CountersString() const {
return absl::StrCat("num_children=", size(), " num_ready=", num_ready_,
" num_connecting=", num_connecting_,
" num_transient_failure=", num_transient_failure_);
}
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
absl::Status last_failure_;
};
class Picker : public SubchannelPicker {
public:
Picker(RoundRobin* parent,
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
pickers);
PickResult Pick(PickArgs args) override;
private:
// Using pointer value only, no ref held -- do not dereference!
RoundRobin* parent_;
std::atomic<size_t> last_picked_index_;
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers_;
};
~RoundRobin() override;
void ShutdownLocked() override;
// Current child list.
OrphanablePtr<RoundRobinEndpointList> endpoint_list_;
// Latest pending child list.
// When we get an updated address list, we create a new child list
// for it here, and we wait to swap it into endpoint_list_ until the new
// list becomes READY.
OrphanablePtr<RoundRobinEndpointList> latest_pending_endpoint_list_;
bool shutdown_ = false;
absl::BitGen bit_gen_;
};
//
// RoundRobin::Picker
//
RoundRobin::Picker::Picker(
RoundRobin* parent,
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)
: parent_(parent), pickers_(std::move(pickers)) {
// For discussion on why we generate a random starting index for
// the picker, see https://github.com/grpc/grpc-go/issues/2580.
size_t index = absl::Uniform<size_t>(parent->bit_gen_, 0, pickers_.size());
last_picked_index_.store(index, std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p picker %p] created picker from endpoint_list=%p "
"with %" PRIuPTR " READY children; last_picked_index_=%" PRIuPTR,
parent_, this, parent_->endpoint_list_.get(), pickers_.size(),
index);
}
}
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
pickers_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p picker %p] using picker index %" PRIuPTR ", picker=%p",
parent_, this, index, pickers_[index].get());
}
return pickers_[index]->Pick(args);
}
//
// RoundRobin
//
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Created", this);
}
}
RoundRobin::~RoundRobin() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
GPR_ASSERT(endpoint_list_ == nullptr);
GPR_ASSERT(latest_pending_endpoint_list_ == nullptr);
}
void RoundRobin::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
}
shutdown_ = true;
endpoint_list_.reset();
latest_pending_endpoint_list_.reset();
}
void RoundRobin::ResetBackoffLocked() {
endpoint_list_->ResetBackoffLocked();
if (latest_pending_endpoint_list_ != nullptr) {
latest_pending_endpoint_list_->ResetBackoffLocked();
}
}
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
ServerAddressList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
addresses = std::move(*args.addresses);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str());
}
// If we already have a child list, then keep using the existing
// list, but still report back that the update was not accepted.
if (endpoint_list_ != nullptr) return args.addresses.status();
}
// Create new child list, replacing the previous pending list, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
latest_pending_endpoint_list_ != nullptr) {
gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this,
latest_pending_endpoint_list_.get());
}
latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), std::move(addresses),
args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
// TODO(roth): As part of adding dualstack backend support, we need to
// also handle the case where the list of addresses for a given
// endpoint is empty.
if (latest_pending_endpoint_list_->size() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
endpoint_list_ != nullptr) {
gpr_log(GPR_INFO, "[RR %p] replacing previous child list %p", this,
endpoint_list_.get());
}
endpoint_list_ = std::move(latest_pending_endpoint_list_);
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
: args.addresses.status();
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
MakeRefCounted<TransientFailurePicker>(status));
return status;
}
// Otherwise, if this is the initial update, immediately promote it to
// endpoint_list_.
if (endpoint_list_ == nullptr) {
endpoint_list_ = std::move(latest_pending_endpoint_list_);
}
return absl::OkStatus();
}
//
// RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint
//
void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state, const absl::Status& status) {
auto* rr_endpoint_list = endpoint_list<RoundRobinEndpointList>();
auto* round_robin = policy<RoundRobin>();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p] connectivity changed for child %p, endpoint_list %p "
"(index %" PRIuPTR " of %" PRIuPTR
"): prev_state=%s new_state=%s "
"(%s)",
round_robin, this, rr_endpoint_list, Index(),
rr_endpoint_list->size(),
(old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str());
}
if (new_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] child %p reported IDLE; requesting connection",
round_robin, this);
}
ExitIdleLocked();
}
// If state changed, update state counters.
if (!old_state.has_value() || *old_state != new_state) {
rr_endpoint_list->UpdateStateCountersLocked(old_state, new_state);
}
// Update the policy state.
rr_endpoint_list->MaybeUpdateRoundRobinConnectivityStateLocked(status);
}
//
// RoundRobin::RoundRobinEndpointList
//
void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked(
absl::optional<grpc_connectivity_state> old_state,
grpc_connectivity_state new_state) {
// We treat IDLE the same as CONNECTING, since it will immediately
// transition into that state anyway.
if (old_state.has_value()) {
GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN);
if (*old_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(num_ready_ > 0);
--num_ready_;
} else if (*old_state == GRPC_CHANNEL_CONNECTING ||
*old_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(num_connecting_ > 0);
--num_connecting_;
} else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(num_transient_failure_ > 0);
--num_transient_failure_;
}
}
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (new_state == GRPC_CHANNEL_READY) {
++num_ready_;
} else if (new_state == GRPC_CHANNEL_CONNECTING ||
new_state == GRPC_CHANNEL_IDLE) {
++num_connecting_;
} else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++num_transient_failure_;
}
}
void RoundRobin::RoundRobinEndpointList::
MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
auto* round_robin = policy<RoundRobin>();
// If this is latest_pending_endpoint_list_, then swap it into
// endpoint_list_ in the following cases:
// - endpoint_list_ has no READY children.
// - This list has at least one READY child and we have seen the
// initial connectivity state notification for all children.
// - All of the children in this list are in TRANSIENT_FAILURE.
// (This may cause the channel to go from READY to TRANSIENT_FAILURE,
// but we're doing what the control plane told us to do.)
if (round_robin->latest_pending_endpoint_list_.get() == this &&
(round_robin->endpoint_list_->num_ready_ == 0 ||
(num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
num_transient_failure_ == size())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
const std::string old_counters_string =
round_robin->endpoint_list_ != nullptr
? round_robin->endpoint_list_->CountersString()
: "";
gpr_log(GPR_INFO,
"[RR %p] swapping out child list %p (%s) in favor of %p (%s)",
round_robin, round_robin->endpoint_list_.get(),
old_counters_string.c_str(), this, CountersString().c_str());
}
round_robin->endpoint_list_ =
std::move(round_robin->latest_pending_endpoint_list_);
}
// Only set connectivity state if this is the current child list.
if (round_robin->endpoint_list_.get() != this) return;
// FIXME: scan children each time instead of keeping counters?
// First matching rule wins:
// 1) ANY child is READY => policy is READY.
// 2) ANY child is CONNECTING => policy is CONNECTING.
// 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
if (num_ready_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] reporting READY with child list %p",
round_robin, this);
}
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
for (const auto& endpoint : endpoints()) {
auto state = endpoint->connectivity_state();
if (state.has_value() && *state == GRPC_CHANNEL_READY) {
pickers.push_back(endpoint->picker());
}
}
GPR_ASSERT(!pickers.empty());
round_robin->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::OkStatus(),
MakeRefCounted<Picker>(round_robin, std::move(pickers)));
} else if (num_connecting_ > 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with child list %p",
round_robin, this);
}
round_robin->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
MakeRefCounted<QueuePicker>(nullptr));
} else if (num_transient_failure_ == size()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p] reporting TRANSIENT_FAILURE with child list %p: %s",
round_robin, this, status_for_tf.ToString().c_str());
}
if (!status_for_tf.ok()) {
last_failure_ = absl::UnavailableError(
absl::StrCat("connections to all backends failing; last error: ",
status_for_tf.message()));
}
round_robin->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
MakeRefCounted<TransientFailurePicker>(last_failure_));
}
}
//
// factory
//
@ -897,9 +516,6 @@ class RoundRobinFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
if (!IsRoundRobinDelegateToPickFirstEnabled()) {
return MakeOrphanable<OldRoundRobin>(std::move(args));
}
return MakeOrphanable<RoundRobin>(std::move(args));
}

@ -105,11 +105,6 @@ const char* const description_jitter_max_idle =
"only on max connection age, but it seems like this could smooth out some "
"herding problems.";
const char* const additional_constraints_jitter_max_idle = "{}";
const char* const description_round_robin_delegate_to_pick_first =
"Change round_robin code to delegate to pick_first as per dualstack "
"backend design.";
const char* const additional_constraints_round_robin_delegate_to_pick_first =
"{}";
} // namespace
namespace grpc_core {
@ -163,9 +158,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
additional_constraints_jitter_max_idle, true, true},
{"round_robin_delegate_to_pick_first",
description_round_robin_delegate_to_pick_first,
additional_constraints_round_robin_delegate_to_pick_first, true, true},
};
} // namespace grpc_core
@ -255,11 +247,6 @@ const char* const description_jitter_max_idle =
"only on max connection age, but it seems like this could smooth out some "
"herding problems.";
const char* const additional_constraints_jitter_max_idle = "{}";
const char* const description_round_robin_delegate_to_pick_first =
"Change round_robin code to delegate to pick_first as per dualstack "
"backend design.";
const char* const additional_constraints_round_robin_delegate_to_pick_first =
"{}";
} // namespace
namespace grpc_core {
@ -313,9 +300,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
additional_constraints_jitter_max_idle, true, true},
{"round_robin_delegate_to_pick_first",
description_round_robin_delegate_to_pick_first,
additional_constraints_round_robin_delegate_to_pick_first, true, true},
};
} // namespace grpc_core
@ -405,11 +389,6 @@ const char* const description_jitter_max_idle =
"only on max connection age, but it seems like this could smooth out some "
"herding problems.";
const char* const additional_constraints_jitter_max_idle = "{}";
const char* const description_round_robin_delegate_to_pick_first =
"Change round_robin code to delegate to pick_first as per dualstack "
"backend design.";
const char* const additional_constraints_round_robin_delegate_to_pick_first =
"{}";
} // namespace
namespace grpc_core {
@ -463,9 +442,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
additional_constraints_jitter_max_idle, true, true},
{"round_robin_delegate_to_pick_first",
description_round_robin_delegate_to_pick_first,
additional_constraints_round_robin_delegate_to_pick_first, true, true},
};
} // namespace grpc_core

@ -87,8 +87,6 @@ inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
inline bool IsJitterMaxIdleEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#elif defined(GPR_WINDOWS)
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -119,8 +117,6 @@ inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
inline bool IsJitterMaxIdleEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#else
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
@ -151,8 +147,6 @@ inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
inline bool IsJitterMaxIdleEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() { return true; }
#endif
#else
@ -208,12 +202,8 @@ inline bool IsKeepaliveServerFixEnabled() { return IsExperimentEnabled(20); }
inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(21); }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() {
return IsExperimentEnabled(23);
}
constexpr const size_t kNumExperiments = 24;
constexpr const size_t kNumExperiments = 23;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
#endif

@ -185,10 +185,3 @@
owner: ctiller@google.com
test_tags: []
allow_in_fuzzing_config: true
- name: round_robin_delegate_to_pick_first
description:
Change round_robin code to delegate to pick_first as per dualstack
backend design.
expiry: 2023/11/15
owner: roth@google.com
test_tags: ["cpp_lb_end2end_test", "xds_end2end_test"]

@ -94,5 +94,3 @@
default: true
- name: jitter_max_idle
default: true
- name: round_robin_delegate_to_pick_first
default: true

@ -33,7 +33,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/http_proxy_mapper.cc',
'src/core/ext/filters/client_channel/lb_policy/address_filtering.cc',
'src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc',
'src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc',

@ -229,6 +229,8 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection complete");
// Expect a re-resolution request.
ExpectReresolutionRequest();
// Expect a picker update.
std::vector<absl::string_view> remaining_addresses;
for (const auto& addr : kAddresses) {

@ -42,6 +42,8 @@ class RoundRobinTest : public LoadBalancingPolicyTest {
void ExpectStartup(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, nullptr), lb_policy_.get()),
absl::OkStatus());
// Expect the initial CONNECTNG update with a picker that queues.
ExpectConnectingUpdate();
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
@ -50,8 +52,6 @@ class RoundRobinTest : public LoadBalancingPolicyTest {
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel will connect successfully.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Expect the initial CONNECTNG update with a picker that queues.
if (i == 0) ExpectConnectingUpdate();
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// As each subchannel becomes READY, we should get a new picker that
// includes the behavior. Note that there may be any number of

@ -513,7 +513,6 @@ grpc_cc_test(
flaky = True, # TODO(b/151315347)
tags = [
"cpp_end2end_test",
"cpp_lb_end2end_test",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [
@ -598,7 +597,6 @@ grpc_cc_test(
flaky = True, # TODO(b/150567713)
tags = [
"cpp_end2end_test",
"cpp_lb_end2end_test",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [

@ -56,7 +56,6 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
@ -2056,15 +2055,10 @@ TEST_F(RoundRobinTest, HealthChecking) {
servers_[1]->SetServingStatus("health_check_service_name", false);
servers_[2]->SetServingStatus("health_check_service_name", false);
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
CheckRpcSendFailure(
DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
grpc_core::IsRoundRobinDelegateToPickFirstEnabled()
? "connections to all backends failing; last error: "
"(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy"
: "connections to all backends failing; last error: "
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
CheckRpcSendFailure(DEBUG_LOCATION, stub, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
// Clean up.
EnableDefaultHealthCheckService(false);
}
@ -2120,15 +2114,10 @@ TEST_F(RoundRobinTest, WithHealthCheckingInhibitPerChannel) {
// First channel should not become READY, because health checks should be
// failing.
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
CheckRpcSendFailure(
DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
grpc_core::IsRoundRobinDelegateToPickFirstEnabled()
? "connections to all backends failing; last error: "
"(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy"
: "connections to all backends failing; last error: "
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(DEBUG_LOCATION, stub2);
@ -2171,15 +2160,10 @@ TEST_F(RoundRobinTest, HealthCheckingServiceNamePerChannel) {
// First channel should not become READY, because health checks should be
// failing.
EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1));
CheckRpcSendFailure(
DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
grpc_core::IsRoundRobinDelegateToPickFirstEnabled()
? "connections to all backends failing; last error: "
"(ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy"
: "connections to all backends failing; last error: "
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
CheckRpcSendFailure(DEBUG_LOCATION, stub1, StatusCode::UNAVAILABLE,
"connections to all backends failing; last error: "
"UNAVAILABLE: (ipv6:%5B::1%5D|ipv4:127.0.0.1):[0-9]+: "
"backend unhealthy");
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(DEBUG_LOCATION, stub2);

@ -1114,8 +1114,6 @@ src/core/ext/filters/client_channel/lb_policy/address_filtering.h \
src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \

@ -920,8 +920,6 @@ src/core/ext/filters/client_channel/lb_policy/address_filtering.h \
src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc \
src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc \
src/core/ext/filters/client_channel/lb_policy/endpoint_list.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \

Loading…
Cancel
Save