Merge pull request #20794 from markdroth/xds_new_fields

Second attempt: Add new fields to xds policy config.
pull/20804/head
Mark D. Roth 5 years ago committed by GitHub
commit d2dccbc8e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 198
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  2. 21
      src/core/ext/filters/client_channel/xds/xds_client.cc
  3. 11
      src/core/ext/filters/client_channel/xds/xds_client.h
  4. 212
      test/cpp/end2end/xds_end2end_test.cc

@ -77,9 +77,14 @@ constexpr char kXds[] = "xds_experimental";
class ParsedXdsConfig : public LoadBalancingPolicy::Config {
public:
ParsedXdsConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy)
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy,
UniquePtr<char> eds_service_name,
UniquePtr<char> lrs_load_reporting_server_name)
: child_policy_(std::move(child_policy)),
fallback_policy_(std::move(fallback_policy)) {}
fallback_policy_(std::move(fallback_policy)),
eds_service_name_(std::move(eds_service_name)),
lrs_load_reporting_server_name_(
std::move(lrs_load_reporting_server_name)) {}
const char* name() const override { return kXds; }
@ -91,9 +96,17 @@ class ParsedXdsConfig : public LoadBalancingPolicy::Config {
return fallback_policy_;
}
const char* eds_service_name() const { return eds_service_name_.get(); };
const char* lrs_load_reporting_server_name() const {
return lrs_load_reporting_server_name_.get();
};
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
UniquePtr<char> eds_service_name_;
UniquePtr<char> lrs_load_reporting_server_name_;
};
class XdsLb : public LoadBalancingPolicy {
@ -111,16 +124,17 @@ class XdsLb : public LoadBalancingPolicy {
// We need this wrapper for the following reasons:
// 1. To process per-locality load reporting.
// 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control
// references to it by the xds picker and the locality.
class PickerWrapper : public RefCounted<PickerWrapper> {
// references to it by the xds picker and the locality.
class EndpointPickerWrapper : public RefCounted<EndpointPickerWrapper> {
public:
PickerWrapper(UniquePtr<SubchannelPicker> picker,
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
EndpointPickerWrapper(
UniquePtr<SubchannelPicker> picker,
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
: picker_(std::move(picker)),
locality_stats_(std::move(locality_stats)) {
locality_stats_->RefByPicker();
}
~PickerWrapper() { locality_stats_->UnrefByPicker(); }
~EndpointPickerWrapper() { locality_stats_->UnrefByPicker(); }
PickResult Pick(PickArgs args);
@ -131,15 +145,16 @@ class XdsLb : public LoadBalancingPolicy {
// The picker will use a stateless weighting algorithm to pick the locality to
// use for each request.
class Picker : public SubchannelPicker {
class LocalityPicker : public SubchannelPicker {
public:
// Maintains a weighted list of pickers from each locality that is in ready
// state. The first element in the pair represents the end of a range
// proportional to the locality's weight. The start of the range is the
// previous value in the vector and is 0 for the first element.
using PickerList =
InlinedVector<std::pair<uint32_t, RefCountedPtr<PickerWrapper>>, 1>;
Picker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
InlinedVector<std::pair<uint32_t, RefCountedPtr<EndpointPickerWrapper>>,
1>;
LocalityPicker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
: xds_policy_(std::move(xds_policy)),
pickers_(std::move(pickers)),
drop_config_(xds_policy_->drop_config_) {}
@ -204,7 +219,7 @@ class XdsLb : public LoadBalancingPolicy {
return connectivity_state_;
}
uint32_t weight() const { return weight_; }
RefCountedPtr<PickerWrapper> picker_wrapper() const {
RefCountedPtr<EndpointPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
@ -256,7 +271,7 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsLocalityName> name_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
RefCountedPtr<PickerWrapper> picker_wrapper_;
RefCountedPtr<EndpointPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
uint32_t weight_;
@ -377,16 +392,24 @@ class XdsLb : public LoadBalancingPolicy {
const char* name, const grpc_channel_args* args);
void MaybeExitFallbackMode();
const char* eds_service_name() const {
if (config_ != nullptr && config_->eds_service_name() != nullptr) {
return config_->eds_service_name();
}
return server_name_.get();
}
XdsClient* xds_client() const {
return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get()
: xds_client_.get();
}
// Name of the backend server to connect to.
const char* server_name_ = nullptr;
// Server name from target URI.
UniquePtr<char> server_name_;
// Current channel args from the resolver.
// Current channel args and config from the resolver.
const grpc_channel_args* args_ = nullptr;
RefCountedPtr<ParsedXdsConfig> config_;
// Internal state.
bool shutting_down_ = false;
@ -418,14 +441,10 @@ class XdsLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
// The policy to use for the fallback backends.
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_;
// Non-null iff we are in fallback mode.
OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
// The policy to use for the backends.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
const grpc_millis locality_retention_interval_ms_;
const grpc_millis locality_map_failover_timeout_ms_;
// A list of locality maps indexed by priority.
@ -441,10 +460,10 @@ class XdsLb : public LoadBalancingPolicy {
};
//
// XdsLb::PickerWrapper::Pick
// XdsLb::EndpointPickerWrapper
//
LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick(
LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick(
LoadBalancingPolicy::PickArgs args) {
// Forward the pick to the picker returned from the child policy.
PickResult result = picker_->Pick(args);
@ -470,10 +489,10 @@ LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick(
}
//
// XdsLb::Picker
// XdsLb::LocalityPicker
//
XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) {
// Handle drop.
const UniquePtr<char>* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) {
@ -489,8 +508,8 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
return PickFromLocality(key, args);
}
XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
PickArgs args) {
XdsLb::PickResult XdsLb::LocalityPicker::PickFromLocality(const uint32_t key,
PickArgs args) {
size_t mid = 0;
size_t start_index = 0;
size_t end_index = pickers_.size() - 1;
@ -686,11 +705,11 @@ XdsLb::XdsLb(Args args)
GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
server_name_.reset(
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Will use '%s' as the server name for LB request.", this,
server_name_);
gpr_log(GPR_INFO, "[xdslb %p] server name from channel: %s", this,
server_name_.get());
}
grpc_uri_destroy(uri);
}
@ -699,7 +718,6 @@ XdsLb::~XdsLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this);
}
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
}
@ -722,9 +740,13 @@ void XdsLb::ShutdownLocked() {
pending_fallback_policy_.reset();
// Cancel the endpoint watch here instead of in our dtor, because the
// watcher holds a ref to us.
xds_client()->CancelEndpointDataWatch(StringView(server_name_),
xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()),
endpoint_watcher_);
xds_client()->RemoveClientStats(StringView(server_name_), &client_stats_);
if (config_->lrs_load_reporting_server_name() != nullptr) {
xds_client()->RemoveClientStats(
StringView(config_->lrs_load_reporting_server_name()),
StringView(eds_service_name()), &client_stats_);
}
xds_client_from_channel_.reset();
xds_client_.reset();
}
@ -753,9 +775,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
}
const bool is_initial_update = args_ == nullptr;
// Update config.
auto* xds_config = static_cast<const ParsedXdsConfig*>(args.config.get());
child_policy_config_ = xds_config->child_policy();
fallback_policy_config_ = xds_config->fallback_policy();
const char* old_eds_service_name = eds_service_name();
auto old_config = std::move(config_);
config_ = std::move(args.config);
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
// Update args.
@ -772,7 +794,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
if (xds_client_from_channel_ == nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties(), StringView(server_name_),
combiner(), interested_parties(), StringView(eds_service_name()),
nullptr /* service config watcher */, *args_, &error);
// TODO(roth): If we decide that we care about fallback mode, add
// proper error handling here.
@ -782,11 +804,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
xds_client_.get());
}
}
auto watcher = MakeUnique<EndpointWatcher>(Ref());
endpoint_watcher_ = watcher.get();
xds_client()->WatchEndpointData(StringView(server_name_),
std::move(watcher));
xds_client()->AddClientStats(StringView(server_name_), &client_stats_);
// Start fallback-at-startup checks.
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
@ -795,6 +812,42 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
fallback_at_startup_checks_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
// Update endpoint watcher if needed.
if (is_initial_update ||
strcmp(old_eds_service_name, eds_service_name()) != 0) {
if (!is_initial_update) {
xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name),
endpoint_watcher_);
}
auto watcher = MakeUnique<EndpointWatcher>(Ref());
endpoint_watcher_ = watcher.get();
xds_client()->WatchEndpointData(StringView(eds_service_name()),
std::move(watcher));
}
// Update load reporting if needed.
// TODO(roth): Ideally, we should not collect any stats if load reporting
// is disabled, which would require changing this code to recreate
// all of the pickers whenever load reporting is enabled or disabled
// here.
if (is_initial_update ||
(config_->lrs_load_reporting_server_name() == nullptr) !=
(old_config->lrs_load_reporting_server_name() == nullptr) ||
(config_->lrs_load_reporting_server_name() != nullptr &&
old_config->lrs_load_reporting_server_name() != nullptr &&
strcmp(config_->lrs_load_reporting_server_name(),
old_config->lrs_load_reporting_server_name()) != 0)) {
if (old_config != nullptr &&
old_config->lrs_load_reporting_server_name() != nullptr) {
xds_client()->RemoveClientStats(
StringView(old_config->lrs_load_reporting_server_name()),
StringView(old_eds_service_name), &client_stats_);
}
if (config_->lrs_load_reporting_server_name() != nullptr) {
xds_client()->AddClientStats(
StringView(config_->lrs_load_reporting_server_name()),
StringView(eds_service_name()), &client_stats_);
}
}
}
//
@ -839,9 +892,7 @@ void XdsLb::UpdateFallbackPolicyLocked() {
// Construct update args.
UpdateArgs update_args;
update_args.addresses = fallback_backend_addresses_;
update_args.config = fallback_policy_config_ == nullptr
? nullptr
: fallback_policy_config_->Ref();
update_args.config = config_->fallback_policy();
update_args.args = grpc_channel_args_copy(args_);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
@ -892,9 +943,9 @@ void XdsLb::UpdateFallbackPolicyLocked() {
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* fallback_policy_name = fallback_policy_config_ == nullptr
const char* fallback_policy_name = update_args.config == nullptr
? "round_robin"
: fallback_policy_config_->name();
: update_args.config->name();
const bool create_policy =
// case 1
fallback_policy_ == nullptr ||
@ -1175,7 +1226,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() {
// that are ready. Each locality is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
// weights of all localities.
Picker::PickerList picker_list;
LocalityPicker::PickerList picker_list;
uint32_t end = 0;
for (const auto& p : localities_) {
const auto& locality_name = p.first;
@ -1187,9 +1238,9 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() {
picker_list.push_back(std::make_pair(end, locality->picker_wrapper()));
}
xds_policy()->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
MakeUnique<Picker>(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"),
std::move(picker_list)));
GRPC_CHANNEL_READY, MakeUnique<LocalityPicker>(
xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"),
std::move(picker_list)));
}
OrphanablePtr<XdsLb::PriorityList::LocalityMap::Locality>
@ -1490,9 +1541,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked(
// Construct update args.
UpdateArgs update_args;
update_args.addresses = std::move(serverlist);
update_args.config = xds_policy()->child_policy_config_ == nullptr
? nullptr
: xds_policy()->child_policy_config_->Ref();
update_args.config = xds_policy()->config_->child_policy();
update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
@ -1545,10 +1594,9 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked(
// when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
const char* child_policy_name =
xds_policy()->child_policy_config_ == nullptr
? "round_robin"
: xds_policy()->child_policy_config_->name();
const char* child_policy_name = update_args.config == nullptr
? "round_robin"
: update_args.config->name();
const bool create_policy =
// case 1
child_policy_ == nullptr ||
@ -1715,7 +1763,11 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState(
return;
}
// Cache the picker and its state in the locality.
locality_->picker_wrapper_ = MakeRefCounted<PickerWrapper>(
// TODO(roth): If load reporting is not configured, we should ideally
// pass a null LocalityStats ref to the EndpointPickerWrapper and have it
// not collect any stats, since they're not going to be used. This would
// require recreating all of the pickers whenever we get a config update.
locality_->picker_wrapper_ = MakeRefCounted<EndpointPickerWrapper>(
std::move(picker),
locality_->xds_policy()->client_stats_.FindLocalityStats(
locality_->name_));
@ -1762,6 +1814,8 @@ class XdsFactory : public LoadBalancingPolicyFactory {
InlinedVector<grpc_error*, 3> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy;
const char* eds_service_name = nullptr;
const char* lrs_load_reporting_server_name = nullptr;
for (const grpc_json* field = json->child; field != nullptr;
field = field->next) {
if (field->key == nullptr) continue;
@ -1789,11 +1843,35 @@ class XdsFactory : public LoadBalancingPolicyFactory {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
error_list.push_back(parse_error);
}
} else if (strcmp(field->key, "edsServiceName") == 0) {
if (eds_service_name != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:edsServiceName error:Duplicate entry"));
}
if (field->type != GRPC_JSON_STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:edsServiceName error:type should be string"));
continue;
}
eds_service_name = field->value;
} else if (strcmp(field->key, "lrsLoadReportingServerName") == 0) {
if (lrs_load_reporting_server_name != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServerName error:Duplicate entry"));
}
if (field->type != GRPC_JSON_STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:lrsLoadReportingServerName error:type should be string"));
continue;
}
lrs_load_reporting_server_name = field->value;
}
}
if (error_list.empty()) {
return RefCountedPtr<LoadBalancingPolicy::Config>(New<ParsedXdsConfig>(
std::move(child_policy), std::move(fallback_policy)));
return MakeRefCounted<ParsedXdsConfig>(
std::move(child_policy), std::move(fallback_policy),
UniquePtr<char>(gpr_strdup(eds_service_name)),
UniquePtr<char>(gpr_strdup(lrs_load_reporting_server_name)));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list);
return nullptr;

@ -736,8 +736,11 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
}
}
// Start load reporting if needed.
LrsCallState* lrs_calld = ads_calld->chand()->lrs_calld_->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
auto& lrs_call = ads_calld->chand()->lrs_calld_;
if (lrs_call != nullptr) {
LrsCallState* lrs_calld = lrs_call->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
// Ignore identical update.
const EdsUpdate& prev_update = xds_client->cluster_state_.eds_update;
const bool priority_list_changed =
@ -1319,14 +1322,20 @@ void XdsClient::CancelEndpointDataWatch(StringView /*cluster*/,
}
}
void XdsClient::AddClientStats(StringView /*cluster*/,
void XdsClient::AddClientStats(StringView /*lrs_server*/,
StringView /*cluster*/,
XdsClientStats* client_stats) {
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
cluster_state_.client_stats.insert(client_stats);
chand_->MaybeStartLrsCall();
}
void XdsClient::RemoveClientStats(StringView /*cluster*/,
void XdsClient::RemoveClientStats(StringView /*lrs_server*/,
StringView /*cluster*/,
XdsClientStats* client_stats) {
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
// TODO(roth): In principle, we should try to send a final load report
// containing whatever final stats have been accumulated since the
// last load report.
@ -1365,7 +1374,9 @@ void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) {
static const char* json =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_experimental\":{} }\n"
" { \"xds_experimental\":{\n"
" \"lrsLoadReportingServerName\": \"\"\n"
" } }\n"
" ]\n"
"}";
RefCountedPtr<ServiceConfig> service_config =

@ -100,8 +100,10 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
EndpointWatcherInterface* watcher);
// Adds and removes client stats for cluster.
void AddClientStats(StringView cluster, XdsClientStats* client_stats);
void RemoveClientStats(StringView cluster, XdsClientStats* client_stats);
void AddClientStats(StringView lrs_server, StringView cluster,
XdsClientStats* client_stats);
void RemoveClientStats(StringView lrs_server, StringView cluster,
XdsClientStats* client_stats);
// Resets connection backoff state.
void ResetBackoff();
@ -208,8 +210,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_;
// TODO(roth): When we need support for multiple clusters, replace
// cluster_state_ with a map keyed by cluster name.
// TODO(juanlishen): As part of adding CDS support, replace
// cluster_state_ with a map keyed by cluster name, so that we can
// support multiple clusters for both CDS and EDS.
ClusterState cluster_state_;
// Map<StringView /*cluster*/, ClusterState, StringLess> clusters_;

@ -585,7 +585,27 @@ class LrsServiceImpl : public LrsService {
bool load_report_ready_ = false;
};
class XdsEnd2endTest : public ::testing::TestWithParam<bool> {
class TestType {
public:
TestType(bool use_xds_resolver, bool enable_load_reporting)
: use_xds_resolver_(use_xds_resolver),
enable_load_reporting_(enable_load_reporting) {}
bool use_xds_resolver() const { return use_xds_resolver_; }
bool enable_load_reporting() const { return enable_load_reporting_; }
grpc::string AsString() const {
grpc::string retval = (use_xds_resolver_ ? "XdsResolver" : "FakeResolver");
if (enable_load_reporting_) retval += "WithLoadReporting";
return retval;
}
private:
const bool use_xds_resolver_;
const bool enable_load_reporting_;
};
class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
protected:
XdsEnd2endTest(size_t num_backends, size_t num_balancers,
int client_load_reporting_interval_seconds)
@ -665,12 +685,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam<bool> {
// channel never uses a response generator, and we inject the xds
// channel's response generator here.
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
GetParam() ? lb_channel_response_generator_.get()
: response_generator_.get());
GetParam().use_xds_resolver()
? lb_channel_response_generator_.get()
: response_generator_.get());
if (!expected_targets.empty()) {
args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
}
grpc::string scheme = GetParam() ? "xds-experimental" : "fake";
grpc::string scheme =
GetParam().use_xds_resolver() ? "xds-experimental" : "fake";
std::ostringstream uri;
uri << scheme << ":///" << kApplicationTargetName_;
// TODO(dgq): templatize tests to run everything using both secure and
@ -760,19 +782,20 @@ class XdsEnd2endTest : public ::testing::TestWithParam<bool> {
}
void SetNextResolution(const std::vector<int>& ports,
const char* service_config_json = nullptr,
grpc_core::FakeResolverResponseGenerator*
lb_channel_response_generator = nullptr) {
if (GetParam()) return; // Not used with xds resolver.
if (GetParam().use_xds_resolver()) return; // Not used with xds resolver.
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(ports);
if (service_config_json != nullptr) {
grpc_error* error = GRPC_ERROR_NONE;
result.service_config =
grpc_core::ServiceConfig::Create(service_config_json, &error);
GRPC_ERROR_UNREF(error);
}
grpc_error* error = GRPC_ERROR_NONE;
const char* service_config_json =
GetParam().enable_load_reporting()
? kDefaultServiceConfig_
: kDefaultServiceConfigWithoutLoadReporting_;
result.service_config =
grpc_core::ServiceConfig::Create(service_config_json, &error);
GRPC_ERROR_UNREF(error);
grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
lb_channel_response_generator == nullptr
? lb_channel_response_generator_.get()
@ -988,11 +1011,21 @@ class XdsEnd2endTest : public ::testing::TestWithParam<bool> {
lb_channel_response_generator_;
const grpc::string kRequestMessage_ = "Live long and prosper.";
const grpc::string kApplicationTargetName_ = "application_target_name";
const grpc::string kDefaultServiceConfig_ =
const char* kDefaultServiceConfig_ =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"does_not_exist\":{} },\n"
" { \"xds_experimental\":{} }\n"
" { \"xds_experimental\":{\n"
" \"lrsLoadReportingServerName\": \"\"\n"
" } }\n"
" ]\n"
"}";
const char* kDefaultServiceConfigWithoutLoadReporting_ =
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"does_not_exist\":{} },\n"
" { \"xds_experimental\":{\n"
" } }\n"
" ]\n"
"}";
};
@ -1005,7 +1038,7 @@ class BasicTest : public XdsEnd2endTest {
// Tests that the balancer sends the correct response to the client, and the
// client sends RPCs to the backends using the default child policy.
TEST_P(BasicTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcsPerAddress = 100;
AdsServiceImpl::ResponseArgs args({
@ -1033,7 +1066,7 @@ TEST_P(BasicTest, Vanilla) {
// Tests that subchannel sharing works when the same backend is listed multiple
// times.
TEST_P(BasicTest, SameBackendListedMultipleTimes) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Same backend listed twice.
std::vector<int> ports(2, backends_[0]->port());
@ -1056,7 +1089,7 @@ TEST_P(BasicTest, SameBackendListedMultipleTimes) {
// Tests that RPCs will be blocked until a non-empty serverlist is received.
TEST_P(BasicTest, InitiallyEmptyServerlist) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const int kCallDeadlineMs = kServerlistDelayMs * 2;
@ -1092,7 +1125,7 @@ TEST_P(BasicTest, InitiallyEmptyServerlist) {
// Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
// all the servers are unreachable.
TEST_P(BasicTest, AllServersUnreachableFailFast) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumUnreachableServers = 5;
std::vector<int> ports;
@ -1114,7 +1147,7 @@ TEST_P(BasicTest, AllServersUnreachableFailFast) {
// Tests that RPCs fail when the backends are down, and will succeed again after
// the backends are restarted.
TEST_P(BasicTest, BackendsRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts()},
@ -1136,7 +1169,7 @@ using SecureNamingTest = BasicTest;
TEST_P(SecureNamingTest, TargetNameIsExpected) {
// TODO(juanlishen): Use separate fake creds for the balancer channel.
ResetStub(0, 0, kApplicationTargetName_ + ";lb");
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
AdsServiceImpl::ResponseArgs args({
@ -1168,7 +1201,7 @@ TEST_P(SecureNamingTest, TargetNameIsUnexpected) {
ASSERT_DEATH_IF_SUPPORTED(
{
ResetStub(0, 0, kApplicationTargetName_ + ";lb");
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1));
},
@ -1180,7 +1213,7 @@ using LocalityMapTest = BasicTest;
// Tests that the localities in a locality map are picked according to their
// weights.
TEST_P(LocalityMapTest, WeightedRoundRobin) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
const int kLocalityWeight0 = 2;
@ -1224,7 +1257,7 @@ TEST_P(LocalityMapTest, WeightedRoundRobin) {
// Tests that the locality map can work properly even when it contains a large
// number of localities.
TEST_P(LocalityMapTest, StressTest) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumLocalities = 100;
// The first ADS response contains kNumLocalities localities, each of which
@ -1259,7 +1292,7 @@ TEST_P(LocalityMapTest, StressTest) {
// Tests that the localities in a locality map are picked correctly after update
// (addition, modification, deletion).
TEST_P(LocalityMapTest, UpdateMap) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 1000;
// The locality weight for the first 3 localities.
@ -1356,7 +1389,7 @@ class FailoverTest : public BasicTest {
// Localities with the highest priority are used when multiple priority exist.
TEST_P(FailoverTest, ChooseHighestPriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
@ -1377,7 +1410,7 @@ TEST_P(FailoverTest, ChooseHighestPriority) {
// If the higher priority localities are not reachable, failover to the highest
// priority among the rest.
TEST_P(FailoverTest, Failover) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1},
@ -1401,7 +1434,7 @@ TEST_P(FailoverTest, Failover) {
// If a locality with higher priority than the current one becomes ready,
// switch to it.
TEST_P(FailoverTest, SwitchBackToHigherPriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 100;
AdsServiceImpl::ResponseArgs args({
@ -1430,7 +1463,7 @@ TEST_P(FailoverTest, SwitchBackToHigherPriority) {
// The first update only contains unavailable priorities. The second update
// contains available priorities.
TEST_P(FailoverTest, UpdateInitialUnavailable) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
{"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0},
@ -1465,7 +1498,7 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) {
// Tests that after the localities' priorities are updated, we still choose the
// highest READY priority with the updated localities.
TEST_P(FailoverTest, UpdatePriority) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 100;
AdsServiceImpl::ResponseArgs args({
@ -1498,7 +1531,7 @@ using DropTest = BasicTest;
// Tests that RPCs are dropped according to the drop config.
TEST_P(DropTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
const uint32_t kDropPerMillionForLb = 100000;
@ -1544,7 +1577,7 @@ TEST_P(DropTest, Vanilla) {
// Tests that drop config is converted correctly from per hundred.
TEST_P(DropTest, DropPerHundred) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
const uint32_t kDropPerHundredForLb = 10;
@ -1585,7 +1618,7 @@ TEST_P(DropTest, DropPerHundred) {
// Tests that drop config is converted correctly from per ten thousand.
TEST_P(DropTest, DropPerTenThousand) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 5000;
const uint32_t kDropPerTenThousandForLb = 1000;
@ -1626,7 +1659,7 @@ TEST_P(DropTest, DropPerTenThousand) {
// Tests that drop is working correctly after update.
TEST_P(DropTest, Update) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 1000;
const uint32_t kDropPerMillionForLb = 100000;
@ -1722,7 +1755,7 @@ TEST_P(DropTest, Update) {
// Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_P(DropTest, DropAll) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 1000;
const uint32_t kDropPerMillionForLb = 100000;
@ -1755,8 +1788,7 @@ TEST_P(FallbackTest, Vanilla) {
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const size_t kNumBackendsInResolution = backends_.size() / 2;
ResetStub(kFallbackTimeoutMs);
SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution),
kDefaultServiceConfig_.c_str());
SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution));
SetNextResolutionForLbChannelAllBalancers();
// Send non-empty serverlist only after kServerlistDelayMs.
AdsServiceImpl::ResponseArgs args({
@ -1805,8 +1837,7 @@ TEST_P(FallbackTest, Update) {
const size_t kNumBackendsInResolution = backends_.size() / 3;
const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3;
ResetStub(kFallbackTimeoutMs);
SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution),
kDefaultServiceConfig_.c_str());
SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution));
SetNextResolutionForLbChannelAllBalancers();
// Send non-empty serverlist only after kServerlistDelayMs.
AdsServiceImpl::ResponseArgs args({
@ -1829,10 +1860,9 @@ TEST_P(FallbackTest, Update) {
for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) {
EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
}
SetNextResolution(GetBackendPorts(kNumBackendsInResolution,
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate),
kDefaultServiceConfig_.c_str());
SetNextResolution(GetBackendPorts(
kNumBackendsInResolution,
kNumBackendsInResolution + kNumBackendsInResolutionUpdate));
// Wait until the resolution update has been processed and all the new
// fallback backends are reachable.
WaitForAllBackends(kNumBackendsInResolution /* start_index */,
@ -1882,7 +1912,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerChannelFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolution({backends_[0]->port()});
SetNextResolutionForLbChannel({g_port_saver->GetPort()});
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
@ -1895,7 +1925,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return one balancer and one fallback backend.
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolution({backends_[0]->port()});
SetNextResolutionForLbChannelAllBalancers();
// Balancer drops call without sending a serverlist.
balancers_[0]->ads_service()->NotifyDoneWithAdsCall();
@ -1910,7 +1940,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) {
TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) {
const int kFallbackTimeoutMs = 500 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolution({backends_[0]->port()});
SetNextResolutionForLbChannelAllBalancers();
// Send a serverlist that only contains an unreachable backend before fallback
// timeout.
@ -1927,7 +1957,7 @@ TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) {
// all the calls.
TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolution({backends_[0]->port()});
SetNextResolutionForLbChannel({g_port_saver->GetPort()});
// Enter fallback mode because the LB channel fails to connect.
WaitForBackend(0);
@ -1951,7 +1981,7 @@ TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
// Tests that fallback mode is exited if the child policy becomes ready.
TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) {
// Return an unreachable balancer and one fallback backend.
SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str());
SetNextResolution({backends_[0]->port()});
SetNextResolutionForLbChannel({g_port_saver->GetPort()});
// Enter fallback mode because the LB channel fails to connect.
WaitForBackend(0);
@ -1989,7 +2019,7 @@ class BalancerUpdateTest : public XdsEnd2endTest {
// Tests that the old LB call is still used after the balancer address update as
// long as that call is still alive.
TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
@ -2042,7 +2072,7 @@ TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
// xds keeps the initial connection (which by definition is also present in the
// update).
TEST_P(BalancerUpdateTest, Repeated) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
AdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
@ -2107,7 +2137,7 @@ TEST_P(BalancerUpdateTest, Repeated) {
// backends according to the last balancer response, until a new balancer is
// reachable.
TEST_P(BalancerUpdateTest, DeadUpdate) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
AdsServiceImpl::ResponseArgs args({
{"locality0", {backends_[0]->port()}},
@ -2186,7 +2216,7 @@ class ClientLoadReportingTest : public XdsEnd2endTest {
// Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
// TODO(juanlishen): Partition the backends after multiple localities is
@ -2227,7 +2257,7 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
// Tests that if the balancer restarts, the client load report contains the
// stats before and after the restart correctly.
TEST_P(ClientLoadReportingTest, BalancerRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumBackendsFirstPass = backends_.size() / 2;
const size_t kNumBackendsSecondPass =
@ -2293,7 +2323,7 @@ class ClientLoadReportingWithDropTest : public XdsEnd2endTest {
// Tests that the drop stats are correctly reported by client load reporting.
TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcs = 3000;
const uint32_t kDropPerMillionForLb = 100000;
@ -2355,28 +2385,66 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count());
}
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BasicTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, SecureNamingTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, LocalityMapTest, ::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FailoverTest, ::testing::Bool());
grpc::string TestTypeName(const ::testing::TestParamInfo<TestType>& info) {
return info.param.AsString();
}
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, DropTest, ::testing::Bool());
// TODO(juanlishen): Load reporting disabled is currently tested only with DNS
// resolver. Once we implement CDS, test it via the xds resolver too.
INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest,
::testing::Values(TestType(false, true),
TestType(false, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest,
::testing::Values(TestType(false, true),
TestType(false, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
::testing::Values(TestType(false, true),
TestType(false, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest,
::testing::Values(TestType(false, true),
TestType(false, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest,
::testing::Values(TestType(false, true),
TestType(false, false),
TestType(true, true)),
&TestTypeName);
// Fallback does not work with xds resolver.
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FallbackTest,
::testing::Values(false));
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BalancerUpdateTest,
::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingTest,
::testing::Bool());
INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingWithDropTest,
::testing::Bool());
INSTANTIATE_TEST_SUITE_P(XdsTest, FallbackTest,
::testing::Values(TestType(false, true),
TestType(false, false)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, BalancerUpdateTest,
::testing::Values(TestType(false, true),
TestType(false, false),
TestType(true, true)),
&TestTypeName);
// Load reporting tests are not run with load reporting disabled.
INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingTest,
::testing::Values(TestType(false, true),
TestType(true, true)),
&TestTypeName);
// Load reporting tests are not run with load reporting disabled.
INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingWithDropTest,
::testing::Values(TestType(false, true),
TestType(true, true)),
&TestTypeName);
} // namespace
} // namespace testing

Loading…
Cancel
Save