XdsClient: use XdsResourceType abstraction throughout XdsClient (#28165)

* WIP

* introduce XdsResourceType API and change Listener parsing to use it

* converted RouteConfig parsing

* convert cluster and endpoint parsing

* cleanup

* clang-format

* attempt to work around compiler problems

* move XdsResourceType to its own file, and move endpoint code out of XdsApi

* move cluster parsing to its own file

* move route config parsing to its own file

* move listener parsing to its own file

* clang-format

* minor cleanup

* plumbed XdsResourceType throughout XdsClient

* a bit of cleanup

* more cleanup

* construct full resource names before calling XdsApi::CreateAdsRequest()

* remove some unneeded code

* clean up includes and have XdsResourceType initialize the upb symtab

* more cleanup of unnecessary code

* more cleanup

* update comment

* clang-format

* add missing virtual dtor

* fix build

* remove comment

* add missing virtual dtor
pull/28231/head^2
Mark D. Roth 3 years ago committed by GitHub
parent 7cfd399dd6
commit d1448872fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 1
      CMakeLists.txt
  3. 2
      Makefile
  4. 1
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 1
      gRPC-Core.podspec
  8. 1
      grpc.gemspec
  9. 1
      grpc.gyp
  10. 1
      package.xml
  11. 3
      src/core/ext/xds/upb_utils.h
  12. 447
      src/core/ext/xds/xds_api.cc
  13. 137
      src/core/ext/xds/xds_api.h
  14. 1
      src/core/ext/xds/xds_certificate_provider.h
  15. 1369
      src/core/ext/xds/xds_client.cc
  16. 174
      src/core/ext/xds/xds_client.h
  17. 13
      src/core/ext/xds/xds_cluster.cc
  18. 26
      src/core/ext/xds/xds_cluster.h
  19. 15
      src/core/ext/xds/xds_endpoint.cc
  20. 19
      src/core/ext/xds/xds_endpoint.h
  21. 12
      src/core/ext/xds/xds_listener.cc
  22. 25
      src/core/ext/xds/xds_listener.h
  23. 71
      src/core/ext/xds/xds_resource_type.cc
  24. 64
      src/core/ext/xds/xds_resource_type.h
  25. 15
      src/core/ext/xds/xds_route_config.cc
  26. 19
      src/core/ext/xds/xds_route_config.h
  27. 3
      src/core/ext/xds/xds_routing.h
  28. 2
      src/core/lib/security/credentials/xds/xds_credentials.h
  29. 1
      src/python/grpcio/grpc_core_dependencies.py
  30. 1
      tools/doxygen/Doxyfile.c++.internal
  31. 1
      tools/doxygen/Doxyfile.core.internal

@ -2621,6 +2621,7 @@ grpc_cc_library(
"src/core/ext/xds/xds_http_fault_filter.cc",
"src/core/ext/xds/xds_http_filters.cc",
"src/core/ext/xds/xds_listener.cc",
"src/core/ext/xds/xds_resource_type.cc",
"src/core/ext/xds/xds_route_config.cc",
"src/core/ext/xds/xds_routing.cc",
"src/core/lib/security/credentials/xds/xds_credentials.cc",

1
CMakeLists.txt generated

@ -1855,6 +1855,7 @@ add_library(grpc
src/core/ext/xds/xds_http_fault_filter.cc
src/core/ext/xds/xds_http_filters.cc
src/core/ext/xds/xds_listener.cc
src/core/ext/xds/xds_resource_type.cc
src/core/ext/xds/xds_route_config.cc
src/core/ext/xds/xds_routing.cc
src/core/ext/xds/xds_server_config_fetcher.cc

2
Makefile generated

@ -1353,6 +1353,7 @@ LIBGRPC_SRC = \
src/core/ext/xds/xds_http_fault_filter.cc \
src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_listener.cc \
src/core/ext/xds/xds_resource_type.cc \
src/core/ext/xds/xds_route_config.cc \
src/core/ext/xds/xds_routing.cc \
src/core/ext/xds/xds_server_config_fetcher.cc \
@ -2963,6 +2964,7 @@ src/core/ext/xds/xds_endpoint.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_http_fault_filter.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_http_filters.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_listener.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_resource_type.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_route_config.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_routing.cc: $(OPENSSL_DEP)
src/core/ext/xds/xds_server_config_fetcher.cc: $(OPENSSL_DEP)

@ -1318,6 +1318,7 @@ libs:
- src/core/ext/xds/xds_http_fault_filter.cc
- src/core/ext/xds/xds_http_filters.cc
- src/core/ext/xds/xds_listener.cc
- src/core/ext/xds/xds_resource_type.cc
- src/core/ext/xds/xds_route_config.cc
- src/core/ext/xds/xds_routing.cc
- src/core/ext/xds/xds_server_config_fetcher.cc

1
config.m4 generated

@ -371,6 +371,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/xds/xds_http_fault_filter.cc \
src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_listener.cc \
src/core/ext/xds/xds_resource_type.cc \
src/core/ext/xds/xds_route_config.cc \
src/core/ext/xds/xds_routing.cc \
src/core/ext/xds/xds_server_config_fetcher.cc \

1
config.w32 generated

@ -337,6 +337,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\xds\\xds_http_fault_filter.cc " +
"src\\core\\ext\\xds\\xds_http_filters.cc " +
"src\\core\\ext\\xds\\xds_listener.cc " +
"src\\core\\ext\\xds\\xds_resource_type.cc " +
"src\\core\\ext\\xds\\xds_route_config.cc " +
"src\\core\\ext\\xds\\xds_routing.cc " +
"src\\core\\ext\\xds\\xds_server_config_fetcher.cc " +

1
gRPC-Core.podspec generated

@ -834,6 +834,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_http_filters.h',
'src/core/ext/xds/xds_listener.cc',
'src/core/ext/xds/xds_listener.h',
'src/core/ext/xds/xds_resource_type.cc',
'src/core/ext/xds/xds_resource_type.h',
'src/core/ext/xds/xds_route_config.cc',
'src/core/ext/xds/xds_route_config.h',

1
grpc.gemspec generated

@ -753,6 +753,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/xds/xds_http_filters.h )
s.files += %w( src/core/ext/xds/xds_listener.cc )
s.files += %w( src/core/ext/xds/xds_listener.h )
s.files += %w( src/core/ext/xds/xds_resource_type.cc )
s.files += %w( src/core/ext/xds/xds_resource_type.h )
s.files += %w( src/core/ext/xds/xds_route_config.cc )
s.files += %w( src/core/ext/xds/xds_route_config.h )

1
grpc.gyp generated

@ -808,6 +808,7 @@
'src/core/ext/xds/xds_http_fault_filter.cc',
'src/core/ext/xds/xds_http_filters.cc',
'src/core/ext/xds/xds_listener.cc',
'src/core/ext/xds/xds_resource_type.cc',
'src/core/ext/xds/xds_route_config.cc',
'src/core/ext/xds/xds_routing.cc',
'src/core/ext/xds/xds_server_config_fetcher.cc',

1
package.xml generated

@ -733,6 +733,7 @@
<file baseinstalldir="/" name="src/core/ext/xds/xds_http_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_listener.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_listener.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_resource_type.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_resource_type.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_route_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/xds/xds_route_config.h" role="src" />

@ -33,6 +33,9 @@ namespace grpc_core {
class XdsClient;
// TODO(roth): Rethink this. All fields except symtab and arena should come
// from XdsClient, injected into XdsResourceType::Decode() somehow without
// passing through XdsApi code, maybe via the AdsResponseParser.
struct XdsEncodingContext {
XdsClient* client; // Used only for logging. Unsafe for dereferencing.
TraceFlag* tracer;

@ -18,64 +18,21 @@
#include "src/core/ext/xds/xds_api.h"
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstdlib>
#include <set>
#include <string>
#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "envoy/admin/v3/config_dump.upb.h"
#include "envoy/config/cluster/v3/circuit_breaker.upb.h"
#include "envoy/config/cluster/v3/cluster.upb.h"
#include "envoy/config/cluster/v3/cluster.upbdefs.h"
#include "envoy/config/core/v3/address.upb.h"
#include "envoy/config/core/v3/base.upb.h"
#include "envoy/config/core/v3/base.upbdefs.h"
#include "envoy/config/core/v3/config_source.upb.h"
#include "envoy/config/core/v3/health_check.upb.h"
#include "envoy/config/core/v3/protocol.upb.h"
#include "envoy/config/endpoint/v3/endpoint.upb.h"
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "envoy/config/endpoint/v3/endpoint_components.upb.h"
#include "envoy/config/endpoint/v3/load_report.upb.h"
#include "envoy/config/listener/v3/api_listener.upb.h"
#include "envoy/config/listener/v3/listener.upb.h"
#include "envoy/config/listener/v3/listener.upbdefs.h"
#include "envoy/config/listener/v3/listener_components.upb.h"
#include "envoy/config/route/v3/route.upb.h"
#include "envoy/config/route/v3/route.upbdefs.h"
#include "envoy/config/route/v3/route_components.upb.h"
#include "envoy/config/route/v3/route_components.upbdefs.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h"
#include "envoy/extensions/transport_sockets/tls/v3/common.upb.h"
#include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
#include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h"
#include "envoy/service/cluster/v3/cds.upb.h"
#include "envoy/service/cluster/v3/cds.upbdefs.h"
#include "envoy/service/discovery/v3/discovery.upb.h"
#include "envoy/service/discovery/v3/discovery.upbdefs.h"
#include "envoy/service/endpoint/v3/eds.upb.h"
#include "envoy/service/endpoint/v3/eds.upbdefs.h"
#include "envoy/service/listener/v3/lds.upb.h"
#include "envoy/service/load_stats/v3/lrs.upb.h"
#include "envoy/service/load_stats/v3/lrs.upbdefs.h"
#include "envoy/service/route/v3/rds.upb.h"
#include "envoy/service/route/v3/rds.upbdefs.h"
#include "envoy/service/status/v3/csds.upb.h"
#include "envoy/service/status/v3/csds.upbdefs.h"
#include "envoy/type/matcher/v3/regex.upb.h"
#include "envoy/type/matcher/v3/string.upb.h"
#include "envoy/type/v3/percent.upb.h"
#include "envoy/type/v3/range.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/timestamp.upb.h"
#include "google/protobuf/wrappers.upb.h"
@ -83,16 +40,13 @@
#include "upb/text_encode.h"
#include "upb/upb.h"
#include "upb/upb.hpp"
#include "xds/type/v3/typed_struct.upb.h"
#include <grpc/impl/codegen/log.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/ext/xds/upb_utils.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_resource_type.h"
#include "src/core/ext/xds/xds_routing.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
@ -107,108 +61,6 @@
namespace grpc_core {
//
// XdsApi
//
// TODO(roth): All constants and functions for individual resource types
// should be merged into the XdsResourceType abstraction.
const char* XdsApi::kLdsTypeUrl = "envoy.config.listener.v3.Listener";
const char* XdsApi::kRdsTypeUrl = "envoy.config.route.v3.RouteConfiguration";
const char* XdsApi::kCdsTypeUrl = "envoy.config.cluster.v3.Cluster";
const char* XdsApi::kEdsTypeUrl =
"envoy.config.endpoint.v3.ClusterLoadAssignment";
namespace {
const char* kLdsV2TypeUrl = "envoy.api.v2.Listener";
const char* kRdsV2TypeUrl = "envoy.api.v2.RouteConfiguration";
const char* kCdsV2TypeUrl = "envoy.api.v2.Cluster";
const char* kEdsV2TypeUrl = "envoy.api.v2.ClusterLoadAssignment";
bool IsLdsInternal(absl::string_view type_url, bool* is_v2 = nullptr) {
if (type_url == XdsApi::kLdsTypeUrl) return true;
if (type_url == kLdsV2TypeUrl) {
if (is_v2 != nullptr) *is_v2 = true;
return true;
}
return false;
}
bool IsRdsInternal(absl::string_view type_url, bool* /*is_v2*/ = nullptr) {
return type_url == XdsApi::kRdsTypeUrl || type_url == kRdsV2TypeUrl;
}
bool IsCdsInternal(absl::string_view type_url, bool* /*is_v2*/ = nullptr) {
return type_url == XdsApi::kCdsTypeUrl || type_url == kCdsV2TypeUrl;
}
bool IsEdsInternal(absl::string_view type_url, bool* /*is_v2*/ = nullptr) {
return type_url == XdsApi::kEdsTypeUrl || type_url == kEdsV2TypeUrl;
}
absl::string_view TypeUrlExternalToInternal(bool use_v3,
const std::string& type_url) {
if (!use_v3) {
if (type_url == XdsApi::kLdsTypeUrl) {
return kLdsV2TypeUrl;
}
if (type_url == XdsApi::kRdsTypeUrl) {
return kRdsV2TypeUrl;
}
if (type_url == XdsApi::kCdsTypeUrl) {
return kCdsV2TypeUrl;
}
if (type_url == XdsApi::kEdsTypeUrl) {
return kEdsV2TypeUrl;
}
}
return type_url;
}
std::string TypeUrlInternalToExternal(absl::string_view type_url) {
if (type_url == kLdsV2TypeUrl) {
return XdsApi::kLdsTypeUrl;
} else if (type_url == kRdsV2TypeUrl) {
return XdsApi::kRdsTypeUrl;
} else if (type_url == kCdsV2TypeUrl) {
return XdsApi::kCdsTypeUrl;
} else if (type_url == kEdsV2TypeUrl) {
return XdsApi::kEdsTypeUrl;
}
return std::string(type_url);
}
absl::StatusOr<XdsApi::ResourceName> ParseResourceNameInternal(
absl::string_view name,
std::function<bool(absl::string_view, bool*)> is_expected_type) {
// Old-style names use the empty string for authority.
// authority is prefixed with "old:" to indicate that it's an old-style name.
if (!absl::StartsWith(name, "xdstp:")) {
return XdsApi::ResourceName{"old:", std::string(name)};
}
// New style name. Parse URI.
auto uri = URI::Parse(name);
if (!uri.ok()) return uri.status();
// Split the resource type off of the path to get the id.
std::pair<absl::string_view, absl::string_view> path_parts =
absl::StrSplit(uri->path(), absl::MaxSplits('/', 1));
if (!is_expected_type(path_parts.first, nullptr)) {
return absl::InvalidArgumentError(
"xdstp URI path must indicate valid xDS resource type");
}
std::vector<std::pair<absl::string_view, absl::string_view>> query_parameters(
uri->query_parameter_map().begin(), uri->query_parameter_map().end());
std::sort(query_parameters.begin(), query_parameters.end());
return XdsApi::ResourceName{
absl::StrCat("xdstp:", uri->authority()),
absl::StrCat(
path_parts.second, (query_parameters.empty() ? "?" : ""),
absl::StrJoin(query_parameters, "&", absl::PairFormatter("=")))};
}
} // namespace
// If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string
// will be appended to the user agent name reported to the xDS server.
#ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX
@ -249,54 +101,12 @@ XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer,
// properly in logs.
// Note: This won't actually work properly until upb adds support for
// Any fields in textproto printing (internal b/178821188).
envoy_config_listener_v3_Listener_getmsgdef(symtab_.ptr());
envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab_.ptr());
envoy_config_cluster_v3_Cluster_getmsgdef(symtab_.ptr());
envoy_extensions_clusters_aggregate_v3_ClusterConfig_getmsgdef(symtab_.ptr());
envoy_config_cluster_v3_Cluster_getmsgdef(symtab_.ptr());
envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab_.ptr());
envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef(
symtab_.ptr());
envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_getmsgdef(
symtab_.ptr());
// Load HTTP filter proto messages into the upb symtab.
XdsHttpFilterRegistry::PopulateSymtab(symtab_.ptr());
}
bool XdsApi::IsLds(absl::string_view type_url) {
return IsLdsInternal(type_url);
}
bool XdsApi::IsRds(absl::string_view type_url) {
return IsRdsInternal(type_url);
}
bool XdsApi::IsCds(absl::string_view type_url) {
return IsCdsInternal(type_url);
}
bool XdsApi::IsEds(absl::string_view type_url) {
return IsEdsInternal(type_url);
}
absl::StatusOr<XdsApi::ResourceName> XdsApi::ParseResourceName(
absl::string_view name, bool (*is_expected_type)(absl::string_view)) {
return ParseResourceNameInternal(
name, [is_expected_type](absl::string_view type, bool*) {
return is_expected_type(type);
XdsResourceTypeRegistry::GetOrCreate()->ForEach(
[this](const XdsResourceType* type) {
type->InitUpbSymtab(symtab_.ptr());
});
}
std::string XdsApi::ConstructFullResourceName(absl::string_view authority,
absl::string_view resource_type,
absl::string_view name) {
if (absl::ConsumePrefix(&authority, "xdstp:")) {
return absl::StrCat("xdstp://", authority, "/", resource_type, "/", name);
} else {
return std::string(absl::StripPrefix(name, "old:"));
}
}
namespace {
void PopulateMetadataValue(const XdsEncodingContext& context,
@ -467,11 +277,10 @@ grpc_slice SerializeDiscoveryRequest(
} // namespace
grpc_slice XdsApi::CreateAdsRequest(
const XdsBootstrap::XdsServer& server, const std::string& type_url,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>& resource_names,
const std::string& version, const std::string& nonce,
grpc_error_handle error, bool populate_node) {
const XdsBootstrap::XdsServer& server, absl::string_view type_url,
absl::string_view version, absl::string_view nonce,
const std::vector<std::string>& resource_names, grpc_error_handle error,
bool populate_node) {
upb::Arena arena;
const XdsEncodingContext context = {client_,
tracer_,
@ -483,12 +292,9 @@ grpc_slice XdsApi::CreateAdsRequest(
envoy_service_discovery_v3_DiscoveryRequest* request =
envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr());
// Set type_url.
absl::string_view real_type_url =
TypeUrlExternalToInternal(server.ShouldUseV3(), type_url);
std::string real_type_url_str =
absl::StrCat("type.googleapis.com/", real_type_url);
std::string type_url_str = absl::StrCat("type.googleapis.com/", type_url);
envoy_service_discovery_v3_DiscoveryRequest_set_type_url(
request, StdStringToUpbString(real_type_url_str));
request, StdStringToUpbString(type_url_str));
// Set version_info.
if (!version.empty()) {
envoy_service_discovery_v3_DiscoveryRequest_set_version_info(
@ -524,27 +330,10 @@ grpc_slice XdsApi::CreateAdsRequest(
PopulateNode(context, node_, build_version_, user_agent_name_,
user_agent_version_, node_msg);
}
// A vector for temporary local storage of resource name strings.
std::vector<std::string> resource_name_storage;
// Make sure the vector is sized right up-front, so that reallocations
// don't move the strings out from under the upb proto object that
// points to them.
size_t size = 0;
for (const auto& p : resource_names) {
size += p.second.size();
}
resource_name_storage.reserve(size);
// Add resource_names.
for (const auto& a : resource_names) {
absl::string_view authority = a.first;
for (const auto& p : a.second) {
absl::string_view resource_id = p;
resource_name_storage.push_back(
ConstructFullResourceName(authority, real_type_url, resource_id));
for (const std::string& resource_name : resource_names) {
envoy_service_discovery_v3_DiscoveryRequest_add_resource_names(
request, StdStringToUpbString(resource_name_storage.back()),
arena.ptr());
}
request, StdStringToUpbString(resource_name), arena.ptr());
}
MaybeLogDiscoveryRequest(context, request);
return SerializeDiscoveryRequest(context, request);
@ -566,110 +355,11 @@ void MaybeLogDiscoveryResponse(
}
}
grpc_error_handle AdsResourceParse(
const XdsEncodingContext& context, XdsResourceType* type, size_t idx,
const google_protobuf_Any* resource_any,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_resource_names,
std::function<grpc_error_handle(
absl::string_view, XdsApi::ResourceName,
std::unique_ptr<XdsResourceType::ResourceData>, std::string)>
add_result_func,
std::set<XdsApi::ResourceName>* resource_names_failed) {
// Check the type_url of the resource.
absl::string_view type_url = absl::StripPrefix(
UpbStringToAbsl(google_protobuf_Any_type_url(resource_any)),
"type.googleapis.com/");
bool is_v2 = false;
if (!type->IsType(type_url, &is_v2)) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("resource index ", idx, ": found resource type ", type_url,
" in response for type ", type->type_url()));
}
// Parse the resource.
absl::string_view serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resource_any));
absl::StatusOr<XdsResourceType::DecodeResult> result =
type->Decode(context, serialized_resource, is_v2);
if (!result.ok()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("resource index ", idx, ": ", result.status().ToString()));
}
// Check the resource name.
auto resource_name = ParseResourceNameInternal(
result->name, [type](absl::string_view type_url, bool* is_v2) {
return type->IsType(type_url, is_v2);
});
if (!resource_name.ok()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"resource index ", idx, ": Cannot parse xDS resource name \"",
result->name, "\""));
}
// Ignore unexpected names.
auto iter = subscribed_resource_names.find(resource_name->authority);
if (iter == subscribed_resource_names.end() ||
iter->second.find(resource_name->id) == iter->second.end()) {
return GRPC_ERROR_NONE;
}
// Check that resource was valid.
if (!result->resource.ok()) {
resource_names_failed->insert(*resource_name);
return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"resource index ", idx, ": ", result->name,
": validation error: ", result->resource.status().ToString()));
}
// Add result.
grpc_error_handle error = add_result_func(result->name, *resource_name,
std::move(*result->resource),
std::string(serialized_resource));
if (error != GRPC_ERROR_NONE) {
resource_names_failed->insert(*resource_name);
return grpc_error_add_child(
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"resource index ", idx, ": ", result->name, ": validation error")),
error);
}
return GRPC_ERROR_NONE;
}
template <typename UpdateMap, typename ResourceTypeData>
grpc_error_handle AddResult(
UpdateMap* update_map, absl::string_view resource_name_string,
XdsApi::ResourceName resource_name,
std::unique_ptr<XdsResourceType::ResourceData> resource,
std::string serialized_resource) {
// Reject duplicate names.
if (update_map->find(resource_name) != update_map->end()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("duplicate resource name \"", resource_name_string, "\""));
}
// Save result.
auto& resource_data = (*update_map)[resource_name];
ResourceTypeData* typed_resource =
static_cast<ResourceTypeData*>(resource.get());
resource_data.resource = std::move(typed_resource->resource);
resource_data.serialized_proto = std::move(serialized_resource);
return GRPC_ERROR_NONE;
}
} // namespace
XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
const XdsBootstrap::XdsServer& server, const grpc_slice& encoded_response,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_listener_names,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_route_config_names,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_cluster_names,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_eds_service_names) {
AdsParseResult result;
absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server,
const grpc_slice& encoded_response,
AdsResponseParserInterface* parser) {
upb::Arena arena;
const XdsEncodingContext context = {client_,
tracer_,
@ -682,99 +372,38 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse(
envoy_service_discovery_v3_DiscoveryResponse_parse(
reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
// If decoding fails, output an empty type_url and return.
// If decoding fails, report a fatal error and return.
if (response == nullptr) {
result.parse_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode DiscoveryResponse.");
return result;
return absl::InvalidArgumentError("Can't decode DiscoveryResponse.");
}
MaybeLogDiscoveryResponse(context, response);
// Record the type_url, the version_info, and the nonce of the response.
result.type_url = TypeUrlInternalToExternal(absl::StripPrefix(
// Report the type_url, version, nonce, and number of resources to the parser.
AdsResponseParserInterface::AdsResponseFields fields;
fields.type_url = std::string(absl::StripPrefix(
UpbStringToAbsl(
envoy_service_discovery_v3_DiscoveryResponse_type_url(response)),
"type.googleapis.com/"));
result.version = UpbStringToStdString(
fields.version = UpbStringToStdString(
envoy_service_discovery_v3_DiscoveryResponse_version_info(response));
result.nonce = UpbStringToStdString(
fields.nonce = UpbStringToStdString(
envoy_service_discovery_v3_DiscoveryResponse_nonce(response));
// Get the resources from the response.
std::vector<grpc_error_handle> errors;
size_t size;
size_t num_resources;
const google_protobuf_Any* const* resources =
envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size);
for (size_t i = 0; i < size; ++i) {
// Parse the response according to the resource type.
// TODO(roth): When we have time, change the API here to avoid the need
// for templating and conditionals.
grpc_error_handle parse_error = GRPC_ERROR_NONE;
if (IsLds(result.type_url)) {
XdsListenerResourceType resource_type;
auto& update_map = result.lds_update_map;
parse_error = AdsResourceParse(
context, &resource_type, i, resources[i], subscribed_listener_names,
[&update_map](absl::string_view resource_name_string,
XdsApi::ResourceName resource_name,
std::unique_ptr<XdsResourceType::ResourceData> resource,
std::string serialized_resource) {
return AddResult<LdsUpdateMap,
XdsListenerResourceType::ListenerData>(
&update_map, resource_name_string, std::move(resource_name),
std::move(resource), std::move(serialized_resource));
},
&result.resource_names_failed);
} else if (IsRds(result.type_url)) {
XdsRouteConfigResourceType resource_type;
auto& update_map = result.rds_update_map;
parse_error = AdsResourceParse(
context, &resource_type, i, resources[i],
subscribed_route_config_names,
[&update_map](absl::string_view resource_name_string,
XdsApi::ResourceName resource_name,
std::unique_ptr<XdsResourceType::ResourceData> resource,
std::string serialized_resource) {
return AddResult<RdsUpdateMap,
XdsRouteConfigResourceType::RouteConfigData>(
&update_map, resource_name_string, std::move(resource_name),
std::move(resource), std::move(serialized_resource));
},
&result.resource_names_failed);
} else if (IsCds(result.type_url)) {
XdsClusterResourceType resource_type;
auto& update_map = result.cds_update_map;
parse_error = AdsResourceParse(
context, &resource_type, i, resources[i], subscribed_cluster_names,
[&update_map](absl::string_view resource_name_string,
XdsApi::ResourceName resource_name,
std::unique_ptr<XdsResourceType::ResourceData> resource,
std::string serialized_resource) {
return AddResult<CdsUpdateMap, XdsClusterResourceType::ClusterData>(
&update_map, resource_name_string, std::move(resource_name),
std::move(resource), std::move(serialized_resource));
},
&result.resource_names_failed);
} else if (IsEds(result.type_url)) {
XdsEndpointResourceType resource_type;
auto& update_map = result.eds_update_map;
parse_error = AdsResourceParse(
context, &resource_type, i, resources[i],
subscribed_eds_service_names,
[&update_map](absl::string_view resource_name_string,
XdsApi::ResourceName resource_name,
std::unique_ptr<XdsResourceType::ResourceData> resource,
std::string serialized_resource) {
return AddResult<EdsUpdateMap,
XdsEndpointResourceType::EndpointData>(
&update_map, resource_name_string, std::move(resource_name),
std::move(resource), std::move(serialized_resource));
},
&result.resource_names_failed);
}
if (parse_error != GRPC_ERROR_NONE) errors.push_back(parse_error);
envoy_service_discovery_v3_DiscoveryResponse_resources(response,
&num_resources);
fields.num_resources = num_resources;
absl::Status status = parser->ProcessAdsResponseFields(std::move(fields));
if (!status.ok()) return status;
// Process each resource.
for (size_t i = 0; i < num_resources; ++i) {
absl::string_view type_url = absl::StripPrefix(
UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])),
"type.googleapis.com/");
absl::string_view serialized_resource =
UpbStringToAbsl(google_protobuf_Any_value(resources[i]));
parser->ParseResource(context, i, type_url, serialized_resource);
}
result.parse_error =
GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing ADS response", &errors);
return result;
return absl::OkStatus();
}
namespace {

@ -23,71 +23,48 @@
#include <set>
#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "envoy/admin/v3/config_dump.upb.h"
#include "re2/re2.h"
#include "upb/def.hpp"
#include <grpc/slice_buffer.h>
#include <grpc/slice.h>
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/upb_utils.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/matchers/matchers.h"
namespace grpc_core {
class XdsClient;
// TODO(roth): When we have time, split this into multiple pieces:
// - a common upb-based parsing framework (combine with XdsEncodingContext)
// - ADS request/response handling
// - LRS request/response handling
// - CSDS response generation
class XdsApi {
public:
static const char* kLdsTypeUrl;
static const char* kRdsTypeUrl;
static const char* kCdsTypeUrl;
static const char* kEdsTypeUrl;
struct ResourceName {
std::string authority;
std::string id;
bool operator<(const ResourceName& other) const {
if (authority < other.authority) return true;
if (id < other.id) return true;
return false;
}
};
struct LdsResourceData {
XdsListenerResource resource;
std::string serialized_proto;
// Interface defined by caller and passed to ParseAdsResponse().
class AdsResponseParserInterface {
public:
struct AdsResponseFields {
std::string type_url;
std::string version;
std::string nonce;
size_t num_resources;
};
using LdsUpdateMap = std::map<ResourceName, LdsResourceData>;
struct RdsResourceData {
XdsRouteConfigResource resource;
std::string serialized_proto;
};
using RdsUpdateMap = std::map<ResourceName, RdsResourceData>;
virtual ~AdsResponseParserInterface() = default;
struct CdsResourceData {
XdsClusterResource resource;
std::string serialized_proto;
};
using CdsUpdateMap = std::map<ResourceName, CdsResourceData>;
// Called when the top-level ADS fields are parsed.
// If this returns non-OK, parsing will stop, and the individual
// resources will not be processed.
virtual absl::Status ProcessAdsResponseFields(AdsResponseFields fields) = 0;
struct EdsResourceData {
XdsEndpointResource resource;
std::string serialized_proto;
// Called to parse each individual resource in the ADS response.
virtual void ParseResource(const XdsEncodingContext& context, size_t idx,
absl::string_view type_url,
absl::string_view serialized_resource) = 0;
};
using EdsUpdateMap = std::map<ResourceName, EdsResourceData>;
struct ClusterLoadReport {
XdsClusterDropStats::Snapshot dropped_requests;
@ -156,69 +133,23 @@ class XdsApi {
ResourceMetadata::ClientResourceStatus::NACKED,
"");
// If the response can't be parsed at the top level, the resulting
// type_url will be empty.
// If there is any other type of validation error, the parse_error
// field will be set to something other than GRPC_ERROR_NONE and the
// resource_names_failed field will be populated.
// Otherwise, one of the *_update_map fields will be populated, based
// on the type_url field.
struct AdsParseResult {
grpc_error_handle parse_error = GRPC_ERROR_NONE;
std::string version;
std::string nonce;
std::string type_url;
LdsUpdateMap lds_update_map;
RdsUpdateMap rds_update_map;
CdsUpdateMap cds_update_map;
EdsUpdateMap eds_update_map;
std::set<ResourceName> resource_names_failed;
};
XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node,
const CertificateProviderStore::PluginDefinitionMap* map);
static bool IsLds(absl::string_view type_url);
static bool IsRds(absl::string_view type_url);
static bool IsCds(absl::string_view type_url);
static bool IsEds(absl::string_view type_url);
// A helper method to parse the resource name and return back a ResourceName
// struct. Optionally the parser can check the resource type portion of the
// resource name.
static absl::StatusOr<ResourceName> ParseResourceName(
absl::string_view name,
bool (*is_expected_type)(absl::string_view) = nullptr);
// A helper method to construct the resource name from parts.
static std::string ConstructFullResourceName(absl::string_view authority,
absl::string_view resource_type,
absl::string_view name);
// Creates an ADS request.
// Takes ownership of \a error.
grpc_slice CreateAdsRequest(
const XdsBootstrap::XdsServer& server, const std::string& type_url,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>& resource_names,
const std::string& version, const std::string& nonce,
grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer& server,
absl::string_view type_url,
absl::string_view version,
absl::string_view nonce,
const std::vector<std::string>& resource_names,
grpc_error_handle error, bool populate_node);
// Parses an ADS response.
AdsParseResult ParseAdsResponse(
const XdsBootstrap::XdsServer& server, const grpc_slice& encoded_response,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_listener_names,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_route_config_names,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_cluster_names,
const std::map<absl::string_view /*authority*/,
std::set<absl::string_view /*name*/>>&
subscribed_eds_service_names);
// Returns non-OK when failing to deserialize response message.
// Otherwise, all events are reported to the parser.
absl::Status ParseAdsResponse(const XdsBootstrap::XdsServer& server,
const grpc_slice& encoded_response,
AdsResponseParserInterface* parser);
// Creates an initial LRS request.
grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server);

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_api.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h"
#define GRPC_ARG_XDS_CERTIFICATE_PROVIDER \

File diff suppressed because it is too large Load Diff

@ -28,6 +28,10 @@
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/memory.h"
@ -43,10 +47,11 @@ extern TraceFlag grpc_xds_client_refcount_trace;
class XdsClient : public DualRefCounted<XdsClient> {
public:
// Listener data watcher interface. Implemented by callers.
class ListenerWatcherInterface : public RefCounted<ListenerWatcherInterface> {
// Resource watcher interface. Implemented by callers.
class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
public:
virtual void OnListenerChanged(XdsListenerResource listener)
virtual void OnResourceChanged(
const XdsResourceType::ResourceData* resource)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
@ -54,38 +59,65 @@ class XdsClient : public DualRefCounted<XdsClient> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};
// TODO(roth): Consider removing these resource-type-specific APIs in
// favor of some mechanism for automatic type-deduction for the generic
// API.
// Listener data watcher interface. Implemented by callers.
class ListenerWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnListenerChanged(XdsListenerResource listener) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnListenerChanged(
static_cast<const XdsListenerResourceType::ListenerData*>(resource)
->resource);
}
};
// RouteConfiguration data watcher interface. Implemented by callers.
class RouteConfigWatcherInterface
: public RefCounted<RouteConfigWatcherInterface> {
class RouteConfigWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnRouteConfigChanged(
static_cast<const XdsRouteConfigResourceType::RouteConfigData*>(
resource)
->resource);
}
};
// Cluster data watcher interface. Implemented by callers.
class ClusterWatcherInterface : public RefCounted<ClusterWatcherInterface> {
class ClusterWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnClusterChanged(XdsClusterResource cluster_data)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnClusterChanged(XdsClusterResource cluster_data) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnClusterChanged(
static_cast<const XdsClusterResourceType::ClusterData*>(resource)
->resource);
}
};
// Endpoint data watcher interface. Implemented by callers.
class EndpointWatcherInterface : public RefCounted<EndpointWatcherInterface> {
class EndpointWatcherInterface : public ResourceWatcherInterface {
public:
virtual void OnEndpointChanged(XdsEndpointResource update)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnEndpointChanged(XdsEndpointResource update) = 0;
private:
void OnResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnEndpointChanged(
static_cast<const XdsEndpointResourceType::EndpointData*>(resource)
->resource);
}
};
// Factory function to get or create the global XdsClient instance.
@ -113,6 +145,20 @@ class XdsClient : public DualRefCounted<XdsClient> {
void Orphan() override;
// Start and cancel watch for a resource.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
// cancellation. (Because the caller does not own the watcher, the
// pointer must not be used for any other purpose.)
// If the caller is going to start a new watch after cancelling the
// old one, it should set delay_unsubscription to true.
void WatchResource(const XdsResourceType* type, absl::string_view name,
RefCountedPtr<ResourceWatcherInterface> watcher);
void CancelResourceWatch(const XdsResourceType* type,
absl::string_view listener_name,
ResourceWatcherInterface* watcher,
bool delay_unsubscription = false);
// Start and cancel listener data watch for a listener.
// The XdsClient takes ownership of the watcher, but the caller may
// keep a raw pointer to the watcher, which may be used only for
@ -204,6 +250,11 @@ class XdsClient : public DualRefCounted<XdsClient> {
const grpc_channel_args& args);
private:
struct XdsResourceName {
std::string authority;
std::string id;
};
// Contains a channel to the xds server and all the data related to the
// channel. Holds a ref to the xds client object.
class ChannelState : public DualRefCounted<ChannelState> {
@ -234,11 +285,11 @@ class XdsClient : public DualRefCounted<XdsClient> {
void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked();
void SubscribeLocked(const std::string& type_url,
const XdsApi::ResourceName& name)
void SubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void UnsubscribeLocked(const std::string& type_url,
const XdsApi::ResourceName& name,
void UnsubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name,
bool delay_unsubscription)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
@ -260,55 +311,23 @@ class XdsClient : public DualRefCounted<XdsClient> {
OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
// Stores the most recent accepted resource version for each resource type.
std::map<std::string /*type*/, std::string /*version*/>
std::map<const XdsResourceType*, std::string /*version*/>
resource_type_version_map_;
};
struct ListenerState {
std::map<ListenerWatcherInterface*, RefCountedPtr<ListenerWatcherInterface>>
struct ResourceState {
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
watchers;
// The latest data seen from LDS.
absl::optional<XdsListenerResource> update;
XdsApi::ResourceMetadata meta;
};
struct RouteConfigState {
std::map<RouteConfigWatcherInterface*,
RefCountedPtr<RouteConfigWatcherInterface>>
watchers;
// The latest data seen from RDS.
absl::optional<XdsRouteConfigResource> update;
XdsApi::ResourceMetadata meta;
};
struct ClusterState {
std::map<ClusterWatcherInterface*, RefCountedPtr<ClusterWatcherInterface>>
watchers;
// The latest data seen from CDS.
absl::optional<XdsClusterResource> update;
XdsApi::ResourceMetadata meta;
};
struct EndpointState {
std::map<EndpointWatcherInterface*, RefCountedPtr<EndpointWatcherInterface>>
watchers;
// The latest data seen from EDS.
absl::optional<XdsEndpointResource> update;
// The latest data seen for the resource.
std::unique_ptr<XdsResourceType::ResourceData> resource;
XdsApi::ResourceMetadata meta;
};
struct AuthorityState {
RefCountedPtr<ChannelState> channel_state;
std::map<std::string /*listener_name*/, ListenerState> listener_map;
std::map<std::string /*route_config_name*/, RouteConfigState>
route_config_map;
std::map<std::string /*cluster_name*/, ClusterState> cluster_map;
std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map;
bool HasSubscribedResources() {
return !listener_map.empty() || !route_config_map.empty() ||
!cluster_map.empty() || !endpoint_map.empty();
}
std::map<const XdsResourceType*,
std::map<std::string /*id*/, ResourceState>>
resource_map;
};
struct LoadReportState {
@ -331,6 +350,12 @@ class XdsClient : public DualRefCounted<XdsClient> {
void NotifyOnErrorLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static absl::StatusOr<XdsResourceName> ParseXdsResourceName(
absl::string_view name, const XdsResourceType* type);
static std::string ConstructFullXdsResourceName(
absl::string_view authority, absl::string_view resource_type,
absl::string_view id);
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
@ -363,15 +388,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
// Stores started watchers whose resource name was not parsed successfully,
// waiting to be cancelled or reset in Orphan().
std::map<ListenerWatcherInterface*, RefCountedPtr<ListenerWatcherInterface>>
invalid_listener_watchers_ ABSL_GUARDED_BY(mu_);
std::map<RouteConfigWatcherInterface*,
RefCountedPtr<RouteConfigWatcherInterface>>
invalid_route_config_watchers_ ABSL_GUARDED_BY(mu_);
std::map<ClusterWatcherInterface*, RefCountedPtr<ClusterWatcherInterface>>
invalid_cluster_watchers_ ABSL_GUARDED_BY(mu_);
std::map<EndpointWatcherInterface*, RefCountedPtr<EndpointWatcherInterface>>
invalid_endpoint_watchers_ ABSL_GUARDED_BY(mu_);
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
invalid_watchers_ ABSL_GUARDED_BY(mu_);
bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
};

@ -420,7 +420,7 @@ absl::StatusOr<XdsResourceType::DecodeResult> XdsClusterResourceType::Decode(
auto* resource = envoy_config_cluster_v3_Cluster_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError("Can't parse Listener resource.");
return absl::InvalidArgumentError("Can't parse Cluster resource.");
}
MaybeLogCluster(context, resource);
// Validate resource.
@ -431,9 +431,18 @@ absl::StatusOr<XdsResourceType::DecodeResult> XdsClusterResourceType::Decode(
grpc_error_handle error =
CdsResourceParse(context, resource, is_v2, &cluster_data->resource);
if (error != GRPC_ERROR_NONE) {
result.resource = absl::InvalidArgumentError(grpc_error_std_string(error));
std::string error_str = grpc_error_std_string(error);
GRPC_ERROR_UNREF(error);
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid Cluster %s: %s",
context.client, result.name.c_str(), error_str.c_str());
}
result.resource = absl::InvalidArgumentError(error_str);
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed Cluster %s: %s", context.client,
result.name.c_str(), cluster_data->resource.ToString().c_str());
}
result.resource = std::move(cluster_data);
}
return std::move(result);

@ -23,6 +23,9 @@
#include <vector>
#include "absl/types/optional.h"
#include "envoy/config/cluster/v3/cluster.upbdefs.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h"
#include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_resource_type.h"
@ -94,6 +97,29 @@ class XdsClusterResourceType : public XdsResourceType {
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource,
bool is_v2) const override;
bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const override {
return static_cast<const ClusterData*>(r1)->resource ==
static_cast<const ClusterData*>(r2)->resource;
}
std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const override {
auto* resource_copy = new ClusterData();
resource_copy->resource =
static_cast<const ClusterData*>(resource)->resource;
return std::unique_ptr<ResourceData>(resource_copy);
}
bool AllResourcesRequiredInSotW() const override { return true; }
void InitUpbSymtab(upb_symtab* symtab) const override {
envoy_config_cluster_v3_Cluster_getmsgdef(symtab);
envoy_extensions_clusters_aggregate_v3_ClusterConfig_getmsgdef(symtab);
envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef(
symtab);
}
};
} // namespace grpc_core

@ -330,7 +330,8 @@ absl::StatusOr<XdsResourceType::DecodeResult> XdsEndpointResourceType::Decode(
auto* resource = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError("Can't parse Listener resource.");
return absl::InvalidArgumentError(
"Can't parse ClusterLoadAssignment resource.");
}
MaybeLogClusterLoadAssignment(context, resource);
// Validate resource.
@ -341,9 +342,19 @@ absl::StatusOr<XdsResourceType::DecodeResult> XdsEndpointResourceType::Decode(
grpc_error_handle error =
EdsResourceParse(context, resource, is_v2, &endpoint_data->resource);
if (error != GRPC_ERROR_NONE) {
result.resource = absl::InvalidArgumentError(grpc_error_std_string(error));
std::string error_str = grpc_error_std_string(error);
GRPC_ERROR_UNREF(error);
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
context.client, result.name.c_str(), error_str.c_str());
}
result.resource = absl::InvalidArgumentError(error_str);
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
context.client, result.name.c_str(),
endpoint_data->resource.ToString().c_str());
}
result.resource = std::move(endpoint_data);
}
return std::move(result);

@ -24,6 +24,7 @@
#include <string>
#include "absl/container/inlined_vector.h"
#include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/xds/xds_client_stats.h"
@ -125,6 +126,24 @@ class XdsEndpointResourceType : public XdsResourceType {
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource,
bool is_v2) const override;
bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const override {
return static_cast<const EndpointData*>(r1)->resource ==
static_cast<const EndpointData*>(r2)->resource;
}
std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const override {
auto* resource_copy = new EndpointData();
resource_copy->resource =
static_cast<const EndpointData*>(resource)->resource;
return std::unique_ptr<ResourceData>(resource_copy);
}
void InitUpbSymtab(upb_symtab* symtab) const override {
envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab);
}
};
} // namespace grpc_core

