round_robin: avoid spurious updates when switching subchannel lists (#31939)

* round_robin: avoid spurious updates when switching subchannel lists

* clang-format

* clang-tidy
pull/31946/head
Mark D. Roth 2 years ago committed by GitHub
parent 896bfe373f
commit 1482bf9d18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      CMakeLists.txt
  2. 11
      build_autogenerated.yaml
  3. 7
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 7
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  5. 6
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  6. 13
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  7. 15
      test/core/client_channel/lb_policy/BUILD
  8. 21
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  9. 118
      test/core/client_channel/lb_policy/round_robin_test.cc
  10. 24
      tools/run_tests/generated/tests.json

38
CMakeLists.txt generated

@ -1123,6 +1123,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx retry_throttle_test)
add_dependencies(buildtests_cxx rls_end2end_test)
add_dependencies(buildtests_cxx rls_lb_config_parser_test)
add_dependencies(buildtests_cxx round_robin_test)
add_dependencies(buildtests_cxx secure_auth_context_test)
add_dependencies(buildtests_cxx secure_channel_create_test)
add_dependencies(buildtests_cxx secure_endpoint_test)
@ -17328,6 +17329,43 @@ target_link_libraries(rls_lb_config_parser_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(round_robin_test
test/core/client_channel/lb_policy/round_robin_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(round_robin_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(round_robin_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -10256,6 +10256,17 @@ targets:
deps:
- grpc_test_util
uses_polling: false
- name: round_robin_test
gtest: true
build: test
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
src:
- test/core/client_channel/lb_policy/round_robin_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: secure_auth_context_test
gtest: true
build: test

@ -129,13 +129,6 @@ class PickFirst : public LoadBalancingPolicy {
size_t attempting_index() const { return attempting_index_; }
void set_attempting_index(size_t index) { attempting_index_ = index; }
bool AllSubchannelsSeenInitialState() {
for (size_t i = 0; i < num_subchannels(); ++i) {
if (!subchannel(i)->connectivity_state().has_value()) return false;
}
return true;
}
private:
bool in_transient_failure_ = false;
size_t attempting_index_ = 0;

@ -231,13 +231,6 @@ class RingHash : public LoadBalancingPolicy {
absl::Status status);
private:
bool AllSubchannelsSeenInitialState() {
for (size_t i = 0; i < num_subchannels(); ++i) {
if (!subchannel(i)->connectivity_state().has_value()) return false;
}
return true;
}
size_t num_idle_;
size_t num_ready_ = 0;
size_t num_connecting_ = 0;

@ -355,12 +355,14 @@ void RoundRobin::RoundRobinSubchannelList::
// If this is latest_pending_subchannel_list_, then swap it into
// subchannel_list_ in the following cases:
// - subchannel_list_ has no READY subchannels.
// - This list has at least one READY subchannel.
// - This list has at least one READY subchannel and we have seen the
// initial connectivity state notification for all subchannels.
// - All of the subchannels in this list are in TRANSIENT_FAILURE.
// (This may cause the channel to go from READY to TRANSIENT_FAILURE,
// but we're doing what the control plane told us to do.)
if (p->latest_pending_subchannel_list_.get() == this &&
(p->subchannel_list_->num_ready_ == 0 || num_ready_ > 0 ||
(p->subchannel_list_->num_ready_ == 0 ||
(num_ready_ > 0 && AllSubchannelsSeenInitialState()) ||
num_transient_failure_ == num_subchannels())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
const std::string old_counters_string =

@ -200,6 +200,10 @@ class SubchannelList : public DualRefCounted<SubchannelListType> {
// Resets connection backoff of all subchannels.
void ResetBackoffLocked();
// Returns true if all subchannels have seen their initial
// connectivity state notifications.
bool AllSubchannelsSeenInitialState();
void Orphan() override;
protected:
@ -435,6 +439,15 @@ void SubchannelList<SubchannelListType,
}
}
template <typename SubchannelListType, typename SubchannelDataType>
bool SubchannelList<SubchannelListType,
SubchannelDataType>::AllSubchannelsSeenInitialState() {
for (size_t i = 0; i < num_subchannels(); ++i) {
if (!subchannel(i)->connectivity_state().has_value()) return false;
}
return true;
}
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H

@ -52,6 +52,21 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "round_robin_test",
srcs = ["round_robin_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
":lb_policy_test_lib",
"//src/core:channel_args",
"//src/core:grpc_lb_policy_round_robin",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "outlier_detection_lb_config_parser_test",
srcs = ["outlier_detection_lb_config_parser_test.cc"],

@ -554,6 +554,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
absl::Span<const absl::string_view> old_addresses,
absl::Span<const absl::string_view> new_addresses,
size_t num_iterations = 3, SourceLocation location = SourceLocation()) {
gpr_log(GPR_INFO, "Waiting for expected RR addresses...");
bool retval = false;
WaitForStateUpdate(
[&](FakeHelper::StateUpdate update) {
@ -567,15 +568,14 @@ class LoadBalancingPolicyTest : public ::testing::Test {
EXPECT_TRUE(picks.has_value())
<< location.file() << ":" << location.line();
if (!picks.has_value()) return false;
std::vector<absl::string_view> pick_addresses(picks->begin(),
picks->end());
gpr_log(GPR_INFO, "PICKS: %s", absl::StrJoin(*picks, " ").c_str());
// If the picks still match the old list, then keep going.
if (PicksAreRoundRobin(old_addresses, pick_addresses)) return true;
if (PicksAreRoundRobin(old_addresses, *picks)) return true;
// Otherwise, the picks should match the new list.
retval = PicksAreRoundRobin(new_addresses, pick_addresses);
retval = PicksAreRoundRobin(new_addresses, *picks);
EXPECT_TRUE(retval)
<< "Expected: " << absl::StrJoin(new_addresses, ", ")
<< "\nActual: " << absl::StrJoin(pick_addresses, ", ") << "\nat "
<< "\nActual: " << absl::StrJoin(*picks, ", ") << "\nat "
<< location.file() << ":" << location.line();
return false; // Stop.
},
@ -654,9 +654,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// addresses may start anywhere in the list of expected addresses but
// must then continue in round-robin fashion, with wrap-around.
bool PicksAreRoundRobin(absl::Span<const absl::string_view> expected,
absl::Span<const absl::string_view> actual) {
absl::Span<const std::string> actual) {
absl::optional<size_t> expected_index;
for (auto address : actual) {
for (const auto& address : actual) {
auto it = std::find(expected.begin(), expected.end(), address);
if (it == expected.end()) return false;
size_t index = it - expected.begin();
@ -675,11 +675,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
auto picks =
GetCompletePicks(picker, num_iterations * addresses.size(), location);
ASSERT_TRUE(picks.has_value()) << location.file() << ":" << location.line();
std::vector<absl::string_view> pick_addresses(picks->begin(), picks->end());
EXPECT_TRUE(PicksAreRoundRobin(addresses, pick_addresses))
EXPECT_TRUE(PicksAreRoundRobin(addresses, *picks))
<< "Expected: " << absl::StrJoin(addresses, ", ")
<< "Actual: " << absl::StrJoin(pick_addresses, ", ") << location.file()
<< ":" << location.line();
<< "Actual: " << absl::StrJoin(*picks, ", ") << location.file() << ":"
<< location.line();
}
// Requests a picker on picker and expects a Fail result.

@ -0,0 +1,118 @@
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stddef.h>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
class RoundRobinTest : public LoadBalancingPolicyTest {
protected:
RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {}
void ExpectStartup(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses), lb_policy_.get()),
absl::OkStatus());
// Expect the initial CONNECTNG update with a picker that queues.
ExpectConnectingUpdate();
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
ASSERT_NE(subchannel, nullptr) << "Address: " << addresses[i];
// RR should ask each subchannel to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel will connect successfully.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// As each subchannel becomes READY, we should get a new picker that
// includes the behavior. Note that there may be any number of
// duplicate updates for the previous state in the queue before the
// update that we actually want to see.
if (i == 0) {
// When the first subchannel becomes READY, accept any number of
// CONNECTING updates with a picker that queues followed by a READY
// update with a picker that repeatedly returns only the first address.
auto picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
// When each subsequent subchannel becomes READY, we accept any number
// of READY updates where the picker returns only the previously
// connected subchannel(s) followed by a READY update where the picker
// returns the previously connected subchannel(s) *and* the newly
// connected subchannel.
WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
};
TEST_F(RoundRobinTest, Basic) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ExpectStartup(kAddresses);
}
TEST_F(RoundRobinTest, AddressUpdates) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ExpectStartup(kAddresses);
// Send update to remove address 2.
EXPECT_EQ(ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2)),
lb_policy_.get()),
absl::OkStatus());
WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2));
// Send update to remove address 0 and re-add address 2.
EXPECT_EQ(ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2)),
lb_policy_.get()),
absl::OkStatus());
WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2),
absl::MakeSpan(kAddresses).last(2));
}
// TODO(roth): Add test cases:
// - empty address list
// - subchannels failing connection attempts
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -5923,6 +5923,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "round_robin_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save