diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 362dfd9f1a1..f2378b36f02 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -77,9 +77,14 @@ constexpr char kXds[] = "xds_experimental"; class ParsedXdsConfig : public LoadBalancingPolicy::Config { public: ParsedXdsConfig(RefCountedPtr child_policy, - RefCountedPtr fallback_policy) + RefCountedPtr fallback_policy, + UniquePtr eds_service_name, + UniquePtr lrs_load_reporting_server_name) : child_policy_(std::move(child_policy)), - fallback_policy_(std::move(fallback_policy)) {} + fallback_policy_(std::move(fallback_policy)), + eds_service_name_(std::move(eds_service_name)), + lrs_load_reporting_server_name_( + std::move(lrs_load_reporting_server_name)) {} const char* name() const override { return kXds; } @@ -91,9 +96,17 @@ class ParsedXdsConfig : public LoadBalancingPolicy::Config { return fallback_policy_; } + const char* eds_service_name() const { return eds_service_name_.get(); }; + + const char* lrs_load_reporting_server_name() const { + return lrs_load_reporting_server_name_.get(); + }; + private: RefCountedPtr child_policy_; RefCountedPtr fallback_policy_; + UniquePtr eds_service_name_; + UniquePtr lrs_load_reporting_server_name_; }; class XdsLb : public LoadBalancingPolicy { @@ -111,16 +124,17 @@ class XdsLb : public LoadBalancingPolicy { // We need this wrapper for the following reasons: // 1. To process per-locality load reporting. // 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control - // references to it by the xds picker and the locality. - class PickerWrapper : public RefCounted { + // references to it by the xds picker and the locality. + class EndpointPickerWrapper : public RefCounted { public: - PickerWrapper(UniquePtr picker, - RefCountedPtr locality_stats) + EndpointPickerWrapper( + UniquePtr picker, + RefCountedPtr locality_stats) : picker_(std::move(picker)), locality_stats_(std::move(locality_stats)) { locality_stats_->RefByPicker(); } - ~PickerWrapper() { locality_stats_->UnrefByPicker(); } + ~EndpointPickerWrapper() { locality_stats_->UnrefByPicker(); } PickResult Pick(PickArgs args); @@ -131,15 +145,16 @@ class XdsLb : public LoadBalancingPolicy { // The picker will use a stateless weighting algorithm to pick the locality to // use for each request. - class Picker : public SubchannelPicker { + class LocalityPicker : public SubchannelPicker { public: // Maintains a weighted list of pickers from each locality that is in ready // state. The first element in the pair represents the end of a range // proportional to the locality's weight. The start of the range is the // previous value in the vector and is 0 for the first element. using PickerList = - InlinedVector>, 1>; - Picker(RefCountedPtr xds_policy, PickerList pickers) + InlinedVector>, + 1>; + LocalityPicker(RefCountedPtr xds_policy, PickerList pickers) : xds_policy_(std::move(xds_policy)), pickers_(std::move(pickers)), drop_config_(xds_policy_->drop_config_) {} @@ -204,7 +219,7 @@ class XdsLb : public LoadBalancingPolicy { return connectivity_state_; } uint32_t weight() const { return weight_; } - RefCountedPtr picker_wrapper() const { + RefCountedPtr picker_wrapper() const { return picker_wrapper_; } @@ -256,7 +271,7 @@ class XdsLb : public LoadBalancingPolicy { RefCountedPtr name_; OrphanablePtr child_policy_; OrphanablePtr pending_child_policy_; - RefCountedPtr picker_wrapper_; + RefCountedPtr picker_wrapper_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; uint32_t weight_; @@ -377,16 +392,24 @@ class XdsLb : public LoadBalancingPolicy { const char* name, const grpc_channel_args* args); void MaybeExitFallbackMode(); + const char* eds_service_name() const { + if (config_ != nullptr && config_->eds_service_name() != nullptr) { + return config_->eds_service_name(); + } + return server_name_.get(); + } + XdsClient* xds_client() const { return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get() : xds_client_.get(); } - // Name of the backend server to connect to. - const char* server_name_ = nullptr; + // Server name from target URI. + UniquePtr server_name_; - // Current channel args from the resolver. + // Current channel args and config from the resolver. const grpc_channel_args* args_ = nullptr; + RefCountedPtr config_; // Internal state. bool shutting_down_ = false; @@ -418,14 +441,10 @@ class XdsLb : public LoadBalancingPolicy { grpc_timer lb_fallback_timer_; grpc_closure lb_on_fallback_; - // The policy to use for the fallback backends. - RefCountedPtr fallback_policy_config_; // Non-null iff we are in fallback mode. OrphanablePtr fallback_policy_; OrphanablePtr pending_fallback_policy_; - // The policy to use for the backends. - RefCountedPtr child_policy_config_; const grpc_millis locality_retention_interval_ms_; const grpc_millis locality_map_failover_timeout_ms_; // A list of locality maps indexed by priority. @@ -441,10 +460,10 @@ class XdsLb : public LoadBalancingPolicy { }; // -// XdsLb::PickerWrapper::Pick +// XdsLb::EndpointPickerWrapper // -LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick( +LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick( LoadBalancingPolicy::PickArgs args) { // Forward the pick to the picker returned from the child policy. PickResult result = picker_->Pick(args); @@ -470,10 +489,10 @@ LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick( } // -// XdsLb::Picker +// XdsLb::LocalityPicker // -XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { +XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) { // Handle drop. const UniquePtr* drop_category; if (drop_config_->ShouldDrop(&drop_category)) { @@ -489,8 +508,8 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { return PickFromLocality(key, args); } -XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key, - PickArgs args) { +XdsLb::PickResult XdsLb::LocalityPicker::PickFromLocality(const uint32_t key, + PickArgs args) { size_t mid = 0; size_t start_index = 0; size_t end_index = pickers_.size() - 1; @@ -686,11 +705,11 @@ XdsLb::XdsLb(Args args) GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); - server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); + server_name_.reset( + gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path)); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p] Will use '%s' as the server name for LB request.", this, - server_name_); + gpr_log(GPR_INFO, "[xdslb %p] server name from channel: %s", this, + server_name_.get()); } grpc_uri_destroy(uri); } @@ -699,7 +718,6 @@ XdsLb::~XdsLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this); } - gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); } @@ -722,9 +740,13 @@ void XdsLb::ShutdownLocked() { pending_fallback_policy_.reset(); // Cancel the endpoint watch here instead of in our dtor, because the // watcher holds a ref to us. - xds_client()->CancelEndpointDataWatch(StringView(server_name_), + xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()), endpoint_watcher_); - xds_client()->RemoveClientStats(StringView(server_name_), &client_stats_); + if (config_->lrs_load_reporting_server_name() != nullptr) { + xds_client()->RemoveClientStats( + StringView(config_->lrs_load_reporting_server_name()), + StringView(eds_service_name()), &client_stats_); + } xds_client_from_channel_.reset(); xds_client_.reset(); } @@ -753,9 +775,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) { } const bool is_initial_update = args_ == nullptr; // Update config. - auto* xds_config = static_cast(args.config.get()); - child_policy_config_ = xds_config->child_policy(); - fallback_policy_config_ = xds_config->fallback_policy(); + const char* old_eds_service_name = eds_service_name(); + auto old_config = std::move(config_); + config_ = std::move(args.config); // Update fallback address list. fallback_backend_addresses_ = std::move(args.addresses); // Update args. @@ -772,7 +794,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) { if (xds_client_from_channel_ == nullptr) { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( - combiner(), interested_parties(), StringView(server_name_), + combiner(), interested_parties(), StringView(eds_service_name()), nullptr /* service config watcher */, *args_, &error); // TODO(roth): If we decide that we care about fallback mode, add // proper error handling here. @@ -782,11 +804,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) { xds_client_.get()); } } - auto watcher = MakeUnique(Ref()); - endpoint_watcher_ = watcher.get(); - xds_client()->WatchEndpointData(StringView(server_name_), - std::move(watcher)); - xds_client()->AddClientStats(StringView(server_name_), &client_stats_); // Start fallback-at-startup checks. grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure @@ -795,6 +812,42 @@ void XdsLb::UpdateLocked(UpdateArgs args) { fallback_at_startup_checks_pending_ = true; grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); } + // Update endpoint watcher if needed. + if (is_initial_update || + strcmp(old_eds_service_name, eds_service_name()) != 0) { + if (!is_initial_update) { + xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name), + endpoint_watcher_); + } + auto watcher = MakeUnique(Ref()); + endpoint_watcher_ = watcher.get(); + xds_client()->WatchEndpointData(StringView(eds_service_name()), + std::move(watcher)); + } + // Update load reporting if needed. + // TODO(roth): Ideally, we should not collect any stats if load reporting + // is disabled, which would require changing this code to recreate + // all of the pickers whenever load reporting is enabled or disabled + // here. + if (is_initial_update || + (config_->lrs_load_reporting_server_name() == nullptr) != + (old_config->lrs_load_reporting_server_name() == nullptr) || + (config_->lrs_load_reporting_server_name() != nullptr && + old_config->lrs_load_reporting_server_name() != nullptr && + strcmp(config_->lrs_load_reporting_server_name(), + old_config->lrs_load_reporting_server_name()) != 0)) { + if (old_config != nullptr && + old_config->lrs_load_reporting_server_name() != nullptr) { + xds_client()->RemoveClientStats( + StringView(old_config->lrs_load_reporting_server_name()), + StringView(old_eds_service_name), &client_stats_); + } + if (config_->lrs_load_reporting_server_name() != nullptr) { + xds_client()->AddClientStats( + StringView(config_->lrs_load_reporting_server_name()), + StringView(eds_service_name()), &client_stats_); + } + } } // @@ -839,9 +892,7 @@ void XdsLb::UpdateFallbackPolicyLocked() { // Construct update args. UpdateArgs update_args; update_args.addresses = fallback_backend_addresses_; - update_args.config = fallback_policy_config_ == nullptr - ? nullptr - : fallback_policy_config_->Ref(); + update_args.config = config_->fallback_policy(); update_args.args = grpc_channel_args_copy(args_); // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store @@ -892,9 +943,9 @@ void XdsLb::UpdateFallbackPolicyLocked() { // that was there before, which will be immediately shut down) // and will later be swapped into child_policy_ by the helper // when the new child transitions into state READY. - const char* fallback_policy_name = fallback_policy_config_ == nullptr + const char* fallback_policy_name = update_args.config == nullptr ? "round_robin" - : fallback_policy_config_->name(); + : update_args.config->name(); const bool create_policy = // case 1 fallback_policy_ == nullptr || @@ -1175,7 +1226,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { // that are ready. Each locality is represented by a portion of the range // proportional to its weight, such that the total range is the sum of the // weights of all localities. - Picker::PickerList picker_list; + LocalityPicker::PickerList picker_list; uint32_t end = 0; for (const auto& p : localities_) { const auto& locality_name = p.first; @@ -1187,9 +1238,9 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { picker_list.push_back(std::make_pair(end, locality->picker_wrapper())); } xds_policy()->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, - MakeUnique(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), - std::move(picker_list))); + GRPC_CHANNEL_READY, MakeUnique( + xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), + std::move(picker_list))); } OrphanablePtr @@ -1490,9 +1541,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( // Construct update args. UpdateArgs update_args; update_args.addresses = std::move(serverlist); - update_args.config = xds_policy()->child_policy_config_ == nullptr - ? nullptr - : xds_policy()->child_policy_config_->Ref(); + update_args.config = xds_policy()->config_->child_policy(); update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_); // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store @@ -1545,10 +1594,9 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( // when the new child transitions into state READY. // TODO(juanlishen): If the child policy is not configured via service config, // use whatever algorithm is specified by the balancer. - const char* child_policy_name = - xds_policy()->child_policy_config_ == nullptr - ? "round_robin" - : xds_policy()->child_policy_config_->name(); + const char* child_policy_name = update_args.config == nullptr + ? "round_robin" + : update_args.config->name(); const bool create_policy = // case 1 child_policy_ == nullptr || @@ -1715,7 +1763,11 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( return; } // Cache the picker and its state in the locality. - locality_->picker_wrapper_ = MakeRefCounted( + // TODO(roth): If load reporting is not configured, we should ideally + // pass a null LocalityStats ref to the EndpointPickerWrapper and have it + // not collect any stats, since they're not going to be used. This would + // require recreating all of the pickers whenever we get a config update. + locality_->picker_wrapper_ = MakeRefCounted( std::move(picker), locality_->xds_policy()->client_stats_.FindLocalityStats( locality_->name_)); @@ -1762,6 +1814,8 @@ class XdsFactory : public LoadBalancingPolicyFactory { InlinedVector error_list; RefCountedPtr child_policy; RefCountedPtr fallback_policy; + const char* eds_service_name = nullptr; + const char* lrs_load_reporting_server_name = nullptr; for (const grpc_json* field = json->child; field != nullptr; field = field->next) { if (field->key == nullptr) continue; @@ -1789,11 +1843,35 @@ class XdsFactory : public LoadBalancingPolicyFactory { GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); error_list.push_back(parse_error); } + } else if (strcmp(field->key, "edsServiceName") == 0) { + if (eds_service_name != nullptr) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:edsServiceName error:Duplicate entry")); + } + if (field->type != GRPC_JSON_STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:edsServiceName error:type should be string")); + continue; + } + eds_service_name = field->value; + } else if (strcmp(field->key, "lrsLoadReportingServerName") == 0) { + if (lrs_load_reporting_server_name != nullptr) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:lrsLoadReportingServerName error:Duplicate entry")); + } + if (field->type != GRPC_JSON_STRING) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:lrsLoadReportingServerName error:type should be string")); + continue; + } + lrs_load_reporting_server_name = field->value; } } if (error_list.empty()) { - return RefCountedPtr(New( - std::move(child_policy), std::move(fallback_policy))); + return MakeRefCounted( + std::move(child_policy), std::move(fallback_policy), + UniquePtr(gpr_strdup(eds_service_name)), + UniquePtr(gpr_strdup(lrs_load_reporting_server_name))); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list); return nullptr; diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index bf04d21ed1d..a2e7f7d5922 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -736,8 +736,11 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( } } // Start load reporting if needed. - LrsCallState* lrs_calld = ads_calld->chand()->lrs_calld_->calld(); - if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); + auto& lrs_call = ads_calld->chand()->lrs_calld_; + if (lrs_call != nullptr) { + LrsCallState* lrs_calld = lrs_call->calld(); + if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); + } // Ignore identical update. const EdsUpdate& prev_update = xds_client->cluster_state_.eds_update; const bool priority_list_changed = @@ -1319,14 +1322,20 @@ void XdsClient::CancelEndpointDataWatch(StringView /*cluster*/, } } -void XdsClient::AddClientStats(StringView /*cluster*/, +void XdsClient::AddClientStats(StringView /*lrs_server*/, + StringView /*cluster*/, XdsClientStats* client_stats) { + // TODO(roth): When we add support for direct federation, use the + // server name specified in lrs_server. cluster_state_.client_stats.insert(client_stats); chand_->MaybeStartLrsCall(); } -void XdsClient::RemoveClientStats(StringView /*cluster*/, +void XdsClient::RemoveClientStats(StringView /*lrs_server*/, + StringView /*cluster*/, XdsClientStats* client_stats) { + // TODO(roth): When we add support for direct federation, use the + // server name specified in lrs_server. // TODO(roth): In principle, we should try to send a final load report // containing whatever final stats have been accumulated since the // last load report. @@ -1365,7 +1374,9 @@ void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) { static const char* json = "{\n" " \"loadBalancingConfig\":[\n" - " { \"xds_experimental\":{} }\n" + " { \"xds_experimental\":{\n" + " \"lrsLoadReportingServerName\": \"\"\n" + " } }\n" " ]\n" "}"; RefCountedPtr service_config = diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index d2d9dc7558a..c5fc08cf0c2 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -100,8 +100,10 @@ class XdsClient : public InternallyRefCounted { EndpointWatcherInterface* watcher); // Adds and removes client stats for cluster. - void AddClientStats(StringView cluster, XdsClientStats* client_stats); - void RemoveClientStats(StringView cluster, XdsClientStats* client_stats); + void AddClientStats(StringView lrs_server, StringView cluster, + XdsClientStats* client_stats); + void RemoveClientStats(StringView lrs_server, StringView cluster, + XdsClientStats* client_stats); // Resets connection backoff state. void ResetBackoff(); @@ -208,8 +210,9 @@ class XdsClient : public InternallyRefCounted { // The channel for communicating with the xds server. OrphanablePtr chand_; - // TODO(roth): When we need support for multiple clusters, replace - // cluster_state_ with a map keyed by cluster name. + // TODO(juanlishen): As part of adding CDS support, replace + // cluster_state_ with a map keyed by cluster name, so that we can + // support multiple clusters for both CDS and EDS. ClusterState cluster_state_; // Map clusters_; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 9576d3090e4..3a8fe675ead 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -585,7 +585,27 @@ class LrsServiceImpl : public LrsService { bool load_report_ready_ = false; }; -class XdsEnd2endTest : public ::testing::TestWithParam { +class TestType { + public: + TestType(bool use_xds_resolver, bool enable_load_reporting) + : use_xds_resolver_(use_xds_resolver), + enable_load_reporting_(enable_load_reporting) {} + + bool use_xds_resolver() const { return use_xds_resolver_; } + bool enable_load_reporting() const { return enable_load_reporting_; } + + grpc::string AsString() const { + grpc::string retval = (use_xds_resolver_ ? "XdsResolver" : "FakeResolver"); + if (enable_load_reporting_) retval += "WithLoadReporting"; + return retval; + } + + private: + const bool use_xds_resolver_; + const bool enable_load_reporting_; +}; + +class XdsEnd2endTest : public ::testing::TestWithParam { protected: XdsEnd2endTest(size_t num_backends, size_t num_balancers, int client_load_reporting_interval_seconds) @@ -665,12 +685,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // channel never uses a response generator, and we inject the xds // channel's response generator here. args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, - GetParam() ? lb_channel_response_generator_.get() - : response_generator_.get()); + GetParam().use_xds_resolver() + ? lb_channel_response_generator_.get() + : response_generator_.get()); if (!expected_targets.empty()) { args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets); } - grpc::string scheme = GetParam() ? "xds-experimental" : "fake"; + grpc::string scheme = + GetParam().use_xds_resolver() ? "xds-experimental" : "fake"; std::ostringstream uri; uri << scheme << ":///" << kApplicationTargetName_; // TODO(dgq): templatize tests to run everything using both secure and @@ -760,19 +782,20 @@ class XdsEnd2endTest : public ::testing::TestWithParam { } void SetNextResolution(const std::vector& ports, - const char* service_config_json = nullptr, grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator = nullptr) { - if (GetParam()) return; // Not used with xds resolver. + if (GetParam().use_xds_resolver()) return; // Not used with xds resolver. grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(ports); - if (service_config_json != nullptr) { - grpc_error* error = GRPC_ERROR_NONE; - result.service_config = - grpc_core::ServiceConfig::Create(service_config_json, &error); - GRPC_ERROR_UNREF(error); - } + grpc_error* error = GRPC_ERROR_NONE; + const char* service_config_json = + GetParam().enable_load_reporting() + ? kDefaultServiceConfig_ + : kDefaultServiceConfigWithoutLoadReporting_; + result.service_config = + grpc_core::ServiceConfig::Create(service_config_json, &error); + GRPC_ERROR_UNREF(error); grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg( lb_channel_response_generator == nullptr ? lb_channel_response_generator_.get() @@ -988,11 +1011,21 @@ class XdsEnd2endTest : public ::testing::TestWithParam { lb_channel_response_generator_; const grpc::string kRequestMessage_ = "Live long and prosper."; const grpc::string kApplicationTargetName_ = "application_target_name"; - const grpc::string kDefaultServiceConfig_ = + const char* kDefaultServiceConfig_ = "{\n" " \"loadBalancingConfig\":[\n" " { \"does_not_exist\":{} },\n" - " { \"xds_experimental\":{} }\n" + " { \"xds_experimental\":{\n" + " \"lrsLoadReportingServerName\": \"\"\n" + " } }\n" + " ]\n" + "}"; + const char* kDefaultServiceConfigWithoutLoadReporting_ = + "{\n" + " \"loadBalancingConfig\":[\n" + " { \"does_not_exist\":{} },\n" + " { \"xds_experimental\":{\n" + " } }\n" " ]\n" "}"; }; @@ -1005,7 +1038,7 @@ class BasicTest : public XdsEnd2endTest { // Tests that the balancer sends the correct response to the client, and the // client sends RPCs to the backends using the default child policy. TEST_P(BasicTest, Vanilla) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcsPerAddress = 100; AdsServiceImpl::ResponseArgs args({ @@ -1033,7 +1066,7 @@ TEST_P(BasicTest, Vanilla) { // Tests that subchannel sharing works when the same backend is listed multiple // times. TEST_P(BasicTest, SameBackendListedMultipleTimes) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); // Same backend listed twice. std::vector ports(2, backends_[0]->port()); @@ -1056,7 +1089,7 @@ TEST_P(BasicTest, SameBackendListedMultipleTimes) { // Tests that RPCs will be blocked until a non-empty serverlist is received. TEST_P(BasicTest, InitiallyEmptyServerlist) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const int kCallDeadlineMs = kServerlistDelayMs * 2; @@ -1092,7 +1125,7 @@ TEST_P(BasicTest, InitiallyEmptyServerlist) { // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if // all the servers are unreachable. TEST_P(BasicTest, AllServersUnreachableFailFast) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumUnreachableServers = 5; std::vector ports; @@ -1114,7 +1147,7 @@ TEST_P(BasicTest, AllServersUnreachableFailFast) { // Tests that RPCs fail when the backends are down, and will succeed again after // the backends are restarted. TEST_P(BasicTest, BackendsRestart) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts()}, @@ -1136,7 +1169,7 @@ using SecureNamingTest = BasicTest; TEST_P(SecureNamingTest, TargetNameIsExpected) { // TODO(juanlishen): Use separate fake creds for the balancer channel. ResetStub(0, 0, kApplicationTargetName_ + ";lb"); - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; AdsServiceImpl::ResponseArgs args({ @@ -1168,7 +1201,7 @@ TEST_P(SecureNamingTest, TargetNameIsUnexpected) { ASSERT_DEATH_IF_SUPPORTED( { ResetStub(0, 0, kApplicationTargetName_ + ";lb"); - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannel({balancers_[0]->port()}); channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1)); }, @@ -1180,7 +1213,7 @@ using LocalityMapTest = BasicTest; // Tests that the localities in a locality map are picked according to their // weights. TEST_P(LocalityMapTest, WeightedRoundRobin) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const int kLocalityWeight0 = 2; @@ -1224,7 +1257,7 @@ TEST_P(LocalityMapTest, WeightedRoundRobin) { // Tests that the locality map can work properly even when it contains a large // number of localities. TEST_P(LocalityMapTest, StressTest) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumLocalities = 100; // The first ADS response contains kNumLocalities localities, each of which @@ -1259,7 +1292,7 @@ TEST_P(LocalityMapTest, StressTest) { // Tests that the localities in a locality map are picked correctly after update // (addition, modification, deletion). TEST_P(LocalityMapTest, UpdateMap) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; // The locality weight for the first 3 localities. @@ -1356,7 +1389,7 @@ class FailoverTest : public BasicTest { // Localities with the highest priority are used when multiple priority exist. TEST_P(FailoverTest, ChooseHighestPriority) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, @@ -1377,7 +1410,7 @@ TEST_P(FailoverTest, ChooseHighestPriority) { // If the higher priority localities are not reachable, failover to the highest // priority among the rest. TEST_P(FailoverTest, Failover) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, @@ -1401,7 +1434,7 @@ TEST_P(FailoverTest, Failover) { // If a locality with higher priority than the current one becomes ready, // switch to it. TEST_P(FailoverTest, SwitchBackToHigherPriority) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 100; AdsServiceImpl::ResponseArgs args({ @@ -1430,7 +1463,7 @@ TEST_P(FailoverTest, SwitchBackToHigherPriority) { // The first update only contains unavailable priorities. The second update // contains available priorities. TEST_P(FailoverTest, UpdateInitialUnavailable) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0}, @@ -1465,7 +1498,7 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) { // Tests that after the localities' priorities are updated, we still choose the // highest READY priority with the updated localities. TEST_P(FailoverTest, UpdatePriority) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 100; AdsServiceImpl::ResponseArgs args({ @@ -1498,7 +1531,7 @@ using DropTest = BasicTest; // Tests that RPCs are dropped according to the drop config. TEST_P(DropTest, Vanilla) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const uint32_t kDropPerMillionForLb = 100000; @@ -1544,7 +1577,7 @@ TEST_P(DropTest, Vanilla) { // Tests that drop config is converted correctly from per hundred. TEST_P(DropTest, DropPerHundred) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const uint32_t kDropPerHundredForLb = 10; @@ -1585,7 +1618,7 @@ TEST_P(DropTest, DropPerHundred) { // Tests that drop config is converted correctly from per ten thousand. TEST_P(DropTest, DropPerTenThousand) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const uint32_t kDropPerTenThousandForLb = 1000; @@ -1626,7 +1659,7 @@ TEST_P(DropTest, DropPerTenThousand) { // Tests that drop is working correctly after update. TEST_P(DropTest, Update) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; const uint32_t kDropPerMillionForLb = 100000; @@ -1722,7 +1755,7 @@ TEST_P(DropTest, Update) { // Tests that all the RPCs are dropped if any drop category drops 100%. TEST_P(DropTest, DropAll) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; const uint32_t kDropPerMillionForLb = 100000; @@ -1755,8 +1788,7 @@ TEST_P(FallbackTest, Vanilla) { const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendsInResolution = backends_.size() / 2; ResetStub(kFallbackTimeoutMs); - SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution), - kDefaultServiceConfig_.c_str()); + SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution)); SetNextResolutionForLbChannelAllBalancers(); // Send non-empty serverlist only after kServerlistDelayMs. AdsServiceImpl::ResponseArgs args({ @@ -1805,8 +1837,7 @@ TEST_P(FallbackTest, Update) { const size_t kNumBackendsInResolution = backends_.size() / 3; const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3; ResetStub(kFallbackTimeoutMs); - SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution), - kDefaultServiceConfig_.c_str()); + SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution)); SetNextResolutionForLbChannelAllBalancers(); // Send non-empty serverlist only after kServerlistDelayMs. AdsServiceImpl::ResponseArgs args({ @@ -1829,10 +1860,9 @@ TEST_P(FallbackTest, Update) { for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) { EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } - SetNextResolution(GetBackendPorts(kNumBackendsInResolution, - kNumBackendsInResolution + - kNumBackendsInResolutionUpdate), - kDefaultServiceConfig_.c_str()); + SetNextResolution(GetBackendPorts( + kNumBackendsInResolution, + kNumBackendsInResolution + kNumBackendsInResolutionUpdate)); // Wait until the resolution update has been processed and all the new // fallback backends are reachable. WaitForAllBackends(kNumBackendsInResolution /* start_index */, @@ -1882,7 +1912,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerChannelFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); // Return an unreachable balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); + SetNextResolution({backends_[0]->port()}); SetNextResolutionForLbChannel({g_port_saver->GetPort()}); // Send RPC with deadline less than the fallback timeout and make sure it // succeeds. @@ -1895,7 +1925,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); // Return one balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); + SetNextResolution({backends_[0]->port()}); SetNextResolutionForLbChannelAllBalancers(); // Balancer drops call without sending a serverlist. balancers_[0]->ads_service()->NotifyDoneWithAdsCall(); @@ -1910,7 +1940,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) { TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { const int kFallbackTimeoutMs = 500 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); - SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); + SetNextResolution({backends_[0]->port()}); SetNextResolutionForLbChannelAllBalancers(); // Send a serverlist that only contains an unreachable backend before fallback // timeout. @@ -1927,7 +1957,7 @@ TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { // all the calls. TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { // Return an unreachable balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); + SetNextResolution({backends_[0]->port()}); SetNextResolutionForLbChannel({g_port_saver->GetPort()}); // Enter fallback mode because the LB channel fails to connect. WaitForBackend(0); @@ -1951,7 +1981,7 @@ TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { // Tests that fallback mode is exited if the child policy becomes ready. TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) { // Return an unreachable balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); + SetNextResolution({backends_[0]->port()}); SetNextResolutionForLbChannel({g_port_saver->GetPort()}); // Enter fallback mode because the LB channel fails to connect. WaitForBackend(0); @@ -1989,7 +2019,7 @@ class BalancerUpdateTest : public XdsEnd2endTest { // Tests that the old LB call is still used after the balancer address update as // long as that call is still alive. TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", {backends_[0]->port()}}, @@ -2042,7 +2072,7 @@ TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { // xds keeps the initial connection (which by definition is also present in the // update). TEST_P(BalancerUpdateTest, Repeated) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", {backends_[0]->port()}}, @@ -2107,7 +2137,7 @@ TEST_P(BalancerUpdateTest, Repeated) { // backends according to the last balancer response, until a new balancer is // reachable. TEST_P(BalancerUpdateTest, DeadUpdate) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannel({balancers_[0]->port()}); AdsServiceImpl::ResponseArgs args({ {"locality0", {backends_[0]->port()}}, @@ -2186,7 +2216,7 @@ class ClientLoadReportingTest : public XdsEnd2endTest { // Tests that the load report received at the balancer is correct. TEST_P(ClientLoadReportingTest, Vanilla) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; // TODO(juanlishen): Partition the backends after multiple localities is @@ -2227,7 +2257,7 @@ TEST_P(ClientLoadReportingTest, Vanilla) { // Tests that if the balancer restarts, the client load report contains the // stats before and after the restart correctly. TEST_P(ClientLoadReportingTest, BalancerRestart) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumBackendsFirstPass = backends_.size() / 2; const size_t kNumBackendsSecondPass = @@ -2293,7 +2323,7 @@ class ClientLoadReportingWithDropTest : public XdsEnd2endTest { // Tests that the drop stats are correctly reported by client load reporting. TEST_P(ClientLoadReportingWithDropTest, Vanilla) { - SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 3000; const uint32_t kDropPerMillionForLb = 100000; @@ -2355,28 +2385,66 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) { EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count()); } -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BasicTest, ::testing::Bool()); - -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, SecureNamingTest, ::testing::Bool()); - -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, LocalityMapTest, ::testing::Bool()); - -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FailoverTest, ::testing::Bool()); +grpc::string TestTypeName(const ::testing::TestParamInfo& info) { + return info.param.AsString(); +} -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, DropTest, ::testing::Bool()); +// TODO(juanlishen): Load reporting disabled is currently tested only with DNS +// resolver. Once we implement CDS, test it via the xds resolver too. + +INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest, + ::testing::Values(TestType(false, true), + TestType(false, false), + TestType(true, true)), + &TestTypeName); + +INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest, + ::testing::Values(TestType(false, true), + TestType(false, false), + TestType(true, true)), + &TestTypeName); + +INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, + ::testing::Values(TestType(false, true), + TestType(false, false), + TestType(true, true)), + &TestTypeName); + +INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest, + ::testing::Values(TestType(false, true), + TestType(false, false), + TestType(true, true)), + &TestTypeName); + +INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest, + ::testing::Values(TestType(false, true), + TestType(false, false), + TestType(true, true)), + &TestTypeName); // Fallback does not work with xds resolver. -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FallbackTest, - ::testing::Values(false)); - -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BalancerUpdateTest, - ::testing::Bool()); - -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingTest, - ::testing::Bool()); - -INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingWithDropTest, - ::testing::Bool()); +INSTANTIATE_TEST_SUITE_P(XdsTest, FallbackTest, + ::testing::Values(TestType(false, true), + TestType(false, false)), + &TestTypeName); + +INSTANTIATE_TEST_SUITE_P(XdsTest, BalancerUpdateTest, + ::testing::Values(TestType(false, true), + TestType(false, false), + TestType(true, true)), + &TestTypeName); + +// Load reporting tests are not run with load reporting disabled. +INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingTest, + ::testing::Values(TestType(false, true), + TestType(true, true)), + &TestTypeName); + +// Load reporting tests are not run with load reporting disabled. +INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingWithDropTest, + ::testing::Values(TestType(false, true), + TestType(true, true)), + &TestTypeName); } // namespace } // namespace testing