@ -1001,9 +1001,19 @@ absl::StatusOr<XdsResourceType::DecodeResult> XdsListenerResourceType::Decode(
grpc_error_handle error =
LdsResourceParse(context, resource, is_v2, &listener_data->resource);
if (error != GRPC_ERROR_NONE) {
result.resource = absl::InvalidArgumentError(grpc_error_std_string(error));
std::string error_str = grpc_error_std_string(error);
GRPC_ERROR_UNREF(error);
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid Listener %s: %s",
context.client, result.name.c_str(), error_str.c_str());
}
result.resource = absl::InvalidArgumentError(error_str);
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed Listener %s: %s",
context.client, result.name.c_str(),
listener_data->resource.ToString().c_str());
}
result.resource = std::move(listener_data);
}
return std::move(result);

@ -27,6 +27,8 @@
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "envoy/config/listener/v3/listener.upbdefs.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_http_filters.h"
@ -203,6 +205,29 @@ class XdsListenerResourceType : public XdsResourceType {
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource,
bool is_v2) const override;
bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const override {
return static_cast<const ListenerData*>(r1)->resource ==
static_cast<const ListenerData*>(r2)->resource;
}
std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const override {
auto* resource_copy = new ListenerData();
resource_copy->resource =
static_cast<const ListenerData*>(resource)->resource;
return std::unique_ptr<ResourceData>(resource_copy);
}
bool AllResourcesRequiredInSotW() const override { return true; }
void InitUpbSymtab(upb_symtab* symtab) const override {
envoy_config_listener_v3_Listener_getmsgdef(symtab);
envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_getmsgdef(
symtab);
XdsHttpFilterRegistry::PopulateSymtab(symtab);
}
};
} // namespace grpc_core

