diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index e864b8bd954..3a8e7300d42 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -34,6 +34,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/synchronization/notification.h" #include "absl/types/optional.h" @@ -449,6 +450,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { return status; } + void ExpectQueueEmpty(SourceLocation location = SourceLocation()) { + helper_->ExpectQueueEmpty(location); + } + // Keeps reading state updates until continue_predicate() returns false. // Returns false if the helper reports no events or if the event is // not a state update; otherwise (if continue_predicate() tells us to @@ -519,22 +524,61 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::function check_status, SourceLocation location = SourceLocation()) { bool retval = false; - WaitForStateUpdate([&](FakeHelper::StateUpdate update) { - if (update.state == GRPC_CHANNEL_CONNECTING) { - EXPECT_TRUE(update.status.ok()) - << update.status << " at " << location.file() << ":" - << location.line(); - ExpectPickQueued(update.picker.get(), location); - return true; // Keep going. - } - EXPECT_EQ(update.state, GRPC_CHANNEL_TRANSIENT_FAILURE) - << ConnectivityStateName(update.state) << " at " << location.file() - << ":" << location.line(); - check_status(update.status); - ExpectPickFail(update.picker.get(), check_status, location); - retval = update.state == GRPC_CHANNEL_TRANSIENT_FAILURE; - return false; // Stop. - }); + WaitForStateUpdate( + [&](FakeHelper::StateUpdate update) { + if (update.state == GRPC_CHANNEL_CONNECTING) { + EXPECT_TRUE(update.status.ok()) + << update.status << " at " << location.file() << ":" + << location.line(); + ExpectPickQueued(update.picker.get(), location); + return true; // Keep going. + } + EXPECT_EQ(update.state, GRPC_CHANNEL_TRANSIENT_FAILURE) + << ConnectivityStateName(update.state) << " at " + << location.file() << ":" << location.line(); + check_status(update.status); + ExpectPickFail(update.picker.get(), check_status, location); + retval = update.state == GRPC_CHANNEL_TRANSIENT_FAILURE; + return false; // Stop. + }, + location); + return retval; + } + + // Waits for the round_robin policy to start using an updated address list. + // There can be any number of READY updates where the picker is still using + // the old list followed by one READY update where the picker is using the + // new list. Returns true if the reported states match expectations. + bool WaitForRoundRobinListChange( + absl::Span old_addresses, + absl::Span new_addresses, + size_t num_iterations = 3, SourceLocation location = SourceLocation()) { + bool retval = false; + WaitForStateUpdate( + [&](FakeHelper::StateUpdate update) { + EXPECT_EQ(update.state, GRPC_CHANNEL_READY) + << location.file() << ":" << location.line(); + if (update.state != GRPC_CHANNEL_READY) return false; + // Get enough picks to round-robin num_iterations times across all + // expected addresses. + auto picks = GetCompletePicks(update.picker.get(), + new_addresses.size() * num_iterations); + EXPECT_TRUE(picks.has_value()) + << location.file() << ":" << location.line(); + if (!picks.has_value()) return false; + std::vector pick_addresses(picks->begin(), + picks->end()); + // If the picks still match the old list, then keep going. + if (PicksAreRoundRobin(old_addresses, pick_addresses)) return true; + // Otherwise, the picks should match the new list. + retval = PicksAreRoundRobin(new_addresses, pick_addresses); + EXPECT_TRUE(retval) + << "Expected: " << absl::StrJoin(new_addresses, ", ") + << "\nActual: " << absl::StrJoin(pick_addresses, ", ") << "\nat " + << location.file() << ":" << location.line(); + return false; // Stop. + }, + location); return retval; } @@ -590,6 +634,53 @@ class LoadBalancingPolicyTest : public ::testing::Test { return subchannel->state()->address(); } + // Gets num_picks complete picks from picker and returns the resulting + // list of addresses, or nullopt if a non-complete pick was returned. + absl::optional> GetCompletePicks( + LoadBalancingPolicy::SubchannelPicker* picker, size_t num_picks, + SourceLocation location = SourceLocation()) { + std::vector results; + for (size_t i = 0; i < num_picks; ++i) { + auto address = ExpectPickComplete(picker, location); + if (!address.has_value()) return absl::nullopt; + results.emplace_back(std::move(*address)); + } + return results; + } + + // Returns true if the list of actual pick result addresses matches the + // list of expected addresses for round_robin. Note that the actual + // addresses may start anywhere in the list of expected addresses but + // must then continue in round-robin fashion, with wrap-around. + bool PicksAreRoundRobin(absl::Span expected, + absl::Span actual) { + absl::optional expected_index; + for (auto address : actual) { + auto it = std::find(expected.begin(), expected.end(), address); + if (it == expected.end()) return false; + size_t index = it - expected.begin(); + if (expected_index.has_value() && index != *expected_index) return false; + expected_index = (index + 1) % expected.size(); + } + return true; + } + + // Checks that the picker has round-robin behavior over the specified + // set of addresses. + void ExpectRoundRobinPicks(LoadBalancingPolicy::SubchannelPicker* picker, + absl::Span addresses, + size_t num_iterations = 3, + SourceLocation location = SourceLocation()) { + auto picks = + GetCompletePicks(picker, num_iterations * addresses.size(), location); + ASSERT_TRUE(picks.has_value()) << location.file() << ":" << location.line(); + std::vector pick_addresses(picks->begin(), picks->end()); + EXPECT_TRUE(PicksAreRoundRobin(addresses, pick_addresses)) + << "Expected: " << absl::StrJoin(addresses, ", ") + << "Actual: " << absl::StrJoin(pick_addresses, ", ") << location.file() + << ":" << location.line(); + } + // Requests a picker on picker and expects a Fail result. // The failing status is passed to check_status. void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker, @@ -651,10 +742,6 @@ class LoadBalancingPolicyTest : public ::testing::Test { return &it->second; } - void ExpectQueueEmpty(SourceLocation location = SourceLocation()) { - helper_->ExpectQueueEmpty(location); - } - std::shared_ptr work_serializer_; FakeHelper* helper_ = nullptr; std::map subchannel_pool_; diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc index c758c450d3c..2bc0e9a420b 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -29,7 +29,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { : policy_(MakeLbPolicy("xds_override_host_experimental")) {} RefCountedPtr MakeXdsOverrideHostConfig( - std::string child_policy = "pick_first") { + std::string child_policy = "round_robin") { Json::Object child_policy_config = {{child_policy, Json::Object()}}; return MakeConfig(Json::Array{Json::Object{ {"xds_override_host_experimental", @@ -40,31 +40,43 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { }; TEST_F(XdsOverrideHostTest, DelegatesToChild) { - ASSERT_NE(policy_, nullptr); - const std::array kAddresses = {"ipv4:127.0.0.1:441", - "ipv4:127.0.0.1:442"}; - EXPECT_EQ(policy_->name(), "xds_override_host_experimental"); - // 1. We use pick_first as a child + // Send address list to LB policy. + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, MakeXdsOverrideHostConfig()), policy_.get()), absl::OkStatus()); + // Expect the initial CONNECTNG update with a picker that queues. ExpectConnectingUpdate(); - auto subchannel = - FindSubchannel({kAddresses[0]}, - ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); - ASSERT_NE(subchannel, nullptr); - ASSERT_TRUE(subchannel->ConnectionRequested()); - subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - subchannel->SetConnectivityState(GRPC_CHANNEL_READY); - subchannel = - FindSubchannel({kAddresses[1]}, - ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); - ASSERT_NE(subchannel, nullptr); - ASSERT_FALSE(subchannel->ConnectionRequested()); - auto picker = WaitForConnected(); - // Pick first policy will always pick first! - EXPECT_EQ(ExpectPickComplete(picker.get()), "ipv4:127.0.0.1:441"); - EXPECT_EQ(ExpectPickComplete(picker.get()), "ipv4:127.0.0.1:441"); + // RR should have created a subchannel for each address. + for (size_t i = 0; i < kAddresses.size(); ++i) { + auto* subchannel = FindSubchannel(kAddresses[i]); + ASSERT_NE(subchannel, nullptr); + // RR should ask each subchannel to connect. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // The subchannel will connect successfully. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + // As each subchannel becomes READY, we should get a new picker that + // includes the behavior. Note that there may be any number of + // duplicate updates for the previous state in the queue before the + // update that we actually want to see. + if (i == 0) { + // When the first subchannel becomes READY, accept any number of + // CONNECTING updates with a picker that queues followed by a READY + // update with a picker that repeatedly returns only the first address. + auto picker = WaitForConnected(); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0]}); + } else { + // When each subsequent subchannel becomes READY, we accept any number + // of READY updates where the picker returns only the previously + // connected subchannel(s) followed by a READY update where the picker + // returns the previously connected subchannel(s) *and* the newly + // connected subchannel. + WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).subspan(0, i), + absl::MakeSpan(kAddresses).subspan(0, i + 1)); + } + } } TEST_F(XdsOverrideHostTest, NoConfigReportsError) {