From 2142183ef4b3837aa9860f667088580ff276a195 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 1 Sep 2022 14:02:00 -0700 Subject: [PATCH] XdsClient: don't ignore resources after an invalid Resource wrapper (#30819) --- src/core/ext/xds/xds_api.cc | 10 +++- src/core/ext/xds/xds_api.h | 7 +++ src/core/ext/xds/xds_client.cc | 58 ++++++++++++------- test/cpp/end2end/xds/xds_core_end2end_test.cc | 23 +++++--- test/cpp/end2end/xds/xds_server.h | 23 ++++++++ 5 files changed, 91 insertions(+), 30 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index b05736f624c..6e0afb7dbe8 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -390,13 +390,14 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server, absl::string_view serialized_resource = UpbStringToAbsl(google_protobuf_Any_value(resources[i])); // Unwrap Resource messages, if so wrapped. + absl::string_view resource_name; if (type_url == "envoy.api.v2.Resource" || type_url == "envoy.service.discovery.v3.Resource") { const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse( serialized_resource.data(), serialized_resource.size(), arena.ptr()); if (resource_wrapper == nullptr) { - return absl::InvalidArgumentError( - "Can't decode Resource proto wrapper"); + parser->ResourceWrapperParsingFailed(i); + continue; } const auto* resource = envoy_service_discovery_v3_Resource_resource(resource_wrapper); @@ -405,8 +406,11 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server, "type.googleapis.com/"); serialized_resource = UpbStringToAbsl(google_protobuf_Any_value(resource)); + resource_name = UpbStringToAbsl( + envoy_service_discovery_v3_Resource_name(resource_wrapper)); } - parser->ParseResource(context.arena, i, type_url, serialized_resource); + parser->ParseResource(context.arena, i, type_url, resource_name, + serialized_resource); } return absl::OkStatus(); } diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index 82024ab7240..86b6c353250 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -67,9 +67,16 @@ class XdsApi { virtual absl::Status ProcessAdsResponseFields(AdsResponseFields fields) = 0; // Called to parse each individual resource in the ADS response. + // Note that resource_name is non-empty only when the resource was + // wrapped in a Resource wrapper proto. virtual void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url, + absl::string_view resource_name, absl::string_view serialized_resource) = 0; + + // Called when a resource is wrapped in a Resource wrapper proto but + // we fail to deserialize the wrapper proto. + virtual void ResourceWrapperParsingFailed(size_t idx) = 0; }; struct ClusterLoadReport { diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 9d26eca3f2e..817812dd78b 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -150,9 +150,12 @@ class XdsClient::ChannelState::AdsCallState ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url, + absl::string_view resource_name, absl::string_view serialized_resource) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + void ResourceWrapperParsingFailed(size_t idx) override; + Result TakeResult() { return std::move(result_); } private: @@ -689,13 +692,16 @@ void UpdateResourceMetadataNacked(const std::string& version, void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( upb_Arena* arena, size_t idx, absl::string_view type_url, - absl::string_view serialized_resource) { + absl::string_view resource_name, absl::string_view serialized_resource) { + std::string error_prefix = absl::StrCat( + "resource index ", idx, ": ", + resource_name.empty() ? "" : absl::StrCat(resource_name, ": ")); // Check the type_url of the resource. bool is_v2 = false; if (!result_.type->IsType(type_url, &is_v2)) { result_.errors.emplace_back( - absl::StrCat("resource index ", idx, ": incorrect resource type ", - type_url, " (should be ", result_.type_url, ")")); + absl::StrCat(error_prefix, "incorrect resource type ", type_url, + " (should be ", result_.type_url, ")")); return; } // Parse the resource. @@ -706,25 +712,29 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( result_.type->Decode(context, serialized_resource, is_v2); if (!result.ok()) { result_.errors.emplace_back( - absl::StrCat("resource index ", idx, ": ", result.status().ToString())); + absl::StrCat(error_prefix, result.status().ToString())); return; } // Check the resource name. - auto resource_name = - xds_client()->ParseXdsResourceName(result->name, result_.type); - if (!resource_name.ok()) { - result_.errors.emplace_back(absl::StrCat( - "resource index ", idx, ": Cannot parse xDS resource name \"", - result->name, "\"")); + if (resource_name.empty()) { + resource_name = result->name; + error_prefix = + absl::StrCat("resource index ", idx, ": ", resource_name, ": "); + } + auto parsed_resource_name = + xds_client()->ParseXdsResourceName(resource_name, result_.type); + if (!parsed_resource_name.ok()) { + result_.errors.emplace_back( + absl::StrCat(error_prefix, "Cannot parse xDS resource name")); return; } // Cancel resource-does-not-exist timer, if needed. auto timer_it = ads_call_state_->state_map_.find(result_.type); if (timer_it != ads_call_state_->state_map_.end()) { - auto it = - timer_it->second.subscribed_resources.find(resource_name->authority); + auto it = timer_it->second.subscribed_resources.find( + parsed_resource_name->authority); if (it != timer_it->second.subscribed_resources.end()) { - auto res_it = it->second.find(resource_name->key); + auto res_it = it->second.find(parsed_resource_name->key); if (res_it != it->second.end()) { res_it->second->MaybeCancelTimer(); } @@ -732,7 +742,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( } // Lookup the authority in the cache. auto authority_it = - xds_client()->authority_state_map_.find(resource_name->authority); + xds_client()->authority_state_map_.find(parsed_resource_name->authority); if (authority_it == xds_client()->authority_state_map_.end()) { return; // Skip resource -- we don't have a subscription for it. } @@ -744,14 +754,15 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( } auto& type_map = type_it->second; // Found type, so look up resource key. - auto it = type_map.find(resource_name->key); + auto it = type_map.find(parsed_resource_name->key); if (it == type_map.end()) { return; // Skip resource -- we don't have a subscription for it. } ResourceState& resource_state = it->second; // If needed, record that we've seen this resource. if (result_.type->AllResourcesRequiredInSotW()) { - result_.resources_seen[resource_name->authority].insert(resource_name->key); + result_.resources_seen[parsed_resource_name->authority].insert( + parsed_resource_name->key); } // If we previously ignored the resource's deletion, log that we're // now re-adding it. @@ -761,14 +772,14 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( "resource for which we previously ignored a deletion: type %s " "name %s", xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(), - std::string(type_url).c_str(), result->name.c_str()); + std::string(type_url).c_str(), std::string(resource_name).c_str()); resource_state.ignored_deletion = false; } // Update resource state based on whether the resource is valid. if (!result->resource.ok()) { result_.errors.emplace_back(absl::StrCat( - "resource index ", idx, ": ", result->name, - ": validation error: ", result->resource.status().ToString())); + error_prefix, + "validation error: ", result->resource.status().ToString())); xds_client()->NotifyWatchersOnErrorLocked( resource_state.watchers, absl::UnavailableError(absl::StrCat( @@ -787,7 +798,8 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] %s resource %s identical to current, ignoring.", - xds_client(), result_.type_url.c_str(), result->name.c_str()); + xds_client(), result_.type_url.c_str(), + std::string(resource_name).c_str()); } return; } @@ -810,6 +822,12 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( DEBUG_LOCATION); } +void XdsClient::ChannelState::AdsCallState::AdsResponseParser:: + ResourceWrapperParsingFailed(size_t idx) { + result_.errors.emplace_back(absl::StrCat( + "resource index ", idx, ": Can't decode Resource proto wrapper")); +} + // // XdsClient::ChannelState::AdsCallState // diff --git a/test/cpp/end2end/xds/xds_core_end2end_test.cc b/test/cpp/end2end/xds/xds_core_end2end_test.cc index 13a061dc2f6..38cc2bc7c79 100644 --- a/test/cpp/end2end/xds/xds_core_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_core_end2end_test.cc @@ -142,6 +142,8 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) { constexpr char kClusterName2[] = "cluster_name_2"; constexpr char kClusterName3[] = "cluster_name_3"; CreateAndStartBackends(1); + balancer_->ads_service()->set_inject_bad_resources_for_resource_type( + kCdsTypeUrl); // Add cluster with unsupported type. auto cluster = default_cluster_; cluster.set_name(kClusterName2); @@ -178,14 +180,21 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) { // Send RPC. const auto response_state = WaitForCdsNack(DEBUG_LOCATION); ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; - EXPECT_THAT( + EXPECT_EQ( response_state->error_message, - ::testing::ContainsRegex(absl::StrCat(kClusterName2, - ": validation error.*" - "DiscoveryType is not valid.*", - kClusterName3, - ": validation error.*" - "DiscoveryType is not valid"))); + absl::StrCat( + "xDS response validation errors: [" + "resource index 0: Can't decode Resource proto wrapper; ", + "resource index 1: foo: " + "INVALID_ARGUMENT: Can't parse Cluster resource.; " + "resource index 3: ", + kClusterName2, + ": validation error: INVALID_ARGUMENT: errors parsing CDS resource: " + "[DiscoveryType is not valid.]; " + "resource index 4: ", + kClusterName3, + ": validation error: INVALID_ARGUMENT: errors parsing CDS resource: " + "[DiscoveryType is not valid.]]")); // RPCs for default cluster should succeed. std::vector> metadata_default_cluster = { {"cluster", kDefaultClusterName}, diff --git a/test/cpp/end2end/xds/xds_server.h b/test/cpp/end2end/xds/xds_server.h index 12e226c33c3..f45726e97da 100644 --- a/test/cpp/end2end/xds/xds_server.h +++ b/test/cpp/end2end/xds/xds_server.h @@ -99,6 +99,11 @@ class AdsServiceImpl : public std::enable_shared_from_this { wrap_resources_ = wrap_resources; } + void set_inject_bad_resources_for_resource_type(const std::string& type_url) { + grpc_core::MutexLock lock(&ads_mu_); + inject_bad_resources_for_resource_type_ = type_url; + } + // Sets a resource to a particular value, overwriting any previous value. void SetResource(google::protobuf::Any resource, const std::string& type_url, const std::string& name); @@ -448,6 +453,23 @@ class AdsServiceImpl : public std::enable_shared_from_this { parent_->resource_types_to_ignore_.end()) { return; } + // Inject bad resources if needed. + if (parent_->inject_bad_resources_for_resource_type_ == + v3_resource_type) { + response->emplace(); + // Unparseable Resource wrapper. + auto* resource = (*response)->add_resources(); + resource->set_type_url( + "type.googleapis.com/envoy.service.discovery.v3.Resource"); + resource->set_value(std::string("\0", 1)); + // Unparseable resource within Resource wrapper. + envoy::service::discovery::v3::Resource resource_wrapper; + resource_wrapper.set_name("foo"); + resource = resource_wrapper.mutable_resource(); + resource->set_type_url(v3_resource_type); + resource->set_value(std::string("\0", 1)); + (*response)->add_resources()->PackFrom(resource_wrapper); + } // Look at all the resource names in the request. auto& subscription_name_map = (*subscription_map)[v3_resource_type]; auto& resource_type_state = parent_->resource_map_[v3_resource_type]; @@ -690,6 +712,7 @@ class AdsServiceImpl : public std::enable_shared_from_this { ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_); absl::optional forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_); bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false; + std::string inject_bad_resources_for_resource_type_ ABSL_GUARDED_BY(ads_mu_); grpc_core::Mutex clients_mu_; std::set clients_ ABSL_GUARDED_BY(clients_mu_);