XdsClient: simplify XdsResourceType::Decode() API (#31070)

* simplify XdsResourceType::Decode() API

* fix xds_client_test

* fix sanity
pull/31084/head
Mark D. Roth 2 years ago committed by GitHub
parent bd6fab6ab7
commit b340c37305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      src/core/ext/xds/xds_client.cc
  2. 15
      src/core/ext/xds/xds_cluster.cc
  3. 7
      src/core/ext/xds/xds_cluster.h
  4. 14
      src/core/ext/xds/xds_endpoint.cc
  5. 7
      src/core/ext/xds/xds_endpoint.h
  6. 15
      src/core/ext/xds/xds_listener.cc
  7. 7
      src/core/ext/xds/xds_listener.h
  8. 17
      src/core/ext/xds/xds_resource_type.h
  9. 16
      src/core/ext/xds/xds_route_config.cc
  10. 6
      src/core/ext/xds/xds_route_config.h
  11. 38
      test/core/xds/xds_client_test.cc

@ -733,27 +733,25 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
XdsResourceType::DecodeContext context = {
xds_client(), ads_call_state_->chand()->server_, &grpc_xds_client_trace,
xds_client()->symtab_.ptr(), arena};
absl::StatusOr<XdsResourceType::DecodeResult> decode_result =
XdsResourceType::DecodeResult decode_result =
result_.type->Decode(context, serialized_resource, is_v2);
// If we didn't already have the resource name from the Resource
// wrapper, try to get it from the decoding result.
if (resource_name.empty()) {
if (decode_result.ok()) {
resource_name = decode_result->name;
if (decode_result.name.has_value()) {
resource_name = *decode_result.name;
error_prefix =
absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
} else {
// We don't have any way of determining the resource name, so
// there's nothing more we can do here.
result_.errors.emplace_back(
absl::StrCat(error_prefix, decode_result.status().ToString()));
result_.errors.emplace_back(absl::StrCat(
error_prefix, decode_result.resource.status().ToString()));
return;
}
}
// If decoding failed, make sure we include the error in the NACK.
const absl::Status& decode_status = decode_result.ok()
? decode_result->resource.status()
: decode_result.status();
const absl::Status& decode_status = decode_result.resource.status();
if (!decode_status.ok()) {
result_.errors.emplace_back(
absl::StrCat(error_prefix, decode_status.ToString()));
@ -829,7 +827,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
// If it didn't change, ignore it.
if (resource_state.resource != nullptr &&
result_.type->ResourcesEqual(resource_state.resource.get(),
decode_result->resource->get())) {
decode_result.resource->get())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s resource %s identical to current, ignoring.",
@ -839,7 +837,7 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
return;
}
// Update the resource state.
resource_state.resource = std::move(*decode_result->resource);
resource_state.resource = std::move(*decode_result.resource);
resource_state.meta = CreateResourceMetadataAcked(
std::string(serialized_resource), result_.version, update_time_);
// Notify watchers.

@ -24,6 +24,7 @@
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/strip.h"
@ -511,38 +512,40 @@ void MaybeLogCluster(const XdsResourceType::DecodeContext& context,
} // namespace
absl::StatusOr<XdsResourceType::DecodeResult> XdsClusterResourceType::Decode(
XdsResourceType::DecodeResult XdsClusterResourceType::Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool is_v2) const {
DecodeResult result;
// Parse serialized proto.
auto* resource = envoy_config_cluster_v3_Cluster_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError("Can't parse Cluster resource.");
result.resource =
absl::InvalidArgumentError("Can't parse Cluster resource.");
return result;
}
MaybeLogCluster(context, resource);
// Validate resource.
DecodeResult result;
result.name =
UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(resource));
auto cds_resource = CdsResourceParse(context, resource, is_v2);
if (!cds_resource.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid Cluster %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
cds_resource.status().ToString().c_str());
}
result.resource = cds_resource.status();
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed Cluster %s: %s", context.client,
result.name.c_str(), cds_resource->ToString().c_str());
result.name->c_str(), cds_resource->ToString().c_str());
}
auto resource = absl::make_unique<ResourceDataSubclass>();
resource->resource = std::move(*cds_resource);
result.resource = std::move(resource);
}
return std::move(result);
return result;
}
} // namespace grpc_core

