From 8574ccc57b71385d7cfbe00c722918caf9fdae7b Mon Sep 17 00:00:00 2001
From: nanahpang <31627465+nanahpang@users.noreply.github.com>
Date: Tue, 22 Oct 2019 14:49:59 -0700
Subject: [PATCH] Revert "Add new fields to xds policy config."

---
 .../client_channel/lb_policy/xds/xds.cc      | 198 +++++-----
 .../filters/client_channel/xds/xds_client.cc |  19 +-
 .../filters/client_channel/xds/xds_client.h  |  11 +-
 test/cpp/end2end/xds_end2end_test.cc         | 212 ++++++----
 4 files changed, 141 insertions(+), 299 deletions(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index e2e6c4f3332..8449566d534 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -77,14 +77,9 @@ constexpr char kXds[] = "xds_experimental";
 class ParsedXdsConfig : public LoadBalancingPolicy::Config {
  public:
   ParsedXdsConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
-                  RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy,
-                  UniquePtr<char> eds_service_name,
-                  UniquePtr<char> lrs_load_reporting_server_name)
+                  RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy)
       : child_policy_(std::move(child_policy)),
-        fallback_policy_(std::move(fallback_policy)),
-        eds_service_name_(std::move(eds_service_name)),
-        lrs_load_reporting_server_name_(
-            std::move(lrs_load_reporting_server_name)) {}
+        fallback_policy_(std::move(fallback_policy)) {}
 
   const char* name() const override { return kXds; }
 
@@ -96,17 +91,9 @@ class ParsedXdsConfig : public LoadBalancingPolicy::Config {
     return fallback_policy_;
   }
 
-  const char* eds_service_name() const { return eds_service_name_.get(); };
-
-  const char* lrs_load_reporting_server_name() const {
-    return lrs_load_reporting_server_name_.get();
-  };
-
  private:
   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
   RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