@ -0,0 +1,71 @@
//
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_resource_type.h"
#include <vector>
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
bool XdsResourceType::IsType(absl::string_view resource_type,
bool* is_v2) const {
if (resource_type == type_url()) return true;
if (resource_type == v2_type_url()) {
if (is_v2 != nullptr) *is_v2 = true;
return true;
}
return false;
}
XdsResourceTypeRegistry* XdsResourceTypeRegistry::GetOrCreate() {
static XdsResourceTypeRegistry* registry = new XdsResourceTypeRegistry();
return registry;
}
const XdsResourceType* XdsResourceTypeRegistry::GetType(
absl::string_view resource_type) {
auto it = resource_types_.find(resource_type);
if (it != resource_types_.end()) return it->second.get();
auto it2 = v2_resource_types_.find(resource_type);
if (it2 != v2_resource_types_.end()) return it2->second;
return nullptr;
}
void XdsResourceTypeRegistry::RegisterType(
std::unique_ptr<XdsResourceType> resource_type) {
GPR_ASSERT(resource_types_.find(resource_type->type_url()) ==
resource_types_.end());
GPR_ASSERT(v2_resource_types_.find(resource_type->v2_type_url()) ==
v2_resource_types_.end());
v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type.get());
resource_types_.emplace(resource_type->type_url(), std::move(resource_type));
}
void XdsResourceTypeRegistry::ForEach(
std::function<void(const XdsResourceType*)> func) {
for (const auto& p : resource_types_) {
func(p.second.get());
}
}
} // namespace grpc_core