@ -26,7 +26,6 @@
#include <string>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "envoy/config/cluster/v3/cluster.upbdefs.h"
@ -102,9 +101,9 @@ class XdsClusterResourceType
return "envoy.api.v2.Cluster";
}
absl::StatusOr<DecodeResult> Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool is_v2) const override;
DecodeResult Decode(const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource,
bool is_v2) const override;
bool AllResourcesRequiredInSotW() const override { return true; }

@ -25,6 +25,7 @@
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/types/optional.h"
@ -342,40 +343,41 @@ absl::StatusOr<XdsEndpointResource> EdsResourceParse(
} // namespace
absl::StatusOr<XdsResourceType::DecodeResult> XdsEndpointResourceType::Decode(
XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool is_v2) const {
DecodeResult result;
// Parse serialized proto.
auto* resource = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError(
result.resource = absl::InvalidArgumentError(
"Can't parse ClusterLoadAssignment resource.");
return result;
}
MaybeLogClusterLoadAssignment(context, resource);
// Validate resource.
DecodeResult result;
result.name = UpbStringToStdString(
envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(resource));
auto eds_resource = EdsResourceParse(context, resource, is_v2);
if (!eds_resource.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
eds_resource.status().ToString().c_str());
}
result.resource = eds_resource.status();
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
eds_resource->ToString().c_str());
}
auto resource = absl::make_unique<ResourceDataSubclass>();
resource->resource = std::move(*eds_resource);
result.resource = std::move(resource);
}
return std::move(result);
return result;
}
} // namespace grpc_core

@ -28,7 +28,6 @@
#include <utility>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "upb/def.h"
@ -129,9 +128,9 @@ class XdsEndpointResourceType
return "envoy.api.v2.ClusterLoadAssignment";
}
absl::StatusOr<DecodeResult> Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool is_v2) const override;
DecodeResult Decode(const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource,
bool is_v2) const override;
void InitUpbSymtab(upb_DefPool* symtab) const override {
envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab);

@ -25,6 +25,7 @@
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
@ -1061,39 +1062,41 @@ void MaybeLogListener(const XdsResourceType::DecodeContext& context,
} // namespace
absl::StatusOr<XdsResourceType::DecodeResult> XdsListenerResourceType::Decode(
XdsResourceType::DecodeResult XdsListenerResourceType::Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool is_v2) const {
DecodeResult result;
// Parse serialized proto.
auto* resource = envoy_config_listener_v3_Listener_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError("Can't parse Listener resource.");
result.resource =
absl::InvalidArgumentError("Can't parse Listener resource.");
return result;
}
MaybeLogListener(context, resource);
// Validate resource.
DecodeResult result;
result.name =
UpbStringToStdString(envoy_config_listener_v3_Listener_name(resource));
auto listener = LdsResourceParse(context, resource, is_v2);
if (!listener.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid Listener %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
listener.status().ToString().c_str());
}
result.resource = listener.status();
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed Listener %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
listener->ToString().c_str());
}
auto resource = absl::make_unique<ResourceDataSubclass>();
resource->resource = std::move(*listener);
result.resource = std::move(resource);
}
return std::move(result);
return result;
}
} // namespace grpc_core

@ -30,7 +30,6 @@
#include <string>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "envoy/config/listener/v3/listener.upbdefs.h"
@ -210,9 +209,9 @@ class XdsListenerResourceType
return "envoy.api.v2.Listener";
}
absl::StatusOr<DecodeResult> Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool is_v2) const override;
DecodeResult Decode(const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource,
bool is_v2) const override;
bool AllResourcesRequiredInSotW() const override { return true; }