-  UniquePtr<char> eds_service_name_;
-  UniquePtr<char> lrs_load_reporting_server_name_;
 };
 
 class XdsLb : public LoadBalancingPolicy {
@@ -124,17 +111,16 @@ class XdsLb : public LoadBalancingPolicy {
   // We need this wrapper for the following reasons:
   // 1. To process per-locality load reporting.
   // 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control
-  //    references to it by the xds picker and the locality.
-  class EndpointPickerWrapper : public RefCounted<EndpointPickerWrapper> {
+  // references to it by the xds picker and the locality.
+  class PickerWrapper : public RefCounted<PickerWrapper> {
    public:
-    EndpointPickerWrapper(
-        UniquePtr<SubchannelPicker> picker,
-        RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
+    PickerWrapper(UniquePtr<SubchannelPicker> picker,
+                  RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
         : picker_(std::move(picker)),
           locality_stats_(std::move(locality_stats)) {
       locality_stats_->RefByPicker();
     }
-    ~EndpointPickerWrapper() { locality_stats_->UnrefByPicker(); }
+    ~PickerWrapper() { locality_stats_->UnrefByPicker(); }
 
     PickResult Pick(PickArgs args);
 
@@ -145,16 +131,15 @@ class XdsLb : public LoadBalancingPolicy {
 
   // The picker will use a stateless weighting algorithm to pick the locality to
   // use for each request.
-  class LocalityPicker : public SubchannelPicker {
+  class Picker : public SubchannelPicker {
    public:
     // Maintains a weighted list of pickers from each locality that is in ready
     // state. The first element in the pair represents the end of a range
     // proportional to the locality's weight. The start of the range is the
     // previous value in the vector and is 0 for the first element.
using PickerList = - InlinedVector>, - 1>; - LocalityPicker(RefCountedPtr xds_policy, PickerList pickers) + InlinedVector>, 1>; + Picker(RefCountedPtr xds_policy, PickerList pickers) : xds_policy_(std::move(xds_policy)), pickers_(std::move(pickers)), drop_config_(xds_policy_->drop_config_) {} @@ -219,7 +204,7 @@ class XdsLb : public LoadBalancingPolicy { return connectivity_state_; } uint32_t weight() const { return weight_; } - RefCountedPtr picker_wrapper() const { + RefCountedPtr picker_wrapper() const { return picker_wrapper_; } @@ -271,7 +256,7 @@ class XdsLb : public LoadBalancingPolicy { RefCountedPtr name_; OrphanablePtr child_policy_; OrphanablePtr pending_child_policy_; - RefCountedPtr picker_wrapper_; + RefCountedPtr picker_wrapper_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; uint32_t weight_; @@ -392,24 +377,16 @@ class XdsLb : public LoadBalancingPolicy { const char* name, const grpc_channel_args* args); void MaybeExitFallbackMode(); - const char* eds_service_name() const { - if (config_ != nullptr && config_->eds_service_name() != nullptr) { - return config_->eds_service_name(); - } - return server_name_.get(); - } - XdsClient* xds_client() const { return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get() : xds_client_.get(); } - // Server name from target URI. - UniquePtr server_name_; + // Name of the backend server to connect to. + const char* server_name_ = nullptr; - // Current channel args and config from the resolver. + // Current channel args from the resolver. const grpc_channel_args* args_ = nullptr; - RefCountedPtr config_; // Internal state. bool shutting_down_ = false; @@ -441,10 +418,14 @@ class XdsLb : public LoadBalancingPolicy { grpc_timer lb_fallback_timer_; grpc_closure lb_on_fallback_; + // The policy to use for the fallback backends. + RefCountedPtr fallback_policy_config_; // Non-null iff we are in fallback mode. OrphanablePtr fallback_policy_; OrphanablePtr pending_fallback_policy_; + // The policy to use for the backends. + RefCountedPtr child_policy_config_; const grpc_millis locality_retention_interval_ms_; const grpc_millis locality_map_failover_timeout_ms_; // A list of locality maps indexed by priority. @@ -460,10 +441,10 @@ class XdsLb : public LoadBalancingPolicy { }; // -// XdsLb::EndpointPickerWrapper +// XdsLb::PickerWrapper::Pick // -LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick( +LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick( LoadBalancingPolicy::PickArgs args) { // Forward the pick to the picker returned from the child policy. PickResult result = picker_->Pick(args); @@ -489,10 +470,10 @@ LoadBalancingPolicy::PickResult XdsLb::EndpointPickerWrapper::Pick( } // -// XdsLb::LocalityPicker +// XdsLb::Picker // -XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) { +XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { // Handle drop. 
const UniquePtr* drop_category; if (drop_config_->ShouldDrop(&drop_category)) { @@ -508,8 +489,8 @@ XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) { return PickFromLocality(key, args); } -XdsLb::PickResult XdsLb::LocalityPicker::PickFromLocality(const uint32_t key, - PickArgs args) { +XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key, + PickArgs args) { size_t mid = 0; size_t start_index = 0; size_t end_index = pickers_.size() - 1; @@ -705,11 +686,11 @@ XdsLb::XdsLb(Args args) GPR_ASSERT(server_uri != nullptr); grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); - server_name_.reset( - gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path)); + server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] server name from channel: %s", this, - server_name_.get()); + gpr_log(GPR_INFO, + "[xdslb %p] Will use '%s' as the server name for LB request.", this, + server_name_); } grpc_uri_destroy(uri); } @@ -718,6 +699,7 @@ XdsLb::~XdsLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this); } + gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); } @@ -740,13 +722,9 @@ void XdsLb::ShutdownLocked() { pending_fallback_policy_.reset(); // Cancel the endpoint watch here instead of in our dtor, because the // watcher holds a ref to us. - xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()), + xds_client()->CancelEndpointDataWatch(StringView(server_name_), endpoint_watcher_); - if (config_->lrs_load_reporting_server_name() != nullptr) { - xds_client()->RemoveClientStats( - StringView(config_->lrs_load_reporting_server_name()), - StringView(eds_service_name()), &client_stats_); - } + xds_client()->RemoveClientStats(StringView(server_name_), &client_stats_); xds_client_from_channel_.reset(); xds_client_.reset(); } @@ -775,9 +753,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) { } const bool is_initial_update = args_ == nullptr; // Update config. - const char* old_eds_service_name = eds_service_name(); - auto old_config = std::move(config_); - config_ = std::move(args.config); + auto* xds_config = static_cast(args.config.get()); + child_policy_config_ = xds_config->child_policy(); + fallback_policy_config_ = xds_config->fallback_policy(); // Update fallback address list. fallback_backend_addresses_ = std::move(args.addresses); // Update args. @@ -794,7 +772,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) { if (xds_client_from_channel_ == nullptr) { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( - combiner(), interested_parties(), StringView(eds_service_name()), + combiner(), interested_parties(), StringView(server_name_), nullptr /* service config watcher */, *args_, &error); // TODO(roth): If we decide that we care about fallback mode, add // proper error handling here. @@ -804,6 +782,11 @@ void XdsLb::UpdateLocked(UpdateArgs args) { xds_client_.get()); } } + auto watcher = MakeUnique(Ref()); + endpoint_watcher_ = watcher.get(); + xds_client()->WatchEndpointData(StringView(server_name_), + std::move(watcher)); + xds_client()->AddClientStats(StringView(server_name_), &client_stats_); // Start fallback-at-startup checks. 
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure @@ -812,42 +795,6 @@ void XdsLb::UpdateLocked(UpdateArgs args) { fallback_at_startup_checks_pending_ = true; grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); } - // Update endpoint watcher if needed. - if (is_initial_update || - strcmp(old_eds_service_name, eds_service_name()) != 0) { - if (!is_initial_update) { - xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name), - endpoint_watcher_); - } - auto watcher = MakeUnique(Ref()); - endpoint_watcher_ = watcher.get(); - xds_client()->WatchEndpointData(StringView(eds_service_name()), - std::move(watcher)); - } - // Update load reporting if needed. - // TODO(roth): Ideally, we should not collect any stats if load reporting - // is disabled, which would require changing this code to recreate - // all of the pickers whenever load reporting is enabled or disabled - // here. - if (is_initial_update || - (config_->lrs_load_reporting_server_name() == nullptr) != - (old_config->lrs_load_reporting_server_name() == nullptr) || - (config_->lrs_load_reporting_server_name() != nullptr && - old_config->lrs_load_reporting_server_name() != nullptr && - strcmp(config_->lrs_load_reporting_server_name(), - old_config->lrs_load_reporting_server_name()) != 0)) { - if (old_config != nullptr && - old_config->lrs_load_reporting_server_name() != nullptr) { - xds_client()->RemoveClientStats( - StringView(old_config->lrs_load_reporting_server_name()), - StringView(old_eds_service_name), &client_stats_); - } - if (config_->lrs_load_reporting_server_name() != nullptr) { - xds_client()->AddClientStats( - StringView(config_->lrs_load_reporting_server_name()), - StringView(eds_service_name()), &client_stats_); - } - } } // @@ -892,7 +839,9 @@ void XdsLb::UpdateFallbackPolicyLocked() { // Construct update args. UpdateArgs update_args; update_args.addresses = fallback_backend_addresses_; - update_args.config = config_->fallback_policy(); + update_args.config = fallback_policy_config_ == nullptr + ? nullptr + : fallback_policy_config_->Ref(); update_args.args = grpc_channel_args_copy(args_); // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store @@ -943,9 +892,9 @@ void XdsLb::UpdateFallbackPolicyLocked() { // that was there before, which will be immediately shut down) // and will later be swapped into child_policy_ by the helper // when the new child transitions into state READY. - const char* fallback_policy_name = update_args.config == nullptr + const char* fallback_policy_name = fallback_policy_config_ == nullptr ? "round_robin" - : update_args.config->name(); + : fallback_policy_config_->name(); const bool create_policy = // case 1 fallback_policy_ == nullptr || @@ -1226,7 +1175,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { // that are ready. Each locality is represented by a portion of the range // proportional to its weight, such that the total range is the sum of the // weights of all localities. 
- LocalityPicker::PickerList picker_list; + Picker::PickerList picker_list; uint32_t end = 0; for (const auto& p : localities_) { const auto& locality_name = p.first; @@ -1238,9 +1187,9 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { picker_list.push_back(std::make_pair(end, locality->picker_wrapper())); } xds_policy()->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, MakeUnique( - xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), - std::move(picker_list))); + GRPC_CHANNEL_READY, + MakeUnique(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), + std::move(picker_list))); } OrphanablePtr @@ -1541,7 +1490,9 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( // Construct update args. UpdateArgs update_args; update_args.addresses = std::move(serverlist); - update_args.config = xds_policy()->config_->child_policy(); + update_args.config = xds_policy()->child_policy_config_ == nullptr + ? nullptr + : xds_policy()->child_policy_config_->Ref(); update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_); // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store @@ -1594,9 +1545,10 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( // when the new child transitions into state READY. // TODO(juanlishen): If the child policy is not configured via service config, // use whatever algorithm is specified by the balancer. - const char* child_policy_name = update_args.config == nullptr - ? "round_robin" - : update_args.config->name(); + const char* child_policy_name = + xds_policy()->child_policy_config_ == nullptr + ? "round_robin" + : xds_policy()->child_policy_config_->name(); const bool create_policy = // case 1 child_policy_ == nullptr || @@ -1763,11 +1715,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( return; } // Cache the picker and its state in the locality. - // TODO(roth): If load reporting is not configured, we should ideally - // pass a null LocalityStats ref to the EndpointPickerWrapper and have it - // not collect any stats, since they're not going to be used. This would - // require recreating all of the pickers whenever we get a config update. 
- locality_->picker_wrapper_ = MakeRefCounted( + locality_->picker_wrapper_ = MakeRefCounted( std::move(picker), locality_->xds_policy()->client_stats_.FindLocalityStats( locality_->name_)); @@ -1814,8 +1762,6 @@ class XdsFactory : public LoadBalancingPolicyFactory { InlinedVector error_list; RefCountedPtr child_policy; RefCountedPtr fallback_policy; - const char* eds_service_name = nullptr; - const char* lrs_load_reporting_server_name = nullptr; for (const grpc_json* field = json->child; field != nullptr; field = field->next) { if (field->key == nullptr) continue; @@ -1843,35 +1789,11 @@ class XdsFactory : public LoadBalancingPolicyFactory { GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); error_list.push_back(parse_error); } - } else if (strcmp(field->key, "edsServiceName") == 0) { - if (eds_service_name != nullptr) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:edsServiceName error:Duplicate entry")); - } - if (field->type != GRPC_JSON_STRING) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:edsServiceName error:type should be string")); - continue; - } - eds_service_name = field->value; - } else if (strcmp(field->key, "lrsLoadReportingServerName") == 0) { - if (lrs_load_reporting_server_name != nullptr) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:lrsLoadReportingServerName error:Duplicate entry")); - } - if (field->type != GRPC_JSON_STRING) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:lrsLoadReportingServerName error:type should be string")); - continue; - } - lrs_load_reporting_server_name = field->value; } } if (error_list.empty()) { - return MakeRefCounted( - std::move(child_policy), std::move(fallback_policy), - UniquePtr(gpr_strdup(eds_service_name)), - UniquePtr(gpr_strdup(lrs_load_reporting_server_name))); + return RefCountedPtr(New( + std::move(child_policy), std::move(fallback_policy))); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list); return nullptr; diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 2f09e12ab94..7bb835071a6 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -736,11 +736,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( } } // Start load reporting if needed. - auto& lrs_call = ads_calld->chand()->lrs_calld_; - if (lrs_call != nullptr) { - LrsCallState* lrs_calld = lrs_call->calld(); - if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); - } + LrsCallState* lrs_calld = ads_calld->chand()->lrs_calld_->calld(); + if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); // Ignore identical update. const EdsUpdate& prev_update = xds_client->cluster_state_.eds_update; const bool priority_list_changed = @@ -1322,18 +1319,14 @@ void XdsClient::CancelEndpointDataWatch(StringView cluster, } } -void XdsClient::AddClientStats(StringView lrs_server, StringView cluster, +void XdsClient::AddClientStats(StringView cluster, XdsClientStats* client_stats) { - // TODO(roth): When we add support for direct federation, use the - // server name specified in lrs_server. 
cluster_state_.client_stats.insert(client_stats); chand_->MaybeStartLrsCall(); } -void XdsClient::RemoveClientStats(StringView lrs_server, StringView cluster, +void XdsClient::RemoveClientStats(StringView cluster, XdsClientStats* client_stats) { - // TODO(roth): When we add support for direct federation, use the - // server name specified in lrs_server. // TODO(roth): In principle, we should try to send a final load report // containing whatever final stats have been accumulated since the // last load report. @@ -1372,9 +1365,7 @@ void XdsClient::NotifyOnServiceConfig(void* arg, grpc_error* error) { static const char* json = "{\n" " \"loadBalancingConfig\":[\n" - " { \"xds_experimental\":{\n" - " \"lrsLoadReportingServerName\": \"\"\n" - " } }\n" + " { \"xds_experimental\":{} }\n" " ]\n" "}"; RefCountedPtr service_config = diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index c5fc08cf0c2..d2d9dc7558a 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -100,10 +100,8 @@ class XdsClient : public InternallyRefCounted { EndpointWatcherInterface* watcher); // Adds and removes client stats for cluster. - void AddClientStats(StringView lrs_server, StringView cluster, - XdsClientStats* client_stats); - void RemoveClientStats(StringView lrs_server, StringView cluster, - XdsClientStats* client_stats); + void AddClientStats(StringView cluster, XdsClientStats* client_stats); + void RemoveClientStats(StringView cluster, XdsClientStats* client_stats); // Resets connection backoff state. void ResetBackoff(); @@ -210,9 +208,8 @@ class XdsClient : public InternallyRefCounted { // The channel for communicating with the xds server. OrphanablePtr chand_; - // TODO(juanlishen): As part of adding CDS support, replace - // cluster_state_ with a map keyed by cluster name, so that we can - // support multiple clusters for both CDS and EDS. + // TODO(roth): When we need support for multiple clusters, replace + // cluster_state_ with a map keyed by cluster name. ClusterState cluster_state_; // Map clusters_; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 69685c9c5e8..7533f458c7c 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -575,27 +575,7 @@ class LrsServiceImpl : public LrsService { bool load_report_ready_ = false; }; -class TestType { - public: - TestType(bool use_xds_resolver, bool enable_load_reporting) - : use_xds_resolver_(use_xds_resolver), - enable_load_reporting_(enable_load_reporting) {} - - bool use_xds_resolver() const { return use_xds_resolver_; } - bool enable_load_reporting() const { return enable_load_reporting_; } - - grpc::string AsString() const { - grpc::string retval = (use_xds_resolver_ ? "XdsResolver" : "FakeResolver"); - if (enable_load_reporting_) retval += "WithLoadReporting"; - return retval; - } - - private: - const bool use_xds_resolver_; - const bool enable_load_reporting_; -}; - -class XdsEnd2endTest : public ::testing::TestWithParam { +class XdsEnd2endTest : public ::testing::TestWithParam { protected: XdsEnd2endTest(size_t num_backends, size_t num_balancers, int client_load_reporting_interval_seconds) @@ -674,14 +654,12 @@ class XdsEnd2endTest : public ::testing::TestWithParam { // channel never uses a response generator, and we inject the xds // channel's response generator here. 
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, - GetParam().use_xds_resolver() - ? lb_channel_response_generator_.get() - : response_generator_.get()); + GetParam() ? lb_channel_response_generator_.get() + : response_generator_.get()); if (!expected_targets.empty()) { args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets); } - grpc::string scheme = - GetParam().use_xds_resolver() ? "xds-experimental" : "fake"; + grpc::string scheme = GetParam() ? "xds-experimental" : "fake"; std::ostringstream uri; uri << scheme << ":///" << kApplicationTargetName_; // TODO(dgq): templatize tests to run everything using both secure and @@ -771,20 +749,19 @@ class XdsEnd2endTest : public ::testing::TestWithParam { } void SetNextResolution(const std::vector& ports, + const char* service_config_json = nullptr, grpc_core::FakeResolverResponseGenerator* lb_channel_response_generator = nullptr) { - if (GetParam().use_xds_resolver()) return; // Not used with xds resolver. + if (GetParam()) return; // Not used with xds resolver. grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(ports); - grpc_error* error = GRPC_ERROR_NONE; - const char* service_config_json = - GetParam().enable_load_reporting() - ? kDefaultServiceConfig_ - : kDefaultServiceConfigWithoutLoadReporting_; - result.service_config = - grpc_core::ServiceConfig::Create(service_config_json, &error); - GRPC_ERROR_UNREF(error); + if (service_config_json != nullptr) { + grpc_error* error = GRPC_ERROR_NONE; + result.service_config = + grpc_core::ServiceConfig::Create(service_config_json, &error); + GRPC_ERROR_UNREF(error); + } grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg( lb_channel_response_generator == nullptr ? lb_channel_response_generator_.get() @@ -1000,21 +977,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam { lb_channel_response_generator_; const grpc::string kRequestMessage_ = "Live long and prosper."; const grpc::string kApplicationTargetName_ = "application_target_name"; - const char* kDefaultServiceConfig_ = + const grpc::string kDefaultServiceConfig_ = "{\n" " \"loadBalancingConfig\":[\n" " { \"does_not_exist\":{} },\n" - " { \"xds_experimental\":{\n" - " \"lrsLoadReportingServerName\": \"\"\n" - " } }\n" - " ]\n" - "}"; - const char* kDefaultServiceConfigWithoutLoadReporting_ = - "{\n" - " \"loadBalancingConfig\":[\n" - " { \"does_not_exist\":{} },\n" - " { \"xds_experimental\":{\n" - " } }\n" + " { \"xds_experimental\":{} }\n" " ]\n" "}"; }; @@ -1027,7 +994,7 @@ class BasicTest : public XdsEnd2endTest { // Tests that the balancer sends the correct response to the client, and the // client sends RPCs to the backends using the default child policy. TEST_P(BasicTest, Vanilla) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcsPerAddress = 100; AdsServiceImpl::ResponseArgs args({ @@ -1085,7 +1052,7 @@ TEST_P(BasicTest, IgnoresUnhealthyEndpoints) { // Tests that subchannel sharing works when the same backend is listed multiple // times. TEST_P(BasicTest, SameBackendListedMultipleTimes) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); // Same backend listed twice. 
std::vector ports(2, backends_[0]->port()); @@ -1108,7 +1075,7 @@ TEST_P(BasicTest, SameBackendListedMultipleTimes) { // Tests that RPCs will be blocked until a non-empty serverlist is received. TEST_P(BasicTest, InitiallyEmptyServerlist) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const int kCallDeadlineMs = kServerlistDelayMs * 2; @@ -1144,7 +1111,7 @@ TEST_P(BasicTest, InitiallyEmptyServerlist) { // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if // all the servers are unreachable. TEST_P(BasicTest, AllServersUnreachableFailFast) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumUnreachableServers = 5; std::vector ports; @@ -1166,7 +1133,7 @@ TEST_P(BasicTest, AllServersUnreachableFailFast) { // Tests that RPCs fail when the backends are down, and will succeed again after // the backends are restarted. TEST_P(BasicTest, BackendsRestart) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts()}, @@ -1188,7 +1155,7 @@ using SecureNamingTest = BasicTest; TEST_P(SecureNamingTest, TargetNameIsExpected) { // TODO(juanlishen): Use separate fake creds for the balancer channel. ResetStub(0, 0, kApplicationTargetName_ + ";lb"); - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; AdsServiceImpl::ResponseArgs args({ @@ -1220,7 +1187,7 @@ TEST_P(SecureNamingTest, TargetNameIsUnexpected) { ASSERT_DEATH_IF_SUPPORTED( { ResetStub(0, 0, kApplicationTargetName_ + ";lb"); - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1)); }, @@ -1232,7 +1199,7 @@ using LocalityMapTest = BasicTest; // Tests that the localities in a locality map are picked according to their // weights. TEST_P(LocalityMapTest, WeightedRoundRobin) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const int kLocalityWeight0 = 2; @@ -1276,7 +1243,7 @@ TEST_P(LocalityMapTest, WeightedRoundRobin) { // Tests that the locality map can work properly even when it contains a large // number of localities. TEST_P(LocalityMapTest, StressTest) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumLocalities = 100; // The first ADS response contains kNumLocalities localities, each of which @@ -1311,7 +1278,7 @@ TEST_P(LocalityMapTest, StressTest) { // Tests that the localities in a locality map are picked correctly after update // (addition, modification, deletion). TEST_P(LocalityMapTest, UpdateMap) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; // The locality weight for the first 3 localities. @@ -1408,7 +1375,7 @@ class FailoverTest : public BasicTest { // Localities with the highest priority are used when multiple priority exist. 
TEST_P(FailoverTest, ChooseHighestPriority) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, @@ -1429,7 +1396,7 @@ TEST_P(FailoverTest, ChooseHighestPriority) { // If the higher priority localities are not reachable, failover to the highest // priority among the rest. TEST_P(FailoverTest, Failover) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, @@ -1453,7 +1420,7 @@ TEST_P(FailoverTest, Failover) { // If a locality with higher priority than the current one becomes ready, // switch to it. TEST_P(FailoverTest, SwitchBackToHigherPriority) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 100; AdsServiceImpl::ResponseArgs args({ @@ -1482,7 +1449,7 @@ TEST_P(FailoverTest, SwitchBackToHigherPriority) { // The first update only contains unavailable priorities. The second update // contains available priorities. TEST_P(FailoverTest, UpdateInitialUnavailable) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0}, @@ -1517,7 +1484,7 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) { // Tests that after the localities' priorities are updated, we still choose the // highest READY priority with the updated localities. TEST_P(FailoverTest, UpdatePriority) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 100; AdsServiceImpl::ResponseArgs args({ @@ -1550,7 +1517,7 @@ using DropTest = BasicTest; // Tests that RPCs are dropped according to the drop config. TEST_P(DropTest, Vanilla) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const uint32_t kDropPerMillionForLb = 100000; @@ -1596,7 +1563,7 @@ TEST_P(DropTest, Vanilla) { // Tests that drop config is converted correctly from per hundred. TEST_P(DropTest, DropPerHundred) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const uint32_t kDropPerHundredForLb = 10; @@ -1637,7 +1604,7 @@ TEST_P(DropTest, DropPerHundred) { // Tests that drop config is converted correctly from per ten thousand. TEST_P(DropTest, DropPerTenThousand) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 5000; const uint32_t kDropPerTenThousandForLb = 1000; @@ -1678,7 +1645,7 @@ TEST_P(DropTest, DropPerTenThousand) { // Tests that drop is working correctly after update. TEST_P(DropTest, Update) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; const uint32_t kDropPerMillionForLb = 100000; @@ -1774,7 +1741,7 @@ TEST_P(DropTest, Update) { // Tests that all the RPCs are dropped if any drop category drops 100%. 
TEST_P(DropTest, DropAll) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 1000; const uint32_t kDropPerMillionForLb = 100000; @@ -1807,7 +1774,8 @@ TEST_P(FallbackTest, Vanilla) { const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const size_t kNumBackendsInResolution = backends_.size() / 2; ResetStub(kFallbackTimeoutMs); - SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution)); + SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution), + kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); // Send non-empty serverlist only after kServerlistDelayMs. AdsServiceImpl::ResponseArgs args({ @@ -1856,7 +1824,8 @@ TEST_P(FallbackTest, Update) { const size_t kNumBackendsInResolution = backends_.size() / 3; const size_t kNumBackendsInResolutionUpdate = backends_.size() / 3; ResetStub(kFallbackTimeoutMs); - SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution)); + SetNextResolution(GetBackendPorts(0, kNumBackendsInResolution), + kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); // Send non-empty serverlist only after kServerlistDelayMs. AdsServiceImpl::ResponseArgs args({ @@ -1879,9 +1848,10 @@ TEST_P(FallbackTest, Update) { for (size_t i = kNumBackendsInResolution; i < backends_.size(); ++i) { EXPECT_EQ(0U, backends_[i]->backend_service()->request_count()); } - SetNextResolution(GetBackendPorts( - kNumBackendsInResolution, - kNumBackendsInResolution + kNumBackendsInResolutionUpdate)); + SetNextResolution(GetBackendPorts(kNumBackendsInResolution, + kNumBackendsInResolution + + kNumBackendsInResolutionUpdate), + kDefaultServiceConfig_.c_str()); // Wait until the resolution update has been processed and all the new // fallback backends are reachable. WaitForAllBackends(kNumBackendsInResolution /* start_index */, @@ -1931,7 +1901,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerChannelFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); // Return an unreachable balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}); + SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()}); // Send RPC with deadline less than the fallback timeout and make sure it // succeeds. @@ -1944,7 +1914,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); // Return one balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}); + SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); // Balancer drops call without sending a serverlist. balancers_[0]->ads_service()->NotifyDoneWithAdsCall(); @@ -1959,7 +1929,7 @@ TEST_P(FallbackTest, FallbackEarlyWhenBalancerCallFails) { TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { const int kFallbackTimeoutMs = 500 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); - SetNextResolution({backends_[0]->port()}); + SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); // Send a serverlist that only contains an unreachable backend before fallback // timeout. 
@@ -1976,7 +1946,7 @@ TEST_P(FallbackTest, FallbackIfResponseReceivedButChildNotReady) { // all the calls. TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { // Return an unreachable balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}); + SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()}); // Enter fallback mode because the LB channel fails to connect. WaitForBackend(0); @@ -2000,7 +1970,7 @@ TEST_P(FallbackTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { // Tests that fallback mode is exited if the child policy becomes ready. TEST_P(FallbackTest, FallbackModeIsExitedAfterChildRready) { // Return an unreachable balancer and one fallback backend. - SetNextResolution({backends_[0]->port()}); + SetNextResolution({backends_[0]->port()}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()}); // Enter fallback mode because the LB channel fails to connect. WaitForBackend(0); @@ -2038,7 +2008,7 @@ class BalancerUpdateTest : public XdsEnd2endTest { // Tests that the old LB call is still used after the balancer address update as // long as that call is still alive. TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", {backends_[0]->port()}}, @@ -2091,7 +2061,7 @@ TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { // xds keeps the initial connection (which by definition is also present in the // update). TEST_P(BalancerUpdateTest, Repeated) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); AdsServiceImpl::ResponseArgs args({ {"locality0", {backends_[0]->port()}}, @@ -2156,7 +2126,7 @@ TEST_P(BalancerUpdateTest, Repeated) { // backends according to the last balancer response, until a new balancer is // reachable. TEST_P(BalancerUpdateTest, DeadUpdate) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); AdsServiceImpl::ResponseArgs args({ {"locality0", {backends_[0]->port()}}, @@ -2235,7 +2205,7 @@ class ClientLoadReportingTest : public XdsEnd2endTest { // Tests that the load report received at the balancer is correct. TEST_P(ClientLoadReportingTest, Vanilla) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; // TODO(juanlishen): Partition the backends after multiple localities is @@ -2276,7 +2246,7 @@ TEST_P(ClientLoadReportingTest, Vanilla) { // Tests that if the balancer restarts, the client load report contains the // stats before and after the restart correctly. TEST_P(ClientLoadReportingTest, BalancerRestart) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumBackendsFirstPass = backends_.size() / 2; const size_t kNumBackendsSecondPass = @@ -2342,7 +2312,7 @@ class ClientLoadReportingWithDropTest : public XdsEnd2endTest { // Tests that the drop stats are correctly reported by client load reporting. 
TEST_P(ClientLoadReportingWithDropTest, Vanilla) { - SetNextResolution({}); + SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcs = 3000; const uint32_t kDropPerMillionForLb = 100000; @@ -2404,66 +2374,28 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) { EXPECT_EQ(1U, balancers_[0]->ads_service()->response_count()); } -grpc::string TestTypeName(const ::testing::TestParamInfo& info) { - return info.param.AsString(); -} +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BasicTest, ::testing::Bool()); -// TODO(juanlishen): Load reporting disabled is currently tested only with DNS -// resolver. Once we implement CDS, test it via the xds resolver too. - -INSTANTIATE_TEST_SUITE_P(XdsTest, BasicTest, - ::testing::Values(TestType(false, true), - TestType(false, false), - TestType(true, true)), - &TestTypeName); - -INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest, - ::testing::Values(TestType(false, true), - TestType(false, false), - TestType(true, true)), - &TestTypeName); - -INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, - ::testing::Values(TestType(false, true), - TestType(false, false), - TestType(true, true)), - &TestTypeName); - -INSTANTIATE_TEST_SUITE_P(XdsTest, FailoverTest, - ::testing::Values(TestType(false, true), - TestType(false, false), - TestType(true, true)), - &TestTypeName); - -INSTANTIATE_TEST_SUITE_P(XdsTest, DropTest, - ::testing::Values(TestType(false, true), - TestType(false, false), - TestType(true, true)), - &TestTypeName); +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, SecureNamingTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, LocalityMapTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FailoverTest, ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, DropTest, ::testing::Bool()); // Fallback does not work with xds resolver. -INSTANTIATE_TEST_SUITE_P(XdsTest, FallbackTest, - ::testing::Values(TestType(false, true), - TestType(false, false)), - &TestTypeName); - -INSTANTIATE_TEST_SUITE_P(XdsTest, BalancerUpdateTest, - ::testing::Values(TestType(false, true), - TestType(false, false), - TestType(true, true)), - &TestTypeName); - -// Load reporting tests are not run with load reporting disabled. -INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingTest, - ::testing::Values(TestType(false, true), - TestType(true, true)), - &TestTypeName); - -// Load reporting tests are not run with load reporting disabled. -INSTANTIATE_TEST_SUITE_P(XdsTest, ClientLoadReportingWithDropTest, - ::testing::Values(TestType(false, true), - TestType(true, true)), - &TestTypeName); +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, FallbackTest, + ::testing::Values(false)); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, BalancerUpdateTest, + ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingTest, + ::testing::Bool()); + +INSTANTIATE_TEST_SUITE_P(UsesXdsResolver, ClientLoadReportingWithDropTest, + ::testing::Bool()); } // namespace } // namespace testing