xds failover locality handling

pull/19981/head
Juanli Shen 5 years ago
parent 5052efd666
commit 6ced125bb3
  1. 7
      include/grpc/impl/codegen/grpc_types.h
  2. 958
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 63
      src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc
  4. 71
      src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
  5. 4
      src/core/lib/gprpp/inlined_vector.h
  6. 18
      test/core/gprpp/inlined_vector_test.cc
  7. 4
      test/core/util/ubsan_suppressions.txt
  8. 166
      test/cpp/end2end/xds_end2end_test.cc

@ -339,6 +339,13 @@ typedef struct {
value is 15 minutes. */
#define GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS \
"grpc.xds_locality_retention_interval_ms"
/* Timeout in milliseconds to wait for the localities of a specific priority to
complete their initial connection attempt before xDS fails over to the next
priority. Specifically, the connection attempt of a priority is considered
completed when any locality of that priority is ready or all the localities
of that priority fail to connect. If 0, failover happens immediately. Default
value is 10 seconds. */
#define GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS "grpc.xds_failover_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression"

File diff suppressed because it is too large Load Diff

@ -53,6 +53,42 @@ constexpr char kEndpointRequired[] = "endpointRequired";
} // namespace
bool XdsPriorityListUpdate::operator==(
const XdsPriorityListUpdate& other) const {
if (priorities_.size() != other.priorities_.size()) return false;
for (size_t i = 0; i < priorities_.size(); ++i) {
if (priorities_[i].localities != other.priorities_[i].localities) {
return false;
}
}
return true;
}
void XdsPriorityListUpdate::Add(
XdsPriorityListUpdate::LocalityMap::Locality locality) {
// Pad the missing priorities in case the localities are not ordered by
// priority.
// TODO(juanlishen): Implement InlinedVector::resize() and use that instead.
while (!Contains(locality.priority)) priorities_.emplace_back();
LocalityMap& locality_map = priorities_[locality.priority];
locality_map.localities.emplace(locality.name, std::move(locality));
}
const XdsPriorityListUpdate::LocalityMap* XdsPriorityListUpdate::Find(
uint32_t priority) const {
if (!Contains(priority)) return nullptr;
return &priorities_[priority];
}
bool XdsPriorityListUpdate::Contains(
const RefCountedPtr<XdsLocalityName>& name) {
for (size_t i = 0; i < priorities_.size(); ++i) {
const LocalityMap& locality_map = priorities_[i];
if (locality_map.Contains(name)) return true;
}
return false;
}
bool XdsDropConfig::ShouldDrop(const UniquePtr<char>** category_name) const {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
@ -136,7 +172,7 @@ UniquePtr<char> StringCopy(const upb_strview& strview) {
grpc_error* LocalityParse(
const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
XdsLocalityInfo* locality_info) {
XdsPriorityListUpdate::LocalityMap::Locality* output_locality) {
// Parse LB weight.
const google_protobuf_UInt32Value* lb_weight =
envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
@ -144,13 +180,13 @@ grpc_error* LocalityParse(
// If LB weight is not specified, it means this locality is assigned no load.
// TODO(juanlishen): When we support CDS to configure the inter-locality
// policy, we should change the LB weight handling.
locality_info->lb_weight =
output_locality->lb_weight =
lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
if (locality_info->lb_weight == 0) return GRPC_ERROR_NONE;
if (output_locality->lb_weight == 0) return GRPC_ERROR_NONE;
// Parse locality name.
const envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
locality_info->locality_name = MakeRefCounted<XdsLocalityName>(
output_locality->name = MakeRefCounted<XdsLocalityName>(
StringCopy(envoy_api_v2_core_Locality_region(locality)),
StringCopy(envoy_api_v2_core_Locality_zone(locality)),
StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
@ -160,12 +196,12 @@ grpc_error* LocalityParse(
envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints(
locality_lb_endpoints, &size);
for (size_t i = 0; i < size; ++i) {
grpc_error* error = ServerAddressParseAndAppend(lb_endpoints[i],
&locality_info->serverlist);
grpc_error* error = ServerAddressParseAndAppend(
lb_endpoints[i], &output_locality->serverlist);
if (error != GRPC_ERROR_NONE) return error;
}
// Parse the priority.
locality_info->priority =
output_locality->priority =
envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints);
return GRPC_ERROR_NONE;
}
@ -253,18 +289,13 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
&size);
for (size_t i = 0; i < size; ++i) {
XdsLocalityInfo locality_info;
grpc_error* error = LocalityParse(endpoints[i], &locality_info);
XdsPriorityListUpdate::LocalityMap::Locality locality;
grpc_error* error = LocalityParse(endpoints[i], &locality);
if (error != GRPC_ERROR_NONE) return error;
// Filter out locality with weight 0.
if (locality_info.lb_weight == 0) continue;
update->locality_list.push_back(std::move(locality_info));
if (locality.lb_weight == 0) continue;
update->priority_list_update.Add(locality);
}
// The locality list is sorted here into deterministic order so that it's
// easier to check if two locality lists contain the same set of localities.
std::sort(update->locality_list.data(),
update->locality_list.data() + update->locality_list.size(),
XdsLocalityInfo::Less());
// Get the drop config.
update->drop_config = MakeRefCounted<XdsDropConfig>();
const envoy_api_v2_ClusterLoadAssignment_Policy* policy =

@ -23,33 +23,66 @@
#include <grpc/slice_buffer.h>
#include <stdint.h>
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
#include "src/core/ext/filters/client_channel/server_address.h"
namespace grpc_core {
struct XdsLocalityInfo {
bool operator==(const XdsLocalityInfo& other) const {
return *locality_name == *other.locality_name &&
serverlist == other.serverlist && lb_weight == other.lb_weight &&
priority == other.priority;
}
// This comparator only compares the locality names.
struct Less {
bool operator()(const XdsLocalityInfo& lhs,
const XdsLocalityInfo& rhs) const {
return XdsLocalityName::Less()(lhs.locality_name, rhs.locality_name);
class XdsPriorityListUpdate {
public:
struct LocalityMap {
struct Locality {
bool operator==(const Locality& other) const {
return *name == *other.name && serverlist == other.serverlist &&
lb_weight == other.lb_weight && priority == other.priority;
}
// This comparator only compares the locality names.
struct Less {
bool operator()(const Locality& lhs, const Locality& rhs) const {
return XdsLocalityName::Less()(lhs.name, rhs.name);
}
};
RefCountedPtr<XdsLocalityName> name;
ServerAddressList serverlist;
uint32_t lb_weight;
uint32_t priority;
};
bool Contains(const RefCountedPtr<XdsLocalityName>& name) const {
return localities.find(name) != localities.end();
}
size_t size() const { return localities.size(); }
Map<RefCountedPtr<XdsLocalityName>, Locality, XdsLocalityName::Less>
localities;
};
RefCountedPtr<XdsLocalityName> locality_name;
ServerAddressList serverlist;
uint32_t lb_weight;
uint32_t priority;
};
bool operator==(const XdsPriorityListUpdate& other) const;
void Add(LocalityMap::Locality locality);
const LocalityMap* Find(uint32_t priority) const;
using XdsLocalityList = InlinedVector<XdsLocalityInfo, 1>;
bool Contains(uint32_t priority) const {
return priority < priorities_.size();
}
bool Contains(const RefCountedPtr<XdsLocalityName>& name);
bool empty() const { return priorities_.empty(); }
size_t size() const { return priorities_.size(); }
// Callers should make sure the priority list is non-empty.
uint32_t LowestPriority() const {
return static_cast<uint32_t>(priorities_.size()) - 1;
}
private:
InlinedVector<LocalityMap, 2> priorities_;
};
// There are two phases of accessing this class's content:
// 1. to initialize in the control plane combiner;
@ -93,7 +126,7 @@ class XdsDropConfig : public RefCounted<XdsDropConfig> {
};
struct XdsUpdate {
XdsLocalityList locality_list;
XdsPriorityListUpdate priority_list_update;
RefCountedPtr<XdsDropConfig> drop_config;
bool drop_all = false;
};

@ -107,6 +107,10 @@ class InlinedVector {
return true;
}
bool operator!=(const InlinedVector& other) const {
return !(*this == other);
}
void reserve(size_t capacity) {
if (capacity > capacity_) {
T* new_dynamic =

@ -127,6 +127,24 @@ TEST(InlinedVectorTest, EqualOperator) {
EXPECT_FALSE(v1 == v2);
}
TEST(InlinedVectorTest, NotEqualOperator) {
constexpr int kNumElements = 10;
// Both v1 and v2 are empty.
InlinedVector<int, 5> v1;
InlinedVector<int, 5> v2;
EXPECT_FALSE(v1 != v2);
// Both v1 and v2 contains the same data.
FillVector(&v1, kNumElements);
FillVector(&v2, kNumElements);
EXPECT_FALSE(v1 != v2);
// The sizes of v1 and v2 are different.
v1.push_back(0);
EXPECT_TRUE(v1 != v2);
// The contents of v1 and v2 are different although their sizes are the same.
v2.push_back(1);
EXPECT_TRUE(v1 != v2);
}
// the following constants and typedefs are used for copy/move
// construction/assignment
const size_t kInlinedLength = 8;

@ -22,3 +22,7 @@ signed-integer-overflow:chrono
enum:grpc_http2_error_to_grpc_status
enum:grpc_chttp2_cancel_stream
enum:api_fuzzer
# TODO(juanlishen): Remove this supression after
# https://github.com/GoogleCloudPlatform/layer-definitions/issues/531 is
# addressed.
alignment:grpc_core::XdsPriorityListUpdate::*

@ -99,6 +99,7 @@ constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone";
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
constexpr int kDefaultLocalityWeight = 3;
constexpr int kDefaultLocalityPriority = 0;
template <typename ServiceType>
class CountedService : public ServiceType {
@ -262,7 +263,8 @@ class EdsServiceImpl : public EdsService {
struct ResponseArgs {
struct Locality {
Locality(const grpc::string& sub_zone, std::vector<int> ports,
int lb_weight = kDefaultLocalityWeight, int priority = 0)
int lb_weight = kDefaultLocalityWeight,
int priority = kDefaultLocalityPriority)
: sub_zone(std::move(sub_zone)),
ports(std::move(ports)),
lb_weight(lb_weight),
@ -566,7 +568,7 @@ class XdsEnd2endTest : public ::testing::Test {
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
void ResetStub(int fallback_timeout = 0,
void ResetStub(int fallback_timeout = 0, int failover_timeout = 0,
const grpc::string& expected_targets = "",
grpc::string scheme = "") {
ChannelArguments args;
@ -574,6 +576,9 @@ class XdsEnd2endTest : public ::testing::Test {
if (fallback_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, fallback_timeout);
}
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout);
}
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
if (!expected_targets.empty()) {
@ -922,7 +927,7 @@ class XdsResolverTest : public XdsEnd2endTest {
// used.
TEST_F(XdsResolverTest, XdsResolverIsUsed) {
// Use xds-experimental scheme in URI.
ResetStub(0, "", "xds-experimental");
ResetStub(0, 0, "", "xds-experimental");
// Send an RPC to trigger resolution.
auto unused_result = SendRpc();
// Xds resolver returns xds_experimental as the LB policy.
@ -1067,7 +1072,7 @@ using SecureNamingTest = BasicTest;
// Tests that secure naming check passes if target name is expected.
TEST_F(SecureNamingTest, TargetNameIsExpected) {
// TODO(juanlishen): Use separate fake creds for the balancer channel.
ResetStub(0, kApplicationTargetName_ + ";lb");
ResetStub(0, 0, kApplicationTargetName_ + ";lb");
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
@ -1098,7 +1103,7 @@ TEST_F(SecureNamingTest, TargetNameIsUnexpected) {
// the name from the balancer doesn't match expectations.
ASSERT_DEATH_IF_SUPPORTED(
{
ResetStub(0, kApplicationTargetName_ + ";lb");
ResetStub(0, 0, kApplicationTargetName_ + ";lb");
SetNextResolution({},
"{\n"
" \"loadBalancingConfig\":[\n"
@ -1287,6 +1292,151 @@ TEST_F(LocalityMapTest, UpdateMap) {
EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
}
class FailoverTest : public BasicTest {
public:
FailoverTest() { ResetStub(0, 100, "", ""); }
};
// Localities with the highest priority are used when multiple priority exist.
TEST_F(FailoverTest, ChooseHighestPriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
{"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
{"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForBackend(3, false);
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
}
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// If the higher priority localities are not reachable, failover to the highest
// priority among the rest.
TEST_F(FailoverTest, Failover) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
{"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
{"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
});
ShutdownBackend(3);
ShutdownBackend(0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForBackend(1, false);
for (size_t i = 0; i < 4; ++i) {
if (i == 1) continue;
EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
}
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// If a locality with higher priority than the current one becomes ready,
// switch to it.
TEST_F(FailoverTest, SwitchBackToHigherPriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 100;
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
{"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
{"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
});
ShutdownBackend(3);
ShutdownBackend(0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
WaitForBackend(1, false);
for (size_t i = 0; i < 4; ++i) {
if (i == 1) continue;
EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
}
StartBackend(0);
WaitForBackend(0);
CheckRpcSendOk(kNumRpcs);
EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// The first update only contains unavailable priorities. The second update
// contains available priorities.
TEST_F(FailoverTest, UpdateInitialUnavailable) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 1},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 1},
{"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 2},
{"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 3},
});
ShutdownBackend(0);
ShutdownBackend(1);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 1000);
gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(500, GPR_TIMESPAN));
// Send 0.5 second worth of RPCs.
do {
CheckRpcSendFailure();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);
WaitForBackend(2, false);
for (size_t i = 0; i < 4; ++i) {
if (i == 2) continue;
EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
}
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
}
// Tests that after the localities' priorities are updated, we still choose the
// highest READY priority with the updated localities.
TEST_F(FailoverTest, UpdatePriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 100;
EdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2},
{"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3},
{"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0);
args = EdsServiceImpl::ResponseArgs({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 2},
{"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 0},
{"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 1},
{"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 3},
});
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 1000);
WaitForBackend(3, false);
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(0, backends_[i]->backend_service()->request_count());
}
WaitForBackend(1);
CheckRpcSendOk(kNumRpcs);
EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
// The EDS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
}
using DropTest = BasicTest;
// Tests that RPCs are dropped according to the drop config.
@ -1760,9 +1910,9 @@ TEST_F(FallbackTest, FallbackModeIsExitedAfterChildRready) {
SetNextResolutionForLbChannelAllBalancers();
// The state (TRANSIENT_FAILURE) update from the child policy will be ignored
// because we are still in fallback mode.
gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(5000, GPR_TIMESPAN));
// Send 5 seconds worth of RPCs.
gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(500, GPR_TIMESPAN));
// Send 0.5 second worth of RPCs.
do {
CheckRpcSendOk();
} while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);

Loading…
Cancel
Save