[pick_first] implement address interleaving for Happy Eyeballs (#34615)

pull/34673/head
Mark D. Roth 1 year ago committed by GitHub
parent 594d4ed34b
commit 067fc48dca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 40
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  3. 3
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  4. 92
      test/core/client_channel/lb_policy/pick_first_test.cc

@ -4858,6 +4858,7 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
"//:sockaddr_utils",
"//:work_serializer",
],
)

@ -22,6 +22,7 @@
#include <string.h>
#include <algorithm>
#include <map>
#include <memory>
#include <type_traits>
#include <utility>
@ -41,6 +42,7 @@
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -445,13 +447,49 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
absl::c_shuffle(*args.addresses, bit_gen_);
}
// Flatten the list so that we have one address per endpoint.
// While we're iterating, also determine the desired address family
// order and the index of the first element of each family, for use in
// the interleaving below.
auto get_address_family = [](const grpc_resolved_address& address) {
const char* uri_scheme = grpc_sockaddr_get_uri_scheme(&address);
return absl::string_view(uri_scheme == nullptr ? "other" : uri_scheme);
};
std::vector<absl::string_view> address_family_order;
std::map<absl::string_view, size_t> address_family_indexes;
EndpointAddressesList endpoints;
for (const auto& endpoint : *args.addresses) {
for (const auto& address : endpoint.addresses()) {
endpoints.emplace_back(address, endpoint.args());
if (IsPickFirstHappyEyeballsEnabled()) {
absl::string_view scheme = get_address_family(address);
bool inserted =
address_family_indexes.emplace(scheme, endpoints.size() - 1)
.second;
if (inserted) address_family_order.push_back(scheme);
}
}
}
// Interleave addresses as per RFC-8305 section 4.
if (IsPickFirstHappyEyeballsEnabled()) {
EndpointAddressesList interleaved_endpoints;
for (size_t i = 0; i < endpoints.size(); ++i) {
absl::string_view scheme_to_use =
address_family_order[i % address_family_order.size()];
size_t& next_index = address_family_indexes[scheme_to_use];
for (; next_index < endpoints.size(); ++next_index) {
if (get_address_family(endpoints[next_index].address()) ==
scheme_to_use) {
break;
}
}
if (next_index == endpoints.size()) continue;
interleaved_endpoints.emplace_back(std::move(endpoints[next_index]));
next_index++;
}
args.addresses = std::move(interleaved_endpoints);
} else {
args.addresses = std::move(endpoints);
}
args.addresses = std::move(endpoints);
}
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.

@ -1387,7 +1387,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void IncrementTimeBy(Duration duration) {
ExecCtx exec_ctx;
gpr_log(GPR_INFO, "Incrementing time by %s...",
duration.ToString().c_str());
fuzzing_ee_->TickForDuration(duration);
gpr_log(GPR_INFO, "Done incrementing time");
// Flush WorkSerializer, in case the timer callback enqueued anything.
WaitForWorkSerializerToFlush();
}

@ -589,6 +589,98 @@ TEST_F(PickFirstTest, HappyEyeballsCompletesWithoutSuccess) {
}
}
TEST_F(PickFirstTest, HappyEyeballsAddressInterleaving) {
if (!IsPickFirstHappyEyeballsEnabled()) return;
// Send an update containing three IPv4 addresses followed by three
// IPv6 addresses.
constexpr std::array<absl::string_view, 6> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445",
"ipv6:[::1]:443", "ipv6:[::1]:444", "ipv6:[::1]:445"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for all addresses.
auto* subchannel_ipv4_1 = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel_ipv4_1, nullptr);
auto* subchannel_ipv4_2 = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel_ipv4_2, nullptr);
auto* subchannel_ipv4_3 = FindSubchannel(kAddresses[2]);
ASSERT_NE(subchannel_ipv4_3, nullptr);
auto* subchannel_ipv6_1 = FindSubchannel(kAddresses[3]);
ASSERT_NE(subchannel_ipv6_1, nullptr);
auto* subchannel_ipv6_2 = FindSubchannel(kAddresses[4]);
ASSERT_NE(subchannel_ipv6_2, nullptr);
auto* subchannel_ipv6_3 = FindSubchannel(kAddresses[5]);
ASSERT_NE(subchannel_ipv6_3, nullptr);
// When the LB policy receives the subchannels' initial connectivity
// state notifications (all IDLE), it will request a connection on the
// first IPv4 subchannel.
EXPECT_TRUE(subchannel_ipv4_1->ConnectionRequested());
subchannel_ipv4_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_1->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the first IPv6
// subchannel.
EXPECT_TRUE(subchannel_ipv6_1->ConnectionRequested());
subchannel_ipv6_1->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv4_2->ConnectionRequested());
subchannel_ipv4_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_2->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the second IPv6
// subchannel.
EXPECT_TRUE(subchannel_ipv6_2->ConnectionRequested());
subchannel_ipv6_2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv4_3->ConnectionRequested());
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the third IPv4
// subchannel.
EXPECT_TRUE(subchannel_ipv4_3->ConnectionRequested());
subchannel_ipv4_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// No other subchannels should be connecting.
EXPECT_FALSE(subchannel_ipv6_3->ConnectionRequested());
// The timer fires before the connection attempt completes.
IncrementTimeBy(Duration::Milliseconds(250));
// This causes the LB policy to start connecting to the third IPv6
// subchannel.
EXPECT_TRUE(subchannel_ipv6_3->ConnectionRequested());
subchannel_ipv6_3->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
}
TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
// Send an update containing two addresses.
constexpr std::array<absl::string_view, 2> kAddresses = {

Loading…
Cancel
Save