XdsClient: don't ignore resources after an invalid Resource wrapper (#30819)

pull/30820/head
Mark D. Roth 3 years ago committed by GitHub
parent fc4ce88e3a
commit 2142183ef4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/xds/xds_api.cc
  2. 7
      src/core/ext/xds/xds_api.h
  3. 58
      src/core/ext/xds/xds_client.cc
  4. 23
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  5. 23
      test/cpp/end2end/xds/xds_server.h

@ -390,13 +390,14 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server,
absl::string_view serialized_resource = absl::string_view serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resources[i])); UpbStringToAbsl(google_protobuf_Any_value(resources[i]));
// Unwrap Resource messages, if so wrapped. // Unwrap Resource messages, if so wrapped.
absl::string_view resource_name;
if (type_url == "envoy.api.v2.Resource" || if (type_url == "envoy.api.v2.Resource" ||
type_url == "envoy.service.discovery.v3.Resource") { type_url == "envoy.service.discovery.v3.Resource") {
const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse( const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse(
serialized_resource.data(), serialized_resource.size(), arena.ptr()); serialized_resource.data(), serialized_resource.size(), arena.ptr());
if (resource_wrapper == nullptr) { if (resource_wrapper == nullptr) {
return absl::InvalidArgumentError( parser->ResourceWrapperParsingFailed(i);
"Can't decode Resource proto wrapper"); continue;
} }
const auto* resource = const auto* resource =
envoy_service_discovery_v3_Resource_resource(resource_wrapper); envoy_service_discovery_v3_Resource_resource(resource_wrapper);
@ -405,8 +406,11 @@ absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server,
"type.googleapis.com/"); "type.googleapis.com/");
serialized_resource = serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resource)); UpbStringToAbsl(google_protobuf_Any_value(resource));
resource_name = UpbStringToAbsl(
envoy_service_discovery_v3_Resource_name(resource_wrapper));
} }
parser->ParseResource(context.arena, i, type_url, serialized_resource); parser->ParseResource(context.arena, i, type_url, resource_name,
serialized_resource);
} }
return absl::OkStatus(); return absl::OkStatus();
} }

