Revert "Revert "[lb pick_first] Enable random shuffling of address list" (#33497)

Original: #33496 

This reverts commit d59c8eb0f5.
pull/33505/head
Eugene Ostroukhov 1 year ago committed by GitHub
parent f8dd1a05dc
commit 7bce35ed41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      CMakeLists.txt
  2. 3
      build_autogenerated.yaml
  3. 1
      gRPC-C++.podspec
  4. 1
      gRPC-Core.podspec
  5. 2
      grpc.gyp
  6. 6
      src/core/BUILD
  7. 58
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  8. 1
      test/core/client_channel/lb_policy/BUILD
  9. 2
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  10. 164
      test/core/client_channel/lb_policy/pick_first_test.cc
  11. 8
      test/core/client_channel/lb_policy/round_robin_test.cc
  12. 3
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  13. 8
      test/core/util/test_lb_policies.cc

10
CMakeLists.txt generated

@ -2478,6 +2478,7 @@ target_link_libraries(grpc
${_gRPC_RE2_LIBRARIES} ${_gRPC_RE2_LIBRARIES}
${_gRPC_UPB_LIBRARIES} ${_gRPC_UPB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
absl::algorithm_container
absl::cleanup absl::cleanup
absl::flat_hash_map absl::flat_hash_map
absl::flat_hash_set absl::flat_hash_set
@ -3122,6 +3123,7 @@ target_link_libraries(grpc_unsecure
${_gRPC_RE2_LIBRARIES} ${_gRPC_RE2_LIBRARIES}
${_gRPC_UPB_LIBRARIES} ${_gRPC_UPB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
absl::algorithm_container
absl::cleanup absl::cleanup
absl::flat_hash_map absl::flat_hash_map
absl::flat_hash_set absl::flat_hash_set
@ -30869,7 +30871,7 @@ generate_pkgconfig(
"gRPC" "gRPC"
"high performance general RPC framework" "high performance general RPC framework"
"${gRPC_CORE_VERSION}" "${gRPC_CORE_VERSION}"
"gpr absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant" "gpr absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"openssl re2 libcares zlib" "openssl re2 libcares zlib"
"-lgrpc" "-lgrpc"
"-laddress_sorting -lupb" "-laddress_sorting -lupb"
@ -30880,7 +30882,7 @@ generate_pkgconfig(
"gRPC unsecure" "gRPC unsecure"
"high performance general RPC framework without SSL" "high performance general RPC framework without SSL"
"${gRPC_CORE_VERSION}" "${gRPC_CORE_VERSION}"
"gpr absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant" "gpr absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"re2 libcares zlib" "re2 libcares zlib"
"-lgrpc_unsecure" "-lgrpc_unsecure"
"-laddress_sorting -lupb" "-laddress_sorting -lupb"
@ -30891,7 +30893,7 @@ generate_pkgconfig(
"gRPC++" "gRPC++"
"C++ wrapper for gRPC" "C++ wrapper for gRPC"
"${gRPC_CPP_VERSION}" "${gRPC_CPP_VERSION}"
"grpc absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant" "grpc absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"" ""
"-lgrpc++" "-lgrpc++"
"" ""
@ -30902,7 +30904,7 @@ generate_pkgconfig(
"gRPC++ unsecure" "gRPC++ unsecure"
"C++ wrapper for gRPC without SSL" "C++ wrapper for gRPC without SSL"
"${gRPC_CPP_VERSION}" "${gRPC_CPP_VERSION}"
"grpc_unsecure absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant" "grpc_unsecure absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"" ""
"-lgrpc++_unsecure" "-lgrpc++_unsecure"
"" ""

@ -1781,6 +1781,7 @@ libs:
- src/core/tsi/transport_security.cc - src/core/tsi/transport_security.cc
- src/core/tsi/transport_security_grpc.cc - src/core/tsi/transport_security_grpc.cc
deps: deps:
- absl/algorithm:container
- absl/cleanup:cleanup - absl/cleanup:cleanup
- absl/container:flat_hash_map - absl/container:flat_hash_map
- absl/container:flat_hash_set - absl/container:flat_hash_set
@ -2715,6 +2716,7 @@ libs:
- src/core/tsi/transport_security.cc - src/core/tsi/transport_security.cc
- src/core/tsi/transport_security_grpc.cc - src/core/tsi/transport_security_grpc.cc
deps: deps:
- absl/algorithm:container
- absl/cleanup:cleanup - absl/cleanup:cleanup
- absl/container:flat_hash_map - absl/container:flat_hash_map
- absl/container:flat_hash_set - absl/container:flat_hash_set
@ -11217,6 +11219,7 @@ targets:
headers: headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h - test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h - test/core/event_engine/mock_event_engine.h
- test/core/util/scoped_env_var.h
src: src:
- test/core/client_channel/lb_policy/pick_first_test.cc - test/core/client_channel/lb_policy/pick_first_test.cc
deps: deps:

1
gRPC-C++.podspec generated

@ -216,6 +216,7 @@ Pod::Spec.new do |s|
ss.dependency "#{s.name}/Interface", version ss.dependency "#{s.name}/Interface", version
ss.dependency 'gRPC-Core', version ss.dependency 'gRPC-Core', version
abseil_version = '1.20230125.3' abseil_version = '1.20230125.3'
ss.dependency 'abseil/algorithm/container', abseil_version
ss.dependency 'abseil/base/base', abseil_version ss.dependency 'abseil/base/base', abseil_version
ss.dependency 'abseil/base/core_headers', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version
ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/cleanup/cleanup', abseil_version

1
gRPC-Core.podspec generated

@ -183,6 +183,7 @@ Pod::Spec.new do |s|
ss.libraries = 'z' ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version ss.dependency "#{s.name}/Interface", version
ss.dependency 'BoringSSL-GRPC', '0.0.29' ss.dependency 'BoringSSL-GRPC', '0.0.29'
ss.dependency 'abseil/algorithm/container', abseil_version
ss.dependency 'abseil/base/base', abseil_version ss.dependency 'abseil/base/base', abseil_version
ss.dependency 'abseil/base/core_headers', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version
ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/cleanup/cleanup', abseil_version

2
grpc.gyp generated

@ -249,6 +249,7 @@
'target_name': 'grpc', 'target_name': 'grpc',
'type': 'static_library', 'type': 'static_library',
'dependencies': [ 'dependencies': [
'absl/algorithm:container',
'absl/cleanup:cleanup', 'absl/cleanup:cleanup',
'absl/container:flat_hash_map', 'absl/container:flat_hash_map',
'absl/container:flat_hash_set', 'absl/container:flat_hash_set',
@ -1079,6 +1080,7 @@
'target_name': 'grpc_unsecure', 'target_name': 'grpc_unsecure',
'type': 'static_library', 'type': 'static_library',
'dependencies': [ 'dependencies': [
'absl/algorithm:container',
'absl/cleanup:cleanup', 'absl/cleanup:cleanup',
'absl/container:flat_hash_map', 'absl/container:flat_hash_map',
'absl/container:flat_hash_set', 'absl/container:flat_hash_set',

@ -4624,6 +4624,8 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/pick_first/pick_first.cc", "ext/filters/client_channel/lb_policy/pick_first/pick_first.cc",
], ],
external_deps = [ external_deps = [
"absl/algorithm:container",
"absl/random",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
@ -4632,12 +4634,16 @@ grpc_cc_library(
language = "c++", language = "c++",
deps = [ deps = [
"channel_args", "channel_args",
"env",
"grpc_lb_subchannel_list", "grpc_lb_subchannel_list",
"grpc_outlier_detection_header", "grpc_outlier_detection_header",
"json", "json",
"json_args",
"json_object_loader",
"lb_policy", "lb_policy",
"lb_policy_factory", "lb_policy_factory",
"subchannel_interface", "subchannel_interface",
"validation_errors",
"//:config", "//:config",
"//:debug_location", "//:debug_location",
"//:gpr", "//:gpr",

@ -25,6 +25,8 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/algorithm/container.h"
#include "absl/random/random.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include "absl/strings/str_cat.h" #include "absl/strings/str_cat.h"
@ -40,11 +42,16 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/json/json.h" #include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h" #include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h" #include "src/core/lib/load_balancing/subchannel_interface.h"
@ -63,6 +70,40 @@ namespace {
constexpr absl::string_view kPickFirst = "pick_first"; constexpr absl::string_view kPickFirst = "pick_first";
// TODO(eostroukhov): Remove once this feature is no longer experimental.
bool ShufflePickFirstEnabled() {
auto value = GetEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG");
if (!value.has_value()) return false;
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
return parse_succeeded && parsed_value;
}
class PickFirstConfig : public LoadBalancingPolicy::Config {
public:
absl::string_view name() const override { return kPickFirst; }
bool shuffle_addresses() const { return shuffle_addresses_; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
static const auto kJsonLoader =
JsonObjectLoader<PickFirstConfig>()
.OptionalField("shuffleAddressList",
&PickFirstConfig::shuffle_addresses_)
.Finish();
return kJsonLoader;
}
void JsonPostLoad(const Json& /* json */, const JsonArgs& /* args */,
ValidationErrors* /* errors */) {
if (!ShufflePickFirstEnabled()) {
shuffle_addresses_ = false;
}
}
private:
bool shuffle_addresses_ = false;
};
class PickFirst : public LoadBalancingPolicy { class PickFirst : public LoadBalancingPolicy {
public: public:
explicit PickFirst(Args args); explicit PickFirst(Args args);
@ -169,6 +210,8 @@ class PickFirst : public LoadBalancingPolicy {
bool idle_ = false; bool idle_ = false;
// Are we shut down? // Are we shut down?
bool shutdown_ = false; bool shutdown_ = false;
// Random bit generator used for shuffling addresses if configured
absl::BitGen bit_gen_;
}; };
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) { PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
@ -274,6 +317,11 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
status = args.addresses.status(); status = args.addresses.status();
} else if (args.addresses->empty()) { } else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty"); status = absl::UnavailableError("address list must not be empty");
} else {
auto config = static_cast<PickFirstConfig*>(args.config.get());
if (config->shuffle_addresses()) {
absl::c_shuffle(*args.addresses, bit_gen_);
}
} }
// TODO(roth): This is a hack to disable outlier_detection when used // TODO(roth): This is a hack to disable outlier_detection when used
// with pick_first, for the reasons described in // with pick_first, for the reasons described in
@ -513,11 +561,6 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
} }
} }
class PickFirstConfig : public LoadBalancingPolicy::Config {
public:
absl::string_view name() const override { return kPickFirst; }
};
// //
// factory // factory
// //
@ -532,8 +575,9 @@ class PickFirstFactory : public LoadBalancingPolicyFactory {
absl::string_view name() const override { return kPickFirst; } absl::string_view name() const override { return kPickFirst; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>> absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& /*json*/) const override { ParseLoadBalancingConfig(const Json& json) const override {
return MakeRefCounted<PickFirstConfig>(); return LoadFromJson<RefCountedPtr<PickFirstConfig>>(
json, JsonArgs(), "errors validating pick_first LB policy config");
} }
}; };

@ -50,6 +50,7 @@ grpc_cc_test(
"//src/core:channel_args", "//src/core:channel_args",
"//src/core:grpc_lb_policy_pick_first", "//src/core:grpc_lb_policy_pick_first",
"//test/core/util:grpc_test_util", "//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
], ],
) )

@ -598,7 +598,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Constructs an update containing a list of addresses. // Constructs an update containing a list of addresses.
LoadBalancingPolicy::UpdateArgs BuildUpdate( LoadBalancingPolicy::UpdateArgs BuildUpdate(
absl::Span<const absl::string_view> addresses, absl::Span<const absl::string_view> addresses,
RefCountedPtr<LoadBalancingPolicy::Config> config = nullptr) { RefCountedPtr<LoadBalancingPolicy::Config> config) {
LoadBalancingPolicy::UpdateArgs update; LoadBalancingPolicy::UpdateArgs update;
update.addresses.emplace(); update.addresses.emplace();
for (const absl::string_view& address : addresses) { for (const absl::string_view& address : addresses) {

@ -16,19 +16,32 @@
#include <stddef.h> #include <stddef.h>
#include <algorithm>
#include <array> #include <array>
#include <map>
#include <memory>
#include <utility>
#include <vector>
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/json.h>
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/scoped_env_var.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
namespace grpc_core { namespace grpc_core {
@ -39,6 +52,80 @@ class PickFirstTest : public LoadBalancingPolicyTest {
protected: protected:
PickFirstTest() : lb_policy_(MakeLbPolicy("pick_first")) {} PickFirstTest() : lb_policy_(MakeLbPolicy("pick_first")) {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakePickFirstConfig(
absl::optional<bool> shuffle_address_list = absl::nullopt) {
return MakeConfig(Json::FromArray({Json::FromObject(
{{"pick_first",
shuffle_address_list.has_value()
? Json::FromObject({{"shuffleAddressList",
Json::FromBool(*shuffle_address_list)}})
: Json::FromObject({})}})}));
}
// Gets order the addresses are being picked. Return type is void so
// assertions can be used
void GetOrderAddressesArePicked(
absl::Span<const absl::string_view> addresses,
std::vector<absl::string_view>* out_address_order) {
work_serializer_->Run([&]() { lb_policy_->ExitIdleLocked(); },
DEBUG_LOCATION);
out_address_order->clear();
// Construct a map of subchannel to address.
// We will remove entries as each subchannel starts to connect.
std::map<SubchannelState*, absl::string_view> subchannels;
for (auto address : addresses) {
auto* subchannel = FindSubchannel(
address, ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel, nullptr);
subchannels.emplace(subchannel, address);
}
// Now process each subchannel in the order in which pick_first tries it.
while (!subchannels.empty()) {
// Find the subchannel that is being attempted.
SubchannelState* subchannel = nullptr;
for (const auto& p : subchannels) {
if (p.first->ConnectionRequested()) {
out_address_order->push_back(p.second);
subchannel = p.first;
break;
}
}
ASSERT_NE(subchannel, nullptr);
// The subchannel reports CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// If this is the first subchannel being attempted, expect a CONNECTING
// update.
if (subchannels.size() == addresses.size()) {
ExpectConnectingUpdate();
}
if (subchannels.size() > 1) {
// Not the last subchannel in the list. Connection attempt should fail.
subchannel->SetConnectivityState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("failed to connect"));
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
} else {
// Last subchannel in the list. Connection attempt should succeed.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
auto picker = WaitForConnected();
ASSERT_NE(picker, nullptr);
EXPECT_EQ(ExpectPickComplete(picker.get()), out_address_order->back());
// Then it should become disconnected.
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
ExpectReresolutionRequest();
// We would normally call ExpectStateAndQueueingPicker() here instead of
// just ExpectState(). However, calling the picker would also trigger
// exiting IDLE, which we don't want here, because if the test is going
// to send an address list update and call GetOrderAddressesArePicked()
// again, we don't want to trigger a connection attempt on any
// subchannel until after that next address list update is processed.
ExpectState(GRPC_CHANNEL_IDLE);
}
// Remove the subchannel from the map.
subchannels.erase(subchannel);
}
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_; OrphanablePtr<LoadBalancingPolicy> lb_policy_;
}; };
@ -46,7 +133,8 @@ TEST_F(PickFirstTest, FirstAddressWorks) {
// Send an update containing two addresses. // Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = { constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(BuildUpdate(kAddresses), lb_policy_.get()); absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status; EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with // LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
@ -81,7 +169,8 @@ TEST_F(PickFirstTest, FirstAddressFails) {
// Send an update containing two addresses. // Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = { constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(BuildUpdate(kAddresses), lb_policy_.get()); absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status; EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with // LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
@ -94,7 +183,8 @@ TEST_F(PickFirstTest, FirstAddressFails) {
// When the LB policy receives the first subchannel's initial connectivity // When the LB policy receives the first subchannel's initial connectivity
// state notification (IDLE), it will request a connection. // state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested()); EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports CONNECTING. // This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state. // LB policy should have reported CONNECTING state.
ExpectConnectingUpdate(); ExpectConnectingUpdate();
@ -105,7 +195,8 @@ TEST_F(PickFirstTest, FirstAddressFails) {
absl::UnavailableError("failed to connect")); absl::UnavailableError("failed to connect"));
// The LB policy will start a connection attempt on the second subchannel. // The LB policy will start a connection attempt on the second subchannel.
EXPECT_TRUE(subchannel2->ConnectionRequested()); EXPECT_TRUE(subchannel2->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports CONNECTING. // This causes the subchannel to start to connect, so it reports
// CONNECTING.
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// The connection attempt succeeds. // The connection attempt succeeds.
subchannel2->SetConnectivityState(GRPC_CHANNEL_READY); subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
@ -123,7 +214,8 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
// Send an update containing two addresses. // Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = { constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(BuildUpdate(kAddresses), lb_policy_.get()); absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status; EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses with // LB policy should have created a subchannel for both addresses with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
@ -177,6 +269,68 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
} }
} }
TEST_F(PickFirstTest, WithShuffle) {
testing::ScopedExperimentalEnvVar env_var(
"GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG");
constexpr std::array<absl::string_view, 6> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
"ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
// 6 addresses have 6! = 720 permutations or roughly 0.14% chance that
// the shuffle returns same permutation. We allow for several tries to
// prevent flake test.
constexpr size_t kMaxTries = 10;
std::vector<absl::string_view> addresses_after_update;
bool shuffled = false;
for (size_t i = 0; i < kMaxTries; ++i) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
GetOrderAddressesArePicked(kAddresses, &addresses_after_update);
if (absl::MakeConstSpan(addresses_after_update) !=
absl::MakeConstSpan(kAddresses)) {
shuffled = true;
break;
}
}
ASSERT_TRUE(shuffled);
// Address order should be stable between updates
std::vector<absl::string_view> addresses_on_another_try;
GetOrderAddressesArePicked(kAddresses, &addresses_on_another_try);
EXPECT_EQ(addresses_on_another_try, addresses_after_update);
}
TEST_F(PickFirstTest, ShufflingDisabled) {
testing::ScopedExperimentalEnvVar env_var(
"GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG");
constexpr std::array<absl::string_view, 6> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
"ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
constexpr static size_t kMaxAttempts = 5;
for (size_t attempt = 0; attempt < kMaxAttempts; ++attempt) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
std::vector<absl::string_view> address_order;
GetOrderAddressesArePicked(kAddresses, &address_order);
EXPECT_THAT(address_order, ::testing::ElementsAreArray(kAddresses));
}
}
// TODO(eugeneo): remove when the env var no longer necessary
TEST_F(PickFirstTest, ShufflingDisabledViaEnvVar) {
constexpr std::array<absl::string_view, 6> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
"ipv4:127.0.0.1:446", "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
constexpr static size_t kMaxAttempts = 5;
for (size_t attempt = 0; attempt < kMaxAttempts; ++attempt) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
std::vector<absl::string_view> address_order;
GetOrderAddressesArePicked(kAddresses, &address_order);
EXPECT_THAT(address_order, ::testing::ElementsAreArray(kAddresses));
}
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc_core } // namespace grpc_core

@ -40,7 +40,7 @@ class RoundRobinTest : public LoadBalancingPolicyTest {
RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {} RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {}
void ExpectStartup(absl::Span<const absl::string_view> addresses) { void ExpectStartup(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses), lb_policy_.get()), EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, nullptr), lb_policy_.get()),
absl::OkStatus()); absl::OkStatus());
// Expect the initial CONNECTNG update with a picker that queues. // Expect the initial CONNECTNG update with a picker that queues.
ExpectConnectingUpdate(); ExpectConnectingUpdate();
@ -90,12 +90,14 @@ TEST_F(RoundRobinTest, AddressUpdates) {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ExpectStartup(kAddresses); ExpectStartup(kAddresses);
// Send update to remove address 2. // Send update to remove address 2.
EXPECT_EQ(ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2)), EXPECT_EQ(
ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2), nullptr),
lb_policy_.get()), lb_policy_.get()),
absl::OkStatus()); absl::OkStatus());
WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2)); WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2));
// Send update to remove address 0 and re-add address 2. // Send update to remove address 0 and re-add address 2.
EXPECT_EQ(ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2)), EXPECT_EQ(
ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2), nullptr),
lb_policy_.get()), lb_policy_.get()),
absl::OkStatus()); absl::OkStatus());
WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2), WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2),

@ -119,7 +119,8 @@ TEST_F(XdsOverrideHostTest, DelegatesToChild) {
TEST_F(XdsOverrideHostTest, NoConfigReportsError) { TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
EXPECT_EQ( EXPECT_EQ(
ApplyUpdate(BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}), ApplyUpdate(
BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
policy_.get()), policy_.get()),
absl::InvalidArgumentError("Missing policy config")); absl::InvalidArgumentError("Missing policy config"));
} }

@ -27,6 +27,7 @@
#include "absl/types/variant.h" #include "absl/types/variant.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h" #include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h"
@ -78,6 +79,13 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
~ForwardingLoadBalancingPolicy() override = default; ~ForwardingLoadBalancingPolicy() override = default;
absl::Status UpdateLocked(UpdateArgs args) override { absl::Status UpdateLocked(UpdateArgs args) override {
// Use correct config for the delegate load balancing policy
auto config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
Json::FromArray({Json::FromObject(
{{std::string(delegate_->name()), Json::FromObject({})}})}));
GPR_ASSERT(config.ok());
args.config = *config;
return delegate_->UpdateLocked(std::move(args)); return delegate_->UpdateLocked(std::move(args));
} }

Loading…
Cancel
Save