@ -16,6 +16,7 @@
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include <string>
@ -29,6 +30,8 @@
namespace grpc_core {
// Interface for an xDS resource type.
// Used to inject type-specific logic into XdsClient.
class XdsResourceType {
public:
// A base type for resource data.
@ -61,15 +64,58 @@ class XdsResourceType {
const XdsEncodingContext& context, absl::string_view serialized_resource,
bool is_v2) const = 0;
// Convenient method for checking if a resource type matches this type.
bool IsType(absl::string_view resource_type, bool* is_v2) const {
if (resource_type == type_url()) return true;
if (resource_type == v2_type_url()) {
if (is_v2 != nullptr) *is_v2 = true;
return true;
}
return false;
}
// Returns true if r1 and r2 are equal.
// Must be invoked only on resources returned by this object's Decode()
// method.
virtual bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const = 0;
// Returns a copy of resource.
// Must be invoked only on resources returned by this object's Decode()
// method.
virtual std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const = 0;
// Indicates whether the resource type requires that all resources must
// be present in every SotW response from the server. If true, a
// response that does not include a previously seen resource will be
// interpreted as a deletion of that resource.
virtual bool AllResourcesRequiredInSotW() const { return false; }
// Populate upb symtab with xDS proto messages that we want to print
// properly in logs.
// Note: This won't actually work properly until upb adds support for
// Any fields in textproto printing (internal b/178821188).
virtual void InitUpbSymtab(upb_symtab* symtab) const = 0;
// Convenience method for checking if resource_type matches this type.
// Checks against both type_url() and v2_type_url().
// If is_v2 is non-null, it will be set to true if matching v2_type_url().
bool IsType(absl::string_view resource_type, bool* is_v2) const;
};
// Global registry of xDS resource types.
class XdsResourceTypeRegistry {
public:
// Gets the global registry, creating it if needed.
static XdsResourceTypeRegistry* GetOrCreate();
// Gets the type for resource_type, or null if the type is unknown.
const XdsResourceType* GetType(absl::string_view resource_type);
// Registers a resource type.
// All types must be registered before they can be used in the XdsClient.
void RegisterType(std::unique_ptr<XdsResourceType> resource_type);
// Calls func for each resource type.
void ForEach(std::function<void(const XdsResourceType*)> func);
private:
std::map<absl::string_view /*resource_type*/,
std::unique_ptr<XdsResourceType>>
resource_types_;
std::map<absl::string_view /*v2_resource_type*/, XdsResourceType*>
v2_resource_types_;
};
} // namespace grpc_core