@ -23,6 +23,7 @@
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "upb/arena.h"
#include "upb/def.h"
@ -55,7 +56,11 @@ class XdsResourceType {
// Result returned by Decode().
struct DecodeResult {
std::string name;
// The resource's name, if it can be determined.
// If the name is not returned, the resource field should contain a
// non-OK status.
absl::optional<std::string> name;
// The parsed and validated resource, or an error status.
absl::StatusOr<std::unique_ptr<ResourceData>> resource;
};
@ -68,13 +73,9 @@ class XdsResourceType {
virtual absl::string_view v2_type_url() const = 0;
// Decodes and validates a serialized resource proto.
// If the resource fails protobuf deserialization, returns non-OK status.
// If the deserialized resource fails validation, returns a DecodeResult
// whose resource field is set to a non-OK status.
// Otherwise, returns a DecodeResult with a valid resource.
virtual absl::StatusOr<DecodeResult> Decode(
const DecodeContext& context, absl::string_view serialized_resource,
bool is_v2) const = 0;
virtual DecodeResult Decode(const DecodeContext& context,
absl::string_view serialized_resource,
bool is_v2) const = 0;
// Returns true if r1 and r2 are equal.
// Must be invoked only on resources returned by this object's Decode()

@ -1112,41 +1112,41 @@ void MaybeLogRouteConfiguration(
} // namespace
absl::StatusOr<XdsResourceType::DecodeResult>
XdsRouteConfigResourceType::Decode(
XdsResourceType::DecodeResult XdsRouteConfigResourceType::Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool /*is_v2*/) const {
DecodeResult result;
// Parse serialized proto.
auto* resource = envoy_config_route_v3_RouteConfiguration_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError(
"Can't parse RouteConfiguration resource.");
result.resource =
absl::InvalidArgumentError("Can't parse RouteConfiguration resource.");
return result;
}
MaybeLogRouteConfiguration(context, resource);
// Validate resource.
DecodeResult result;
result.name = UpbStringToStdString(
envoy_config_route_v3_RouteConfiguration_name(resource));
auto rds_update = XdsRouteConfigResource::Parse(context, resource);
if (!rds_update.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid RouteConfiguration %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
rds_update.status().ToString().c_str());
}
result.resource = rds_update.status();
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed RouteConfiguration %s: %s",
context.client, result.name.c_str(),
context.client, result.name->c_str(),
rds_update->ToString().c_str());
}
auto resource = absl::make_unique<ResourceDataSubclass>();
resource->resource = std::move(*rds_update);
result.resource = std::move(resource);
}
return std::move(result);
return result;
}
} // namespace grpc_core

@ -226,9 +226,9 @@ class XdsRouteConfigResourceType
return "envoy.api.v2.RouteConfiguration";
}
absl::StatusOr<DecodeResult> Decode(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource, bool /*is_v2*/) const override;
DecodeResult Decode(const XdsResourceType::DecodeContext& context,
absl::string_view serialized_resource,
bool /*is_v2*/) const override;
void InitUpbSymtab(upb_DefPool* symtab) const override {
envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab);

@ -318,30 +318,32 @@ class XdsClientTest : public ::testing::Test {
absl::string_view v2_type_url() const override {
return ResourceStruct::TypeUrlV2();
}
absl::StatusOr<XdsResourceType::DecodeResult> Decode(
XdsResourceType::DecodeResult Decode(
const XdsResourceType::DecodeContext& /*context*/,
absl::string_view serialized_resource, bool /*is_v2*/) const override {
auto json = Json::Parse(serialized_resource);
if (!json.ok()) return json.status();
absl::StatusOr<ResourceStruct> foo = LoadFromJson<ResourceStruct>(*json);
XdsResourceType::DecodeResult result;
if (!foo.ok()) {
auto it = json->object_value().find("name");
if (it == json->object_value().end()) {
return absl::InvalidArgumentError(
"cannot determine name for invalid resource");
}
result.name = it->second.string_value();
result.resource = foo.status();
if (!json.ok()) {
result.resource = json.status();
} else {
result.name = foo->name;
auto resource = absl::make_unique<typename XdsResourceTypeImpl<
XdsTestResourceType<ResourceStruct>,
ResourceStruct>::ResourceDataSubclass>();
resource->resource = std::move(*foo);
result.resource = std::move(resource);
absl::StatusOr<ResourceStruct> foo =
LoadFromJson<ResourceStruct>(*json);
if (!foo.ok()) {
auto it = json->object_value().find("name");
if (it != json->object_value().end()) {
result.name = it->second.string_value();
}
result.resource = foo.status();
} else {
result.name = foo->name;
auto resource = absl::make_unique<typename XdsResourceTypeImpl<
XdsTestResourceType<ResourceStruct>,
ResourceStruct>::ResourceDataSubclass>();
resource->resource = std::move(*foo);
result.resource = std::move(resource);
}
}
return std::move(result);
return result;
}
void InitUpbSymtab(upb_DefPool* /*symtab*/) const override {}

Loading…
Cancel
Save