@ -67,9 +67,16 @@ class XdsApi {
virtual absl::Status ProcessAdsResponseFields(AdsResponseFields fields) = 0; virtual absl::Status ProcessAdsResponseFields(AdsResponseFields fields) = 0;
// Called to parse each individual resource in the ADS response. // Called to parse each individual resource in the ADS response.
// Note that resource_name is non-empty only when the resource was
// wrapped in a Resource wrapper proto.
virtual void ParseResource(upb_Arena* arena, size_t idx, virtual void ParseResource(upb_Arena* arena, size_t idx,
absl::string_view type_url, absl::string_view type_url,
absl::string_view resource_name,
absl::string_view serialized_resource) = 0; absl::string_view serialized_resource) = 0;
// Called when a resource is wrapped in a Resource wrapper proto but
// we fail to deserialize the wrapper proto.
virtual void ResourceWrapperParsingFailed(size_t idx) = 0;
}; };
struct ClusterLoadReport { struct ClusterLoadReport {

@ -150,9 +150,12 @@ class XdsClient::ChannelState::AdsCallState
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url, void ParseResource(upb_Arena* arena, size_t idx, absl::string_view type_url,
absl::string_view resource_name,
absl::string_view serialized_resource) override absl::string_view serialized_resource) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void ResourceWrapperParsingFailed(size_t idx) override;
Result TakeResult() { return std::move(result_); } Result TakeResult() { return std::move(result_); }
private: private:
@ -689,13 +692,16 @@ void UpdateResourceMetadataNacked(const std::string& version,
void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
upb_Arena* arena, size_t idx, absl::string_view type_url, upb_Arena* arena, size_t idx, absl::string_view type_url,
absl::string_view serialized_resource) { absl::string_view resource_name, absl::string_view serialized_resource) {
std::string error_prefix = absl::StrCat(
"resource index ", idx, ": ",
resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
// Check the type_url of the resource. // Check the type_url of the resource.
bool is_v2 = false; bool is_v2 = false;
if (!result_.type->IsType(type_url, &is_v2)) { if (!result_.type->IsType(type_url, &is_v2)) {
result_.errors.emplace_back( result_.errors.emplace_back(
absl::StrCat("resource index ", idx, ": incorrect resource type ", absl::StrCat(error_prefix, "incorrect resource type ", type_url,
type_url, " (should be ", result_.type_url, ")")); " (should be ", result_.type_url, ")"));
return; return;
} }
// Parse the resource. // Parse the resource.
@ -706,25 +712,29 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
result_.type->Decode(context, serialized_resource, is_v2); result_.type->Decode(context, serialized_resource, is_v2);
if (!result.ok()) { if (!result.ok()) {
result_.errors.emplace_back( result_.errors.emplace_back(
absl::StrCat("resource index ", idx, ": ", result.status().ToString())); absl::StrCat(error_prefix, result.status().ToString()));
return; return;
} }
// Check the resource name. // Check the resource name.
auto resource_name = if (resource_name.empty()) {
xds_client()->ParseXdsResourceName(result->name, result_.type); resource_name = result->name;
if (!resource_name.ok()) { error_prefix =
result_.errors.emplace_back(absl::StrCat( absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
"resource index ", idx, ": Cannot parse xDS resource name \"", }
result->name, "\"")); auto parsed_resource_name =
xds_client()->ParseXdsResourceName(resource_name, result_.type);
if (!parsed_resource_name.ok()) {
result_.errors.emplace_back(
absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
return; return;
} }
// Cancel resource-does-not-exist timer, if needed. // Cancel resource-does-not-exist timer, if needed.
auto timer_it = ads_call_state_->state_map_.find(result_.type); auto timer_it = ads_call_state_->state_map_.find(result_.type);
if (timer_it != ads_call_state_->state_map_.end()) { if (timer_it != ads_call_state_->state_map_.end()) {
auto it = auto it = timer_it->second.subscribed_resources.find(
timer_it->second.subscribed_resources.find(resource_name->authority); parsed_resource_name->authority);
if (it != timer_it->second.subscribed_resources.end()) { if (it != timer_it->second.subscribed_resources.end()) {
auto res_it = it->second.find(resource_name->key); auto res_it = it->second.find(parsed_resource_name->key);
if (res_it != it->second.end()) { if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer(); res_it->second->MaybeCancelTimer();
} }
@ -732,7 +742,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
} }
// Lookup the authority in the cache. // Lookup the authority in the cache.
auto authority_it = auto authority_it =
xds_client()->authority_state_map_.find(resource_name->authority); xds_client()->authority_state_map_.find(parsed_resource_name->authority);
if (authority_it == xds_client()->authority_state_map_.end()) { if (authority_it == xds_client()->authority_state_map_.end()) {
return; // Skip resource -- we don't have a subscription for it. return; // Skip resource -- we don't have a subscription for it.
} }
@ -744,14 +754,15 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
} }
auto& type_map = type_it->second; auto& type_map = type_it->second;
// Found type, so look up resource key. // Found type, so look up resource key.
auto it = type_map.find(resource_name->key); auto it = type_map.find(parsed_resource_name->key);
if (it == type_map.end()) { if (it == type_map.end()) {
return; // Skip resource -- we don't have a subscription for it. return; // Skip resource -- we don't have a subscription for it.
} }
ResourceState& resource_state = it->second; ResourceState& resource_state = it->second;
// If needed, record that we've seen this resource. // If needed, record that we've seen this resource.
if (result_.type->AllResourcesRequiredInSotW()) { if (result_.type->AllResourcesRequiredInSotW()) {
result_.resources_seen[resource_name->authority].insert(resource_name->key); result_.resources_seen[parsed_resource_name->authority].insert(
parsed_resource_name->key);
} }
// If we previously ignored the resource's deletion, log that we're // If we previously ignored the resource's deletion, log that we're
// now re-adding it. // now re-adding it.
@ -761,14 +772,14 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
"resource for which we previously ignored a deletion: type %s " "resource for which we previously ignored a deletion: type %s "
"name %s", "name %s",
xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(), xds_client(), ads_call_state_->chand()->server_.server_uri.c_str(),
std::string(type_url).c_str(), result->name.c_str()); std::string(type_url).c_str(), std::string(resource_name).c_str());
resource_state.ignored_deletion = false; resource_state.ignored_deletion = false;
} }
// Update resource state based on whether the resource is valid. // Update resource state based on whether the resource is valid.
if (!result->resource.ok()) { if (!result->resource.ok()) {
result_.errors.emplace_back(absl::StrCat( result_.errors.emplace_back(absl::StrCat(
"resource index ", idx, ": ", result->name, error_prefix,
": validation error: ", result->resource.status().ToString())); "validation error: ", result->resource.status().ToString()));
xds_client()->NotifyWatchersOnErrorLocked( xds_client()->NotifyWatchersOnErrorLocked(
resource_state.watchers, resource_state.watchers,
absl::UnavailableError(absl::StrCat( absl::UnavailableError(absl::StrCat(
@ -787,7 +798,8 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xds_client %p] %s resource %s identical to current, ignoring.", "[xds_client %p] %s resource %s identical to current, ignoring.",
xds_client(), result_.type_url.c_str(), result->name.c_str()); xds_client(), result_.type_url.c_str(),
std::string(resource_name).c_str());
} }
return; return;
} }
@ -810,6 +822,12 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
DEBUG_LOCATION); DEBUG_LOCATION);
} }
void XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ResourceWrapperParsingFailed(size_t idx) {
result_.errors.emplace_back(absl::StrCat(
"resource index ", idx, ": Can't decode Resource proto wrapper"));
}
// //
// XdsClient::ChannelState::AdsCallState // XdsClient::ChannelState::AdsCallState
// //