@ -960,7 +960,8 @@ XdsRouteConfigResourceType::Decode(const XdsEncodingContext& context,
auto* resource = envoy_config_route_v3_RouteConfiguration_parse(
serialized_resource.data(), serialized_resource.size(), context.arena);
if (resource == nullptr) {
return absl::InvalidArgumentError("Can't parse Listener resource.");
return absl::InvalidArgumentError(
"Can't parse RouteConfiguration resource.");
}
MaybeLogRouteConfiguration(context, resource);
// Validate resource.
@ -971,9 +972,19 @@ XdsRouteConfigResourceType::Decode(const XdsEncodingContext& context,
grpc_error_handle error = XdsRouteConfigResource::Parse(
context, resource, &route_config_data->resource);
if (error != GRPC_ERROR_NONE) {
result.resource = absl::InvalidArgumentError(grpc_error_std_string(error));
std::string error_str = grpc_error_std_string(error);
GRPC_ERROR_UNREF(error);
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_ERROR, "[xds_client %p] invalid RouteConfiguration %s: %s",
context.client, result.name.c_str(), error_str.c_str());
}
result.resource = absl::InvalidArgumentError(error_str);
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed RouteConfiguration %s: %s",
context.client, result.name.c_str(),
route_config_data->resource.ToString().c_str());
}
result.resource = std::move(route_config_data);
}
return std::move(result);

