diff --git a/src/core/BUILD b/src/core/BUILD index 0d5ecd9eaef..8d38562e086 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4858,6 +4858,7 @@ grpc_cc_library( "//:grpc_trace", "//:orphanable", "//:ref_counted_ptr", + "//:sockaddr_utils", "//:work_serializer", ], ) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 4775aedae51..08075f3cc20 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" @@ -445,13 +447,49 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) { absl::c_shuffle(*args.addresses, bit_gen_); } // Flatten the list so that we have one address per endpoint. + // While we're iterating, also determine the desired address family + // order and the index of the first element of each family, for use in + // the interleaving below. + auto get_address_family = [](const grpc_resolved_address& address) { + const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address); + return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme); + }; + std::vector address_family_order; + std::map address_family_indexes; EndpointAddressesList endpoints; for (const auto& endpoint : *args.addresses) { for (const auto& address : endpoint.addresses()) { endpoints.emplace_back(address, endpoint.args()); + if (IsPickFirstHappyEyeballsEnabled()) { + absl::string_view scheme = get_address_family(address); + bool inserted = + address_family_indexes.emplace(scheme, endpoints.size() - 1) + .second; + if (inserted) address_family_order.push_back(scheme); + } + } + } + // Interleave addresses as per RFC-8305 section 4. + if (IsPickFirstHappyEyeballsEnabled()) { + EndpointAddressesList interleaved_endpoints; + for (size_t i = 0; i < endpoints.size(); ++i) { + absl::string_view scheme_to_use = + address_family_order[i % address_family_order.size()]; + size_t& next_index = address_family_indexes[scheme_to_use]; + for (; next_index < endpoints.size(); ++next_index) { + if (get_address_family(endpoints[next_index].address()) == + scheme_to_use) { + break; + } + } + if (next_index == endpoints.size()) continue; + interleaved_endpoints.emplace_back(std::move(endpoints[next_index])); + next_index++; } + args.addresses = std::move(interleaved_endpoints); + } else { + args.addresses = std::move(endpoints); } - args.addresses = std::move(endpoints); } // If the update contains a resolver error and we have a previous update // that was not a resolver error, keep using the previous addresses. diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 72ed07319ef..380fe6c89f5 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -1387,7 +1387,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { void IncrementTimeBy(Duration duration) { ExecCtx exec_ctx; + gpr_log(GPR_INFO, "Incrementing time by %s...", + duration.ToString().c_str()); fuzzing_ee_->TickForDuration(duration); + gpr_log(GPR_INFO, "Done incrementing time"); // Flush WorkSerializer, in case the timer callback enqueued anything. WaitForWorkSerializerToFlush(); } diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index b6e698782dc..0612711b6e7 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -589,6 +589,98 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) { } } +TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) { + if (!IsPickFirstHappyEyeballsEnabled()) return; + // Send an update containing three IPv4 addresses followed by three + // IPv6 addresses. + constexpr std::array kAddresses = { + "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445", + "ipv6:[::1]:443", "ipv6:[::1]:444", "ipv6:[::1]:445"}; + absl::Status status = ApplyUpdate( + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + EXPECT_TRUE(status.ok()) << status; + // LB policy should have created a subchannel for all addresses. + auto* subchannel_ipv4_1 = FindSubchannel(kAddresses[0]); + ASSERT_NE(subchannel_ipv4_1, nullptr); + auto* subchannel_ipv4_2 = FindSubchannel(kAddresses[1]); + ASSERT_NE(subchannel_ipv4_2, nullptr); + auto* subchannel_ipv4_3 = FindSubchannel(kAddresses[2]); + ASSERT_NE(subchannel_ipv4_3, nullptr); + auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[3]); + ASSERT_NE(subchannel_ipv6_1, nullptr); + auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[4]); + ASSERT_NE(subchannel_ipv6_2, nullptr); + auto* subchannel_ipv6_3 = FindSubchannel(kAddresses[5]); + ASSERT_NE(subchannel_ipv6_3, nullptr); + // When the LB policy receives the subchannels' initial connectivity + // state notifications (all IDLE), it will request a connection on the + // first IPv4 subchannel. + EXPECT_TRUE(subchannel_ipv4_1->ConnectionRequested()); + subchannel_ipv4_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // No other subchannels should be connecting. + EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_1->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested()); + // The timer fires before the connection attempt completes. + IncrementTimeBy(Duration::Milliseconds(250)); + // This causes the LB policy to start connecting to the first IPv6 + // subchannel. + EXPECT_TRUE(subchannel_ipv6_1->ConnectionRequested()); + subchannel_ipv6_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // No other subchannels should be connecting. + EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested()); + // The timer fires before the connection attempt completes. + IncrementTimeBy(Duration::Milliseconds(250)); + // This causes the LB policy to start connecting to the second IPv4 + // subchannel. + EXPECT_TRUE(subchannel_ipv4_2->ConnectionRequested()); + subchannel_ipv4_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // No other subchannels should be connecting. + EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested()); + // The timer fires before the connection attempt completes. + IncrementTimeBy(Duration::Milliseconds(250)); + // This causes the LB policy to start connecting to the second IPv6 + // subchannel. + EXPECT_TRUE(subchannel_ipv6_2->ConnectionRequested()); + subchannel_ipv6_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // No other subchannels should be connecting. + EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested()); + EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested()); + // The timer fires before the connection attempt completes. + IncrementTimeBy(Duration::Milliseconds(250)); + // This causes the LB policy to start connecting to the third IPv4 + // subchannel. + EXPECT_TRUE(subchannel_ipv4_3->ConnectionRequested()); + subchannel_ipv4_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); + // No other subchannels should be connecting. + EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested()); + // The timer fires before the connection attempt completes. + IncrementTimeBy(Duration::Milliseconds(250)); + // This causes the LB policy to start connecting to the third IPv6 + // subchannel. + EXPECT_TRUE(subchannel_ipv6_3->ConnectionRequested()); + subchannel_ipv6_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // LB policy should have reported CONNECTING state. + ExpectConnectingUpdate(); +} + TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) { // Send an update containing two addresses. constexpr std::array kAddresses = {