@ -142,6 +142,8 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) {
constexpr char kClusterName2[] = "cluster_name_2"; constexpr char kClusterName2[] = "cluster_name_2";
constexpr char kClusterName3[] = "cluster_name_3"; constexpr char kClusterName3[] = "cluster_name_3";
CreateAndStartBackends(1); CreateAndStartBackends(1);
balancer_->ads_service()->set_inject_bad_resources_for_resource_type(
kCdsTypeUrl);
// Add cluster with unsupported type. // Add cluster with unsupported type.
auto cluster = default_cluster_; auto cluster = default_cluster_;
cluster.set_name(kClusterName2); cluster.set_name(kClusterName2);
@ -178,14 +180,21 @@ TEST_P(XdsClientTest, MultipleBadCdsResources) {
// Send RPC. // Send RPC.
const auto response_state = WaitForCdsNack(DEBUG_LOCATION); const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK"; ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
EXPECT_THAT( EXPECT_EQ(
response_state->error_message, response_state->error_message,
::testing::ContainsRegex(absl::StrCat(kClusterName2, absl::StrCat(
": validation error.*" "xDS response validation errors: ["
"DiscoveryType is not valid.*", "resource index 0: Can't decode Resource proto wrapper; ",
kClusterName3, "resource index 1: foo: "
": validation error.*" "INVALID_ARGUMENT: Can't parse Cluster resource.; "
"DiscoveryType is not valid"))); "resource index 3: ",
kClusterName2,
": validation error: INVALID_ARGUMENT: errors parsing CDS resource: "
"[DiscoveryType is not valid.]; "
"resource index 4: ",
kClusterName3,
": validation error: INVALID_ARGUMENT: errors parsing CDS resource: "
"[DiscoveryType is not valid.]]"));
// RPCs for default cluster should succeed. // RPCs for default cluster should succeed.
std::vector<std::pair<std::string, std::string>> metadata_default_cluster = { std::vector<std::pair<std::string, std::string>> metadata_default_cluster = {
{"cluster", kDefaultClusterName}, {"cluster", kDefaultClusterName},

@ -99,6 +99,11 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
wrap_resources_ = wrap_resources; wrap_resources_ = wrap_resources;
} }
void set_inject_bad_resources_for_resource_type(const std::string& type_url) {
grpc_core::MutexLock lock(&ads_mu_);
inject_bad_resources_for_resource_type_ = type_url;
}
// Sets a resource to a particular value, overwriting any previous value. // Sets a resource to a particular value, overwriting any previous value.
void SetResource(google::protobuf::Any resource, const std::string& type_url, void SetResource(google::protobuf::Any resource, const std::string& type_url,
const std::string& name); const std::string& name);
@ -448,6 +453,23 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
parent_->resource_types_to_ignore_.end()) { parent_->resource_types_to_ignore_.end()) {
return; return;
} }
// Inject bad resources if needed.
if (parent_->inject_bad_resources_for_resource_type_ ==
v3_resource_type) {
response->emplace();
// Unparseable Resource wrapper.
auto* resource = (*response)->add_resources();
resource->set_type_url(
"type.googleapis.com/envoy.service.discovery.v3.Resource");
resource->set_value(std::string("\0", 1));
// Unparseable resource within Resource wrapper.
envoy::service::discovery::v3::Resource resource_wrapper;
resource_wrapper.set_name("foo");
resource = resource_wrapper.mutable_resource();
resource->set_type_url(v3_resource_type);
resource->set_value(std::string("\0", 1));
(*response)->add_resources()->PackFrom(resource_wrapper);
}
// Look at all the resource names in the request. // Look at all the resource names in the request.
auto& subscription_name_map = (*subscription_map)[v3_resource_type]; auto& subscription_name_map = (*subscription_map)[v3_resource_type];
auto& resource_type_state = parent_->resource_map_[v3_resource_type]; auto& resource_type_state = parent_->resource_map_[v3_resource_type];
@ -690,6 +712,7 @@ class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_); ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_);
absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_); absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_);
bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false; bool wrap_resources_ ABSL_GUARDED_BY(ads_mu_) = false;
std::string inject_bad_resources_for_resource_type_ ABSL_GUARDED_BY(ads_mu_);
grpc_core::Mutex clients_mu_; grpc_core::Mutex clients_mu_;
std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_); std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_);

Loading…
Cancel
Save