@ -26,6 +26,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "envoy/config/route/v3/route.upb.h"
#include "envoy/config/route/v3/route.upbdefs.h"
#include "re2/re2.h"
#include "src/core/ext/xds/xds_common_types.h"
@ -204,6 +205,24 @@ class XdsRouteConfigResourceType : public XdsResourceType {
absl::StatusOr<DecodeResult> Decode(const XdsEncodingContext& context,
absl::string_view serialized_resource,
bool /*is_v2*/) const override;
bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const override {
return static_cast<const RouteConfigData*>(r1)->resource ==
static_cast<const RouteConfigData*>(r2)->resource;
}
std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const override {
auto* resource_copy = new RouteConfigData();
resource_copy->resource =
static_cast<const RouteConfigData*>(resource)->resource;
return std::unique_ptr<ResourceData>(resource_copy);
}
void InitUpbSymtab(upb_symtab* symtab) const override {
envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab);
}
};
} // namespace grpc_core

@ -27,7 +27,8 @@
#include <grpc/support/log.h>
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/lib/transport/metadata_batch.h"

@ -23,7 +23,7 @@
#include <grpc/grpc_security.h>
#include "src/core/ext/xds/xds_api.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/lib/security/credentials/credentials.h"
namespace grpc_core {

@ -346,6 +346,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/xds/xds_http_fault_filter.cc',
'src/core/ext/xds/xds_http_filters.cc',
'src/core/ext/xds/xds_listener.cc',
'src/core/ext/xds/xds_resource_type.cc',
'src/core/ext/xds/xds_route_config.cc',
'src/core/ext/xds/xds_routing.cc',
'src/core/ext/xds/xds_server_config_fetcher.cc',

@ -1732,6 +1732,7 @@ src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_http_filters.h \
src/core/ext/xds/xds_listener.cc \
src/core/ext/xds/xds_listener.h \
src/core/ext/xds/xds_resource_type.cc \
src/core/ext/xds/xds_resource_type.h \
src/core/ext/xds/xds_route_config.cc \
src/core/ext/xds/xds_route_config.h \

@ -1526,6 +1526,7 @@ src/core/ext/xds/xds_http_filters.cc \
src/core/ext/xds/xds_http_filters.h \
src/core/ext/xds/xds_listener.cc \
src/core/ext/xds/xds_listener.h \
src/core/ext/xds/xds_resource_type.cc \
src/core/ext/xds/xds_resource_type.h \
src/core/ext/xds/xds_route_config.cc \
src/core/ext/xds/xds_route_config.h \

Loading…
Cancel
Save