From 1482bf9d182524a015ba44dc860e19ae799c618f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 21 Dec 2022 07:35:49 -0800 Subject: [PATCH] round_robin: avoid spurious updates when switching subchannel lists (#31939) * round_robin: avoid spurious updates when switching subchannel lists * clang-format * clang-tidy --- CMakeLists.txt | 38 ++++++ build_autogenerated.yaml | 11 ++ .../lb_policy/pick_first/pick_first.cc | 7 -- .../lb_policy/ring_hash/ring_hash.cc | 7 -- .../lb_policy/round_robin/round_robin.cc | 6 +- .../lb_policy/subchannel_list.h | 13 ++ test/core/client_channel/lb_policy/BUILD | 15 +++ .../lb_policy/lb_policy_test_lib.h | 21 ++-- .../lb_policy/round_robin_test.cc | 118 ++++++++++++++++++ tools/run_tests/generated/tests.json | 24 ++++ 10 files changed, 233 insertions(+), 27 deletions(-) create mode 100644 test/core/client_channel/lb_policy/round_robin_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index ee75debc48c..a60ff6848b7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1123,6 +1123,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx retry_throttle_test) add_dependencies(buildtests_cxx rls_end2end_test) add_dependencies(buildtests_cxx rls_lb_config_parser_test) + add_dependencies(buildtests_cxx round_robin_test) add_dependencies(buildtests_cxx secure_auth_context_test) add_dependencies(buildtests_cxx secure_channel_create_test) add_dependencies(buildtests_cxx secure_endpoint_test) @@ -17328,6 +17329,43 @@ target_link_libraries(rls_lb_config_parser_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(round_robin_test + test/core/client_channel/lb_policy/round_robin_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(round_robin_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(round_robin_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 102c3a56ac5..20e9490c9b2 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -10256,6 +10256,17 @@ targets: deps: - grpc_test_util uses_polling: false +- name: round_robin_test + gtest: true + build: test + language: c++ + headers: + - test/core/client_channel/lb_policy/lb_policy_test_lib.h + src: + - test/core/client_channel/lb_policy/round_robin_test.cc + deps: + - grpc_test_util + uses_polling: false - name: secure_auth_context_test gtest: true build: test diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index c665ede08ae..0468a3cc338 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -129,13 +129,6 @@ class PickFirst : public LoadBalancingPolicy { size_t attempting_index() const { return attempting_index_; } void set_attempting_index(size_t index) { attempting_index_ = index; } - bool AllSubchannelsSeenInitialState() { - for (size_t i = 0; i < num_subchannels(); ++i) { - if (!subchannel(i)->connectivity_state().has_value()) return false; - } - return true; - } - private: bool in_transient_failure_ = false; size_t attempting_index_ = 0; diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 9fbadfd7a25..56c773a060e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -231,13 +231,6 @@ class RingHash : public LoadBalancingPolicy { absl::Status status); private: - bool AllSubchannelsSeenInitialState() { - for (size_t i = 0; i < num_subchannels(); ++i) { - if (!subchannel(i)->connectivity_state().has_value()) return false; - } - return true; - } - size_t num_idle_; size_t num_ready_ = 0; size_t num_connecting_ = 0; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index e39a56f99ce..634729b80db 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -355,12 +355,14 @@ void RoundRobin::RoundRobinSubchannelList:: // If this is latest_pending_subchannel_list_, then swap it into // subchannel_list_ in the following cases: // - subchannel_list_ has no READY subchannels. - // - This list has at least one READY subchannel. + // - This list has at least one READY subchannel and we have seen the + // initial connectivity state notification for all subchannels. // - All of the subchannels in this list are in TRANSIENT_FAILURE. // (This may cause the channel to go from READY to TRANSIENT_FAILURE, // but we're doing what the control plane told us to do.) if (p->latest_pending_subchannel_list_.get() == this && - (p->subchannel_list_->num_ready_ == 0 || num_ready_ > 0 || + (p->subchannel_list_->num_ready_ == 0 || + (num_ready_ > 0 && AllSubchannelsSeenInitialState()) || num_transient_failure_ == num_subchannels())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { const std::string old_counters_string = diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index fce6854d9dd..1d26a0b63ff 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -200,6 +200,10 @@ class SubchannelList : public DualRefCounted { // Resets connection backoff of all subchannels. void ResetBackoffLocked(); + // Returns true if all subchannels have seen their initial + // connectivity state notifications. + bool AllSubchannelsSeenInitialState(); + void Orphan() override; protected: @@ -435,6 +439,15 @@ void SubchannelList +bool SubchannelList::AllSubchannelsSeenInitialState() { + for (size_t i = 0; i < num_subchannels(); ++i) { + if (!subchannel(i)->connectivity_state().has_value()) return false; + } + return true; +} + } // namespace grpc_core #endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H diff --git a/test/core/client_channel/lb_policy/BUILD b/test/core/client_channel/lb_policy/BUILD index a2c69886152..a6dbc918529 100644 --- a/test/core/client_channel/lb_policy/BUILD +++ b/test/core/client_channel/lb_policy/BUILD @@ -52,6 +52,21 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "round_robin_test", + srcs = ["round_robin_test.cc"], + external_deps = ["gtest"], + language = "C++", + uses_event_engine = False, + uses_polling = False, + deps = [ + ":lb_policy_test_lib", + "//src/core:channel_args", + "//src/core:grpc_lb_policy_round_robin", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "outlier_detection_lb_config_parser_test", srcs = ["outlier_detection_lb_config_parser_test.cc"], diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 25677489f06..3173013fa94 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -554,6 +554,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { absl::Span old_addresses, absl::Span new_addresses, size_t num_iterations = 3, SourceLocation location = SourceLocation()) { + gpr_log(GPR_INFO, "Waiting for expected RR addresses..."); bool retval = false; WaitForStateUpdate( [&](FakeHelper::StateUpdate update) { @@ -567,15 +568,14 @@ class LoadBalancingPolicyTest : public ::testing::Test { EXPECT_TRUE(picks.has_value()) << location.file() << ":" << location.line(); if (!picks.has_value()) return false; - std::vector pick_addresses(picks->begin(), - picks->end()); + gpr_log(GPR_INFO, "PICKS: %s", absl::StrJoin(*picks, " ").c_str()); // If the picks still match the old list, then keep going. - if (PicksAreRoundRobin(old_addresses, pick_addresses)) return true; + if (PicksAreRoundRobin(old_addresses, *picks)) return true; // Otherwise, the picks should match the new list. - retval = PicksAreRoundRobin(new_addresses, pick_addresses); + retval = PicksAreRoundRobin(new_addresses, *picks); EXPECT_TRUE(retval) << "Expected: " << absl::StrJoin(new_addresses, ", ") - << "\nActual: " << absl::StrJoin(pick_addresses, ", ") << "\nat " + << "\nActual: " << absl::StrJoin(*picks, ", ") << "\nat " << location.file() << ":" << location.line(); return false; // Stop. }, @@ -654,9 +654,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { // addresses may start anywhere in the list of expected addresses but // must then continue in round-robin fashion, with wrap-around. bool PicksAreRoundRobin(absl::Span expected, - absl::Span actual) { + absl::Span actual) { absl::optional expected_index; - for (auto address : actual) { + for (const auto& address : actual) { auto it = std::find(expected.begin(), expected.end(), address); if (it == expected.end()) return false; size_t index = it - expected.begin(); @@ -675,11 +675,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { auto picks = GetCompletePicks(picker, num_iterations * addresses.size(), location); ASSERT_TRUE(picks.has_value()) << location.file() << ":" << location.line(); - std::vector pick_addresses(picks->begin(), picks->end()); - EXPECT_TRUE(PicksAreRoundRobin(addresses, pick_addresses)) + EXPECT_TRUE(PicksAreRoundRobin(addresses, *picks)) << "Expected: " << absl::StrJoin(addresses, ", ") - << "Actual: " << absl::StrJoin(pick_addresses, ", ") << location.file() - << ":" << location.line(); + << "Actual: " << absl::StrJoin(*picks, ", ") << location.file() << ":" + << location.line(); } // Requests a picker on picker and expects a Fail result. diff --git a/test/core/client_channel/lb_policy/round_robin_test.cc b/test/core/client_channel/lb_policy/round_robin_test.cc new file mode 100644 index 00000000000..ca9ded1a80a --- /dev/null +++ b/test/core/client_channel/lb_policy/round_robin_test.cc @@ -0,0 +1,118 @@ +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/load_balancing/lb_policy.h" +#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace testing { +namespace { + +class RoundRobinTest : public LoadBalancingPolicyTest { + protected: + RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {} + + void ExpectStartup(absl::Span addresses) { + EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses), lb_policy_.get()), + absl::OkStatus()); + // Expect the initial CONNECTNG update with a picker that queues. + ExpectConnectingUpdate(); + // RR should have created a subchannel for each address. + for (size_t i = 0; i < addresses.size(); ++i) { + auto* subchannel = FindSubchannel(addresses[i]); + ASSERT_NE(subchannel, nullptr) << "Address: " << addresses[i]; + // RR should ask each subchannel to connect. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // The subchannel will connect successfully. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + // As each subchannel becomes READY, we should get a new picker that + // includes the behavior. Note that there may be any number of + // duplicate updates for the previous state in the queue before the + // update that we actually want to see. + if (i == 0) { + // When the first subchannel becomes READY, accept any number of + // CONNECTING updates with a picker that queues followed by a READY + // update with a picker that repeatedly returns only the first address. + auto picker = WaitForConnected(); + ExpectRoundRobinPicks(picker.get(), {addresses[0]}); + } else { + // When each subsequent subchannel becomes READY, we accept any number + // of READY updates where the picker returns only the previously + // connected subchannel(s) followed by a READY update where the picker + // returns the previously connected subchannel(s) *and* the newly + // connected subchannel. + WaitForRoundRobinListChange( + absl::MakeSpan(addresses).subspan(0, i), + absl::MakeSpan(addresses).subspan(0, i + 1)); + } + } + } + + OrphanablePtr lb_policy_; +}; + +TEST_F(RoundRobinTest, Basic) { + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + ExpectStartup(kAddresses); +} + +TEST_F(RoundRobinTest, AddressUpdates) { + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + ExpectStartup(kAddresses); + // Send update to remove address 2. + EXPECT_EQ(ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2)), + lb_policy_.get()), + absl::OkStatus()); + WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2)); + // Send update to remove address 0 and re-add address 2. + EXPECT_EQ(ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2)), + lb_policy_.get()), + absl::OkStatus()); + WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2), + absl::MakeSpan(kAddresses).last(2)); +} + +// TODO(roth): Add test cases: +// - empty address list +// - subchannels failing connection attempts + +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index f4d39b6401f..2c4c1dd1bb4 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5923,6 +5923,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "round_robin_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,