From d1448872fa0e4e17975d5fdf711c7e020a6974f0 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 6 Dec 2021 10:14:25 -0800 Subject: [PATCH] XdsClient: use XdsResourceType abstraction throughout XdsClient (#28165) * WIP * introduce XdsResourceType API and change Listener parsing to use it * converted RouteConfig parsing * convert cluster and endpoint parsing * cleanup * clang-format * attempt to work around compiler problems * move XdsResourceType to its own file, and move endpoint code out of XdsApi * move cluster parsing to its own file * move route config parsing to its own file * move listener parsing to its own file * clang-format * minor cleanup * plumbed XdsResourceType throughout XdsClient * a bit of cleanup * more cleanup * construct full resource names before calling XdsApi::CreateAdsRequest() * remove some unneeded code * clean up includes and have XdsResourceType initialize the upb symtab * more cleanup of unnecessary code * more cleanup * update comment * clang-format * add missing virtual dtor * fix build * remove comment * add missing virtual dtor --- BUILD | 1 + CMakeLists.txt | 1 + Makefile | 2 + build_autogenerated.yaml | 1 + config.m4 | 1 + config.w32 | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + grpc.gyp | 1 + package.xml | 1 + src/core/ext/xds/upb_utils.h | 3 + src/core/ext/xds/xds_api.cc | 451 +----- src/core/ext/xds/xds_api.h | 143 +- src/core/ext/xds/xds_certificate_provider.h | 1 + src/core/ext/xds/xds_client.cc | 1391 ++++++----------- src/core/ext/xds/xds_client.h | 174 ++- src/core/ext/xds/xds_cluster.cc | 13 +- src/core/ext/xds/xds_cluster.h | 26 + src/core/ext/xds/xds_endpoint.cc | 15 +- src/core/ext/xds/xds_endpoint.h | 19 + src/core/ext/xds/xds_listener.cc | 12 +- src/core/ext/xds/xds_listener.h | 25 + src/core/ext/xds/xds_resource_type.cc | 71 + src/core/ext/xds/xds_resource_type.h | 64 +- src/core/ext/xds/xds_route_config.cc | 15 +- src/core/ext/xds/xds_route_config.h | 19 + src/core/ext/xds/xds_routing.h | 3 +- .../credentials/xds/xds_credentials.h | 2 +- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.c++.internal | 1 + tools/doxygen/Doxyfile.core.internal | 1 + 31 files changed, 953 insertions(+), 1508 deletions(-) create mode 100644 src/core/ext/xds/xds_resource_type.cc diff --git a/BUILD b/BUILD index 6de1b94cff8..b56b31470f9 100644 --- a/BUILD +++ b/BUILD @@ -2621,6 +2621,7 @@ grpc_cc_library( "src/core/ext/xds/xds_http_fault_filter.cc", "src/core/ext/xds/xds_http_filters.cc", "src/core/ext/xds/xds_listener.cc", + "src/core/ext/xds/xds_resource_type.cc", "src/core/ext/xds/xds_route_config.cc", "src/core/ext/xds/xds_routing.cc", "src/core/lib/security/credentials/xds/xds_credentials.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c1b78519ea..ced8b49f6a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1855,6 +1855,7 @@ add_library(grpc src/core/ext/xds/xds_http_fault_filter.cc src/core/ext/xds/xds_http_filters.cc src/core/ext/xds/xds_listener.cc + src/core/ext/xds/xds_resource_type.cc src/core/ext/xds/xds_route_config.cc src/core/ext/xds/xds_routing.cc src/core/ext/xds/xds_server_config_fetcher.cc diff --git a/Makefile b/Makefile index 1f6f724f075..391f66cab98 100644 --- a/Makefile +++ b/Makefile @@ -1353,6 +1353,7 @@ LIBGRPC_SRC = \ src/core/ext/xds/xds_http_fault_filter.cc \ src/core/ext/xds/xds_http_filters.cc \ src/core/ext/xds/xds_listener.cc \ + src/core/ext/xds/xds_resource_type.cc \ src/core/ext/xds/xds_route_config.cc \ src/core/ext/xds/xds_routing.cc \ src/core/ext/xds/xds_server_config_fetcher.cc \ @@ -2963,6 +2964,7 @@ src/core/ext/xds/xds_endpoint.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_http_fault_filter.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_http_filters.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_listener.cc: $(OPENSSL_DEP) +src/core/ext/xds/xds_resource_type.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_route_config.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_routing.cc: $(OPENSSL_DEP) src/core/ext/xds/xds_server_config_fetcher.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 6f56edbd550..4661739ec5e 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1318,6 +1318,7 @@ libs: - src/core/ext/xds/xds_http_fault_filter.cc - src/core/ext/xds/xds_http_filters.cc - src/core/ext/xds/xds_listener.cc + - src/core/ext/xds/xds_resource_type.cc - src/core/ext/xds/xds_route_config.cc - src/core/ext/xds/xds_routing.cc - src/core/ext/xds/xds_server_config_fetcher.cc diff --git a/config.m4 b/config.m4 index 48afe24357d..927915abb61 100644 --- a/config.m4 +++ b/config.m4 @@ -371,6 +371,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/xds/xds_http_fault_filter.cc \ src/core/ext/xds/xds_http_filters.cc \ src/core/ext/xds/xds_listener.cc \ + src/core/ext/xds/xds_resource_type.cc \ src/core/ext/xds/xds_route_config.cc \ src/core/ext/xds/xds_routing.cc \ src/core/ext/xds/xds_server_config_fetcher.cc \ diff --git a/config.w32 b/config.w32 index 25ee2fdd785..5ad01e4785d 100644 --- a/config.w32 +++ b/config.w32 @@ -337,6 +337,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\xds\\xds_http_fault_filter.cc " + "src\\core\\ext\\xds\\xds_http_filters.cc " + "src\\core\\ext\\xds\\xds_listener.cc " + + "src\\core\\ext\\xds\\xds_resource_type.cc " + "src\\core\\ext\\xds\\xds_route_config.cc " + "src\\core\\ext\\xds\\xds_routing.cc " + "src\\core\\ext\\xds\\xds_server_config_fetcher.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 8a469082b3e..2f4761ae343 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -834,6 +834,7 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_http_filters.h', 'src/core/ext/xds/xds_listener.cc', 'src/core/ext/xds/xds_listener.h', + 'src/core/ext/xds/xds_resource_type.cc', 'src/core/ext/xds/xds_resource_type.h', 'src/core/ext/xds/xds_route_config.cc', 'src/core/ext/xds/xds_route_config.h', diff --git a/grpc.gemspec b/grpc.gemspec index 2db969d01a8..008a4fbb627 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -753,6 +753,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/xds/xds_http_filters.h ) s.files += %w( src/core/ext/xds/xds_listener.cc ) s.files += %w( src/core/ext/xds/xds_listener.h ) + s.files += %w( src/core/ext/xds/xds_resource_type.cc ) s.files += %w( src/core/ext/xds/xds_resource_type.h ) s.files += %w( src/core/ext/xds/xds_route_config.cc ) s.files += %w( src/core/ext/xds/xds_route_config.h ) diff --git a/grpc.gyp b/grpc.gyp index afb723335b6..5b91b16d56f 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -808,6 +808,7 @@ 'src/core/ext/xds/xds_http_fault_filter.cc', 'src/core/ext/xds/xds_http_filters.cc', 'src/core/ext/xds/xds_listener.cc', + 'src/core/ext/xds/xds_resource_type.cc', 'src/core/ext/xds/xds_route_config.cc', 'src/core/ext/xds/xds_routing.cc', 'src/core/ext/xds/xds_server_config_fetcher.cc', diff --git a/package.xml b/package.xml index 9593268e7d4..0d3df25549c 100644 --- a/package.xml +++ b/package.xml @@ -733,6 +733,7 @@ + diff --git a/src/core/ext/xds/upb_utils.h b/src/core/ext/xds/upb_utils.h index d867eca5044..c6a90fbc64d 100644 --- a/src/core/ext/xds/upb_utils.h +++ b/src/core/ext/xds/upb_utils.h @@ -33,6 +33,9 @@ namespace grpc_core { class XdsClient; +// TODO(roth): Rethink this. All fields except symtab and arena should come +// from XdsClient, injected into XdsResourceType::Decode() somehow without +// passing through XdsApi code, maybe via the AdsResponseParser. struct XdsEncodingContext { XdsClient* client; // Used only for logging. Unsafe for dereferencing. TraceFlag* tracer; diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 0497cd3f4a6..5b9237cc6b7 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -18,64 +18,21 @@ #include "src/core/ext/xds/xds_api.h" -#include -#include -#include -#include +#include #include +#include #include "absl/strings/str_cat.h" -#include "absl/strings/str_format.h" -#include "absl/strings/str_join.h" -#include "absl/strings/str_split.h" #include "envoy/admin/v3/config_dump.upb.h" -#include "envoy/config/cluster/v3/circuit_breaker.upb.h" -#include "envoy/config/cluster/v3/cluster.upb.h" -#include "envoy/config/cluster/v3/cluster.upbdefs.h" -#include "envoy/config/core/v3/address.upb.h" #include "envoy/config/core/v3/base.upb.h" -#include "envoy/config/core/v3/base.upbdefs.h" -#include "envoy/config/core/v3/config_source.upb.h" -#include "envoy/config/core/v3/health_check.upb.h" -#include "envoy/config/core/v3/protocol.upb.h" -#include "envoy/config/endpoint/v3/endpoint.upb.h" -#include "envoy/config/endpoint/v3/endpoint.upbdefs.h" -#include "envoy/config/endpoint/v3/endpoint_components.upb.h" #include "envoy/config/endpoint/v3/load_report.upb.h" -#include "envoy/config/listener/v3/api_listener.upb.h" -#include "envoy/config/listener/v3/listener.upb.h" -#include "envoy/config/listener/v3/listener.upbdefs.h" -#include "envoy/config/listener/v3/listener_components.upb.h" -#include "envoy/config/route/v3/route.upb.h" -#include "envoy/config/route/v3/route.upbdefs.h" -#include "envoy/config/route/v3/route_components.upb.h" -#include "envoy/config/route/v3/route_components.upbdefs.h" -#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h" -#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h" -#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upb.h" -#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h" -#include "envoy/extensions/transport_sockets/tls/v3/common.upb.h" -#include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h" -#include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h" -#include "envoy/service/cluster/v3/cds.upb.h" -#include "envoy/service/cluster/v3/cds.upbdefs.h" #include "envoy/service/discovery/v3/discovery.upb.h" #include "envoy/service/discovery/v3/discovery.upbdefs.h" -#include "envoy/service/endpoint/v3/eds.upb.h" -#include "envoy/service/endpoint/v3/eds.upbdefs.h" -#include "envoy/service/listener/v3/lds.upb.h" #include "envoy/service/load_stats/v3/lrs.upb.h" #include "envoy/service/load_stats/v3/lrs.upbdefs.h" -#include "envoy/service/route/v3/rds.upb.h" -#include "envoy/service/route/v3/rds.upbdefs.h" #include "envoy/service/status/v3/csds.upb.h" #include "envoy/service/status/v3/csds.upbdefs.h" -#include "envoy/type/matcher/v3/regex.upb.h" -#include "envoy/type/matcher/v3/string.upb.h" -#include "envoy/type/v3/percent.upb.h" -#include "envoy/type/v3/range.upb.h" #include "google/protobuf/any.upb.h" -#include "google/protobuf/duration.upb.h" #include "google/protobuf/struct.upb.h" #include "google/protobuf/timestamp.upb.h" #include "google/protobuf/wrappers.upb.h" @@ -83,16 +40,13 @@ #include "upb/text_encode.h" #include "upb/upb.h" #include "upb/upb.hpp" -#include "xds/type/v3/typed_struct.upb.h" #include #include #include #include "src/core/ext/xds/upb_utils.h" -#include "src/core/ext/xds/xds_cluster.h" #include "src/core/ext/xds/xds_common_types.h" -#include "src/core/ext/xds/xds_endpoint.h" #include "src/core/ext/xds/xds_resource_type.h" #include "src/core/ext/xds/xds_routing.h" #include "src/core/lib/address_utils/sockaddr_utils.h" @@ -107,108 +61,6 @@ namespace grpc_core { -// -// XdsApi -// - -// TODO(roth): All constants and functions for individual resource types -// should be merged into the XdsResourceType abstraction. -const char* XdsApi::kLdsTypeUrl = "envoy.config.listener.v3.Listener"; -const char* XdsApi::kRdsTypeUrl = "envoy.config.route.v3.RouteConfiguration"; -const char* XdsApi::kCdsTypeUrl = "envoy.config.cluster.v3.Cluster"; -const char* XdsApi::kEdsTypeUrl = - "envoy.config.endpoint.v3.ClusterLoadAssignment"; - -namespace { - -const char* kLdsV2TypeUrl = "envoy.api.v2.Listener"; -const char* kRdsV2TypeUrl = "envoy.api.v2.RouteConfiguration"; -const char* kCdsV2TypeUrl = "envoy.api.v2.Cluster"; -const char* kEdsV2TypeUrl = "envoy.api.v2.ClusterLoadAssignment"; - -bool IsLdsInternal(absl::string_view type_url, bool* is_v2 = nullptr) { - if (type_url == XdsApi::kLdsTypeUrl) return true; - if (type_url == kLdsV2TypeUrl) { - if (is_v2 != nullptr) *is_v2 = true; - return true; - } - return false; -} - -bool IsRdsInternal(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { - return type_url == XdsApi::kRdsTypeUrl || type_url == kRdsV2TypeUrl; -} - -bool IsCdsInternal(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { - return type_url == XdsApi::kCdsTypeUrl || type_url == kCdsV2TypeUrl; -} - -bool IsEdsInternal(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { - return type_url == XdsApi::kEdsTypeUrl || type_url == kEdsV2TypeUrl; -} - -absl::string_view TypeUrlExternalToInternal(bool use_v3, - const std::string& type_url) { - if (!use_v3) { - if (type_url == XdsApi::kLdsTypeUrl) { - return kLdsV2TypeUrl; - } - if (type_url == XdsApi::kRdsTypeUrl) { - return kRdsV2TypeUrl; - } - if (type_url == XdsApi::kCdsTypeUrl) { - return kCdsV2TypeUrl; - } - if (type_url == XdsApi::kEdsTypeUrl) { - return kEdsV2TypeUrl; - } - } - return type_url; -} - -std::string TypeUrlInternalToExternal(absl::string_view type_url) { - if (type_url == kLdsV2TypeUrl) { - return XdsApi::kLdsTypeUrl; - } else if (type_url == kRdsV2TypeUrl) { - return XdsApi::kRdsTypeUrl; - } else if (type_url == kCdsV2TypeUrl) { - return XdsApi::kCdsTypeUrl; - } else if (type_url == kEdsV2TypeUrl) { - return XdsApi::kEdsTypeUrl; - } - return std::string(type_url); -} - -absl::StatusOr ParseResourceNameInternal( - absl::string_view name, - std::function is_expected_type) { - // Old-style names use the empty string for authority. - // authority is prefixed with "old:" to indicate that it's an old-style name. - if (!absl::StartsWith(name, "xdstp:")) { - return XdsApi::ResourceName{"old:", std::string(name)}; - } - // New style name. Parse URI. - auto uri = URI::Parse(name); - if (!uri.ok()) return uri.status(); - // Split the resource type off of the path to get the id. - std::pair path_parts = - absl::StrSplit(uri->path(), absl::MaxSplits('/', 1)); - if (!is_expected_type(path_parts.first, nullptr)) { - return absl::InvalidArgumentError( - "xdstp URI path must indicate valid xDS resource type"); - } - std::vector> query_parameters( - uri->query_parameter_map().begin(), uri->query_parameter_map().end()); - std::sort(query_parameters.begin(), query_parameters.end()); - return XdsApi::ResourceName{ - absl::StrCat("xdstp:", uri->authority()), - absl::StrCat( - path_parts.second, (query_parameters.empty() ? "?" : ""), - absl::StrJoin(query_parameters, "&", absl::PairFormatter("=")))}; -} - -} // namespace - // If gRPC is built with -DGRPC_XDS_USER_AGENT_NAME_SUFFIX="...", that string // will be appended to the user agent name reported to the xDS server. #ifdef GRPC_XDS_USER_AGENT_NAME_SUFFIX @@ -249,54 +101,12 @@ XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer, // properly in logs. // Note: This won't actually work properly until upb adds support for // Any fields in textproto printing (internal b/178821188). - envoy_config_listener_v3_Listener_getmsgdef(symtab_.ptr()); - envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab_.ptr()); - envoy_config_cluster_v3_Cluster_getmsgdef(symtab_.ptr()); - envoy_extensions_clusters_aggregate_v3_ClusterConfig_getmsgdef(symtab_.ptr()); - envoy_config_cluster_v3_Cluster_getmsgdef(symtab_.ptr()); - envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab_.ptr()); - envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef( - symtab_.ptr()); - envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_getmsgdef( - symtab_.ptr()); - // Load HTTP filter proto messages into the upb symtab. - XdsHttpFilterRegistry::PopulateSymtab(symtab_.ptr()); -} - -bool XdsApi::IsLds(absl::string_view type_url) { - return IsLdsInternal(type_url); -} - -bool XdsApi::IsRds(absl::string_view type_url) { - return IsRdsInternal(type_url); -} - -bool XdsApi::IsCds(absl::string_view type_url) { - return IsCdsInternal(type_url); -} - -bool XdsApi::IsEds(absl::string_view type_url) { - return IsEdsInternal(type_url); -} - -absl::StatusOr XdsApi::ParseResourceName( - absl::string_view name, bool (*is_expected_type)(absl::string_view)) { - return ParseResourceNameInternal( - name, [is_expected_type](absl::string_view type, bool*) { - return is_expected_type(type); + XdsResourceTypeRegistry::GetOrCreate()->ForEach( + [this](const XdsResourceType* type) { + type->InitUpbSymtab(symtab_.ptr()); }); } -std::string XdsApi::ConstructFullResourceName(absl::string_view authority, - absl::string_view resource_type, - absl::string_view name) { - if (absl::ConsumePrefix(&authority, "xdstp:")) { - return absl::StrCat("xdstp://", authority, "/", resource_type, "/", name); - } else { - return std::string(absl::StripPrefix(name, "old:")); - } -} - namespace { void PopulateMetadataValue(const XdsEncodingContext& context, @@ -467,11 +277,10 @@ grpc_slice SerializeDiscoveryRequest( } // namespace grpc_slice XdsApi::CreateAdsRequest( - const XdsBootstrap::XdsServer& server, const std::string& type_url, - const std::map>& resource_names, - const std::string& version, const std::string& nonce, - grpc_error_handle error, bool populate_node) { + const XdsBootstrap::XdsServer& server, absl::string_view type_url, + absl::string_view version, absl::string_view nonce, + const std::vector& resource_names, grpc_error_handle error, + bool populate_node) { upb::Arena arena; const XdsEncodingContext context = {client_, tracer_, @@ -483,12 +292,9 @@ grpc_slice XdsApi::CreateAdsRequest( envoy_service_discovery_v3_DiscoveryRequest* request = envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr()); // Set type_url. - absl::string_view real_type_url = - TypeUrlExternalToInternal(server.ShouldUseV3(), type_url); - std::string real_type_url_str = - absl::StrCat("type.googleapis.com/", real_type_url); + std::string type_url_str = absl::StrCat("type.googleapis.com/", type_url); envoy_service_discovery_v3_DiscoveryRequest_set_type_url( - request, StdStringToUpbString(real_type_url_str)); + request, StdStringToUpbString(type_url_str)); // Set version_info. if (!version.empty()) { envoy_service_discovery_v3_DiscoveryRequest_set_version_info( @@ -524,27 +330,10 @@ grpc_slice XdsApi::CreateAdsRequest( PopulateNode(context, node_, build_version_, user_agent_name_, user_agent_version_, node_msg); } - // A vector for temporary local storage of resource name strings. - std::vector resource_name_storage; - // Make sure the vector is sized right up-front, so that reallocations - // don't move the strings out from under the upb proto object that - // points to them. - size_t size = 0; - for (const auto& p : resource_names) { - size += p.second.size(); - } - resource_name_storage.reserve(size); // Add resource_names. - for (const auto& a : resource_names) { - absl::string_view authority = a.first; - for (const auto& p : a.second) { - absl::string_view resource_id = p; - resource_name_storage.push_back( - ConstructFullResourceName(authority, real_type_url, resource_id)); - envoy_service_discovery_v3_DiscoveryRequest_add_resource_names( - request, StdStringToUpbString(resource_name_storage.back()), - arena.ptr()); - } + for (const std::string& resource_name : resource_names) { + envoy_service_discovery_v3_DiscoveryRequest_add_resource_names( + request, StdStringToUpbString(resource_name), arena.ptr()); } MaybeLogDiscoveryRequest(context, request); return SerializeDiscoveryRequest(context, request); @@ -566,110 +355,11 @@ void MaybeLogDiscoveryResponse( } } -grpc_error_handle AdsResourceParse( - const XdsEncodingContext& context, XdsResourceType* type, size_t idx, - const google_protobuf_Any* resource_any, - const std::map>& - subscribed_resource_names, - std::function, std::string)> - add_result_func, - std::set* resource_names_failed) { - // Check the type_url of the resource. - absl::string_view type_url = absl::StripPrefix( - UpbStringToAbsl(google_protobuf_Any_type_url(resource_any)), - "type.googleapis.com/"); - bool is_v2 = false; - if (!type->IsType(type_url, &is_v2)) { - return GRPC_ERROR_CREATE_FROM_CPP_STRING( - absl::StrCat("resource index ", idx, ": found resource type ", type_url, - " in response for type ", type->type_url())); - } - // Parse the resource. - absl::string_view serialized_resource = - UpbStringToAbsl(google_protobuf_Any_value(resource_any)); - absl::StatusOr result = - type->Decode(context, serialized_resource, is_v2); - if (!result.ok()) { - return GRPC_ERROR_CREATE_FROM_CPP_STRING( - absl::StrCat("resource index ", idx, ": ", result.status().ToString())); - } - // Check the resource name. - auto resource_name = ParseResourceNameInternal( - result->name, [type](absl::string_view type_url, bool* is_v2) { - return type->IsType(type_url, is_v2); - }); - if (!resource_name.ok()) { - return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( - "resource index ", idx, ": Cannot parse xDS resource name \"", - result->name, "\"")); - } - // Ignore unexpected names. - auto iter = subscribed_resource_names.find(resource_name->authority); - if (iter == subscribed_resource_names.end() || - iter->second.find(resource_name->id) == iter->second.end()) { - return GRPC_ERROR_NONE; - } - // Check that resource was valid. - if (!result->resource.ok()) { - resource_names_failed->insert(*resource_name); - return GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( - "resource index ", idx, ": ", result->name, - ": validation error: ", result->resource.status().ToString())); - } - // Add result. - grpc_error_handle error = add_result_func(result->name, *resource_name, - std::move(*result->resource), - std::string(serialized_resource)); - if (error != GRPC_ERROR_NONE) { - resource_names_failed->insert(*resource_name); - return grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( - "resource index ", idx, ": ", result->name, ": validation error")), - error); - } - return GRPC_ERROR_NONE; -} - -template -grpc_error_handle AddResult( - UpdateMap* update_map, absl::string_view resource_name_string, - XdsApi::ResourceName resource_name, - std::unique_ptr resource, - std::string serialized_resource) { - // Reject duplicate names. - if (update_map->find(resource_name) != update_map->end()) { - return GRPC_ERROR_CREATE_FROM_CPP_STRING( - absl::StrCat("duplicate resource name \"", resource_name_string, "\"")); - } - // Save result. - auto& resource_data = (*update_map)[resource_name]; - ResourceTypeData* typed_resource = - static_cast(resource.get()); - resource_data.resource = std::move(typed_resource->resource); - resource_data.serialized_proto = std::move(serialized_resource); - return GRPC_ERROR_NONE; -} - } // namespace -XdsApi::AdsParseResult XdsApi::ParseAdsResponse( - const XdsBootstrap::XdsServer& server, const grpc_slice& encoded_response, - const std::map>& - subscribed_listener_names, - const std::map>& - subscribed_route_config_names, - const std::map>& - subscribed_cluster_names, - const std::map>& - subscribed_eds_service_names) { - AdsParseResult result; +absl::Status XdsApi::ParseAdsResponse(const XdsBootstrap::XdsServer& server, + const grpc_slice& encoded_response, + AdsResponseParserInterface* parser) { upb::Arena arena; const XdsEncodingContext context = {client_, tracer_, @@ -682,99 +372,38 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( envoy_service_discovery_v3_DiscoveryResponse_parse( reinterpret_cast(GRPC_SLICE_START_PTR(encoded_response)), GRPC_SLICE_LENGTH(encoded_response), arena.ptr()); - // If decoding fails, output an empty type_url and return. + // If decoding fails, report a fatal error and return. if (response == nullptr) { - result.parse_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode DiscoveryResponse."); - return result; + return absl::InvalidArgumentError("Can't decode DiscoveryResponse."); } MaybeLogDiscoveryResponse(context, response); - // Record the type_url, the version_info, and the nonce of the response. - result.type_url = TypeUrlInternalToExternal(absl::StripPrefix( + // Report the type_url, version, nonce, and number of resources to the parser. + AdsResponseParserInterface::AdsResponseFields fields; + fields.type_url = std::string(absl::StripPrefix( UpbStringToAbsl( envoy_service_discovery_v3_DiscoveryResponse_type_url(response)), "type.googleapis.com/")); - result.version = UpbStringToStdString( + fields.version = UpbStringToStdString( envoy_service_discovery_v3_DiscoveryResponse_version_info(response)); - result.nonce = UpbStringToStdString( + fields.nonce = UpbStringToStdString( envoy_service_discovery_v3_DiscoveryResponse_nonce(response)); - // Get the resources from the response. - std::vector errors; - size_t size; + size_t num_resources; const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - for (size_t i = 0; i < size; ++i) { - // Parse the response according to the resource type. - // TODO(roth): When we have time, change the API here to avoid the need - // for templating and conditionals. - grpc_error_handle parse_error = GRPC_ERROR_NONE; - if (IsLds(result.type_url)) { - XdsListenerResourceType resource_type; - auto& update_map = result.lds_update_map; - parse_error = AdsResourceParse( - context, &resource_type, i, resources[i], subscribed_listener_names, - [&update_map](absl::string_view resource_name_string, - XdsApi::ResourceName resource_name, - std::unique_ptr resource, - std::string serialized_resource) { - return AddResult( - &update_map, resource_name_string, std::move(resource_name), - std::move(resource), std::move(serialized_resource)); - }, - &result.resource_names_failed); - } else if (IsRds(result.type_url)) { - XdsRouteConfigResourceType resource_type; - auto& update_map = result.rds_update_map; - parse_error = AdsResourceParse( - context, &resource_type, i, resources[i], - subscribed_route_config_names, - [&update_map](absl::string_view resource_name_string, - XdsApi::ResourceName resource_name, - std::unique_ptr resource, - std::string serialized_resource) { - return AddResult( - &update_map, resource_name_string, std::move(resource_name), - std::move(resource), std::move(serialized_resource)); - }, - &result.resource_names_failed); - } else if (IsCds(result.type_url)) { - XdsClusterResourceType resource_type; - auto& update_map = result.cds_update_map; - parse_error = AdsResourceParse( - context, &resource_type, i, resources[i], subscribed_cluster_names, - [&update_map](absl::string_view resource_name_string, - XdsApi::ResourceName resource_name, - std::unique_ptr resource, - std::string serialized_resource) { - return AddResult( - &update_map, resource_name_string, std::move(resource_name), - std::move(resource), std::move(serialized_resource)); - }, - &result.resource_names_failed); - } else if (IsEds(result.type_url)) { - XdsEndpointResourceType resource_type; - auto& update_map = result.eds_update_map; - parse_error = AdsResourceParse( - context, &resource_type, i, resources[i], - subscribed_eds_service_names, - [&update_map](absl::string_view resource_name_string, - XdsApi::ResourceName resource_name, - std::unique_ptr resource, - std::string serialized_resource) { - return AddResult( - &update_map, resource_name_string, std::move(resource_name), - std::move(resource), std::move(serialized_resource)); - }, - &result.resource_names_failed); - } - if (parse_error != GRPC_ERROR_NONE) errors.push_back(parse_error); - } - result.parse_error = - GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing ADS response", &errors); - return result; + envoy_service_discovery_v3_DiscoveryResponse_resources(response, + &num_resources); + fields.num_resources = num_resources; + absl::Status status = parser->ProcessAdsResponseFields(std::move(fields)); + if (!status.ok()) return status; + // Process each resource. + for (size_t i = 0; i < num_resources; ++i) { + absl::string_view type_url = absl::StripPrefix( + UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])), + "type.googleapis.com/"); + absl::string_view serialized_resource = + UpbStringToAbsl(google_protobuf_Any_value(resources[i])); + parser->ParseResource(context, i, type_url, serialized_resource); + } + return absl::OkStatus(); } namespace { diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index fee90af02cb..fd2ccdbf932 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -23,71 +23,48 @@ #include -#include "absl/container/inlined_vector.h" -#include "absl/types/optional.h" -#include "absl/types/variant.h" #include "envoy/admin/v3/config_dump.upb.h" -#include "re2/re2.h" #include "upb/def.hpp" -#include +#include -#include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/xds/upb_utils.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" -#include "src/core/ext/xds/xds_cluster.h" -#include "src/core/ext/xds/xds_endpoint.h" -#include "src/core/ext/xds/xds_http_filters.h" -#include "src/core/ext/xds/xds_listener.h" -#include "src/core/ext/xds/xds_route_config.h" -#include "src/core/lib/channel/status_util.h" -#include "src/core/lib/matchers/matchers.h" namespace grpc_core { class XdsClient; +// TODO(roth): When we have time, split this into multiple pieces: +// - a common upb-based parsing framework (combine with XdsEncodingContext) +// - ADS request/response handling +// - LRS request/response handling +// - CSDS response generation class XdsApi { public: - static const char* kLdsTypeUrl; - static const char* kRdsTypeUrl; - static const char* kCdsTypeUrl; - static const char* kEdsTypeUrl; - - struct ResourceName { - std::string authority; - std::string id; - - bool operator<(const ResourceName& other) const { - if (authority < other.authority) return true; - if (id < other.id) return true; - return false; - } - }; - - struct LdsResourceData { - XdsListenerResource resource; - std::string serialized_proto; - }; - using LdsUpdateMap = std::map; + // Interface defined by caller and passed to ParseAdsResponse(). + class AdsResponseParserInterface { + public: + struct AdsResponseFields { + std::string type_url; + std::string version; + std::string nonce; + size_t num_resources; + }; - struct RdsResourceData { - XdsRouteConfigResource resource; - std::string serialized_proto; - }; - using RdsUpdateMap = std::map; + virtual ~AdsResponseParserInterface() = default; - struct CdsResourceData { - XdsClusterResource resource; - std::string serialized_proto; - }; - using CdsUpdateMap = std::map; + // Called when the top-level ADS fields are parsed. + // If this returns non-OK, parsing will stop, and the individual + // resources will not be processed. + virtual absl::Status ProcessAdsResponseFields(AdsResponseFields fields) = 0; - struct EdsResourceData { - XdsEndpointResource resource; - std::string serialized_proto; + // Called to parse each individual resource in the ADS response. + virtual void ParseResource(const XdsEncodingContext& context, size_t idx, + absl::string_view type_url, + absl::string_view serialized_resource) = 0; }; - using EdsUpdateMap = std::map; struct ClusterLoadReport { XdsClusterDropStats::Snapshot dropped_requests; @@ -156,69 +133,23 @@ class XdsApi { ResourceMetadata::ClientResourceStatus::NACKED, ""); - // If the response can't be parsed at the top level, the resulting - // type_url will be empty. - // If there is any other type of validation error, the parse_error - // field will be set to something other than GRPC_ERROR_NONE and the - // resource_names_failed field will be populated. - // Otherwise, one of the *_update_map fields will be populated, based - // on the type_url field. - struct AdsParseResult { - grpc_error_handle parse_error = GRPC_ERROR_NONE; - std::string version; - std::string nonce; - std::string type_url; - LdsUpdateMap lds_update_map; - RdsUpdateMap rds_update_map; - CdsUpdateMap cds_update_map; - EdsUpdateMap eds_update_map; - std::set resource_names_failed; - }; - XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node, const CertificateProviderStore::PluginDefinitionMap* map); - static bool IsLds(absl::string_view type_url); - static bool IsRds(absl::string_view type_url); - static bool IsCds(absl::string_view type_url); - static bool IsEds(absl::string_view type_url); - - // A helper method to parse the resource name and return back a ResourceName - // struct. Optionally the parser can check the resource type portion of the - // resource name. - static absl::StatusOr ParseResourceName( - absl::string_view name, - bool (*is_expected_type)(absl::string_view) = nullptr); - - // A helper method to construct the resource name from parts. - static std::string ConstructFullResourceName(absl::string_view authority, - absl::string_view resource_type, - absl::string_view name); - // Creates an ADS request. // Takes ownership of \a error. - grpc_slice CreateAdsRequest( - const XdsBootstrap::XdsServer& server, const std::string& type_url, - const std::map>& resource_names, - const std::string& version, const std::string& nonce, - grpc_error_handle error, bool populate_node); - - // Parses an ADS response. - AdsParseResult ParseAdsResponse( - const XdsBootstrap::XdsServer& server, const grpc_slice& encoded_response, - const std::map>& - subscribed_listener_names, - const std::map>& - subscribed_route_config_names, - const std::map>& - subscribed_cluster_names, - const std::map>& - subscribed_eds_service_names); + grpc_slice CreateAdsRequest(const XdsBootstrap::XdsServer& server, + absl::string_view type_url, + absl::string_view version, + absl::string_view nonce, + const std::vector& resource_names, + grpc_error_handle error, bool populate_node); + + // Returns non-OK when failing to deserialize response message. + // Otherwise, all events are reported to the parser. + absl::Status ParseAdsResponse(const XdsBootstrap::XdsServer& server, + const grpc_slice& encoded_response, + AdsResponseParserInterface* parser); // Creates an initial LRS request. grpc_slice CreateLrsInitialRequest(const XdsBootstrap::XdsServer& server); diff --git a/src/core/ext/xds/xds_certificate_provider.h b/src/core/ext/xds/xds_certificate_provider.h index 3e57b86d919..c2436ebaa99 100644 --- a/src/core/ext/xds/xds_certificate_provider.h +++ b/src/core/ext/xds/xds_certificate_provider.h @@ -22,6 +22,7 @@ #include #include "src/core/ext/xds/xds_api.h" +#include "src/core/lib/matchers/matchers.h" #include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h" #define GRPC_ARG_XDS_CERTIFICATE_PROVIDER \ diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 5315aa9f1bc..95cd22fc27d 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -76,6 +76,11 @@ const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; +const char* kLdsTypeUrl = "envoy.config.listener.v3.Listener"; +const char* kRdsTypeUrl = "envoy.config.route.v3.RouteConfiguration"; +const char* kCdsTypeUrl = "envoy.config.cluster.v3.Cluster"; +const char* kEdsTypeUrl = "envoy.config.endpoint.v3.ClusterLoadAssignment"; + } // namespace class XdsClient::Notifier { @@ -172,21 +177,52 @@ class XdsClient::ChannelState::AdsCallState XdsClient* xds_client() const { return chand()->xds_client(); } bool seen_response() const { return seen_response_; } - void SubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& name) + void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void UnsubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& name, - bool delay_unsubscription) + void UnsubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name, bool delay_unsubscription) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); bool HasSubscribedResources() const; private: - class ResourceState : public InternallyRefCounted { + class AdsResponseParser : public XdsApi::AdsResponseParserInterface { + public: + struct Result { + const XdsResourceType* type; + std::string type_url; + std::string version; + std::string nonce; + std::vector errors; + std::map> + resources_seen; + bool have_valid_resources = false; + }; + + explicit AdsResponseParser(AdsCallState* ads_call_state) + : ads_call_state_(ads_call_state) {} + + absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override; + + void ParseResource(const XdsEncodingContext& context, size_t idx, + absl::string_view type_url, + absl::string_view serialized_resource) override + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); + + Result TakeResult() { return std::move(result_); } + + private: + XdsClient* xds_client() const { return ads_call_state_->xds_client(); } + + AdsCallState* ads_call_state_; + const grpc_millis update_time_ = ExecCtx::Get()->Now(); + Result result_; + }; + + class ResourceTimer : public InternallyRefCounted { public: - ResourceState(const std::string& type_url, const XdsApi::ResourceName& name) - : type_url_(type_url), name_(name) { + ResourceTimer(const XdsResourceType* type, const XdsResourceName& name) + : type_(type), name_(name) { GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this, grpc_schedule_on_exec_ctx); } @@ -217,7 +253,7 @@ class XdsClient::ChannelState::AdsCallState private: static void OnTimer(void* arg, grpc_error_handle error) { - ResourceState* self = static_cast(arg); + ResourceTimer* self = static_cast(arg); { MutexLock lock(&self->ads_calld_->xds_client()->mu_); self->OnTimerLocked(GRPC_ERROR_REF(error)); @@ -234,9 +270,9 @@ class XdsClient::ChannelState::AdsCallState grpc_error_handle watcher_error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( "timeout obtaining resource {type=%s name=%s} from xds server", - type_url_, - XdsApi::ConstructFullResourceName(name_.authority, type_url_, - name_.id))); + type_->type_url(), + XdsClient::ConstructFullXdsResourceName( + name_.authority, type_->type_url(), name_.id))); watcher_error = grpc_error_set_int( watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -245,40 +281,17 @@ class XdsClient::ChannelState::AdsCallState } auto& authority_state = ads_calld_->xds_client()->authority_state_map_[name_.authority]; - if (type_url_ == XdsApi::kLdsTypeUrl) { - ListenerState& state = authority_state.listener_map[name_.id]; - state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( - ads_calld_->xds_client(), state.watchers, - GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); - } else if (type_url_ == XdsApi::kRdsTypeUrl) { - RouteConfigState& state = authority_state.route_config_map[name_.id]; - state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( - ads_calld_->xds_client(), state.watchers, - GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); - } else if (type_url_ == XdsApi::kCdsTypeUrl) { - ClusterState& state = authority_state.cluster_map[name_.id]; - state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( - ads_calld_->xds_client(), state.watchers, - GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); - } else if (type_url_ == XdsApi::kEdsTypeUrl) { - EndpointState& state = authority_state.endpoint_map[name_.id]; - state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; - Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( - ads_calld_->xds_client(), state.watchers, - GRPC_ERROR_REF(watcher_error), DEBUG_LOCATION); - } else { - GPR_UNREACHABLE_CODE(return ); - } - GRPC_ERROR_UNREF(watcher_error); + ResourceState& state = authority_state.resource_map[type_][name_.id]; + state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST; + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + ads_calld_->xds_client(), state.watchers, watcher_error, + DEBUG_LOCATION); } GRPC_ERROR_UNREF(error); } - const std::string type_url_; - const XdsApi::ResourceName name_; + const XdsResourceType* type_; + const XdsResourceName name_; RefCountedPtr ads_calld_; bool timer_started_ = false; @@ -296,40 +309,11 @@ class XdsClient::ChannelState::AdsCallState // Subscribed resources of this type. std::map>> + std::map>> subscribed_resources; }; - void SendMessageLocked(const std::string& type_url) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - void AcceptLdsUpdateLocked( - std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map, - const std::set& resource_names_failed) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time, - XdsApi::RdsUpdateMap rds_update_map) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void AcceptCdsUpdateLocked( - std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map, - const std::set& resource_names_failed) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time, - XdsApi::EdsUpdateMap eds_update_map) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - template - void RejectAdsUpdateHelperLocked(const std::string& resource_name, - grpc_millis update_time, - const XdsApi::AdsParseResult& result, - const std::string& error_details, - StateMap* state_map) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - - void RejectAdsUpdateLocked(grpc_millis update_time, - const XdsApi::AdsParseResult& result) + void SendMessageLocked(const XdsResourceType* type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); static void OnRequestSent(void* arg, grpc_error_handle error); @@ -344,9 +328,9 @@ class XdsClient::ChannelState::AdsCallState bool IsCurrentCallOnChannel() const; - std::map> - ResourceNamesForRequest(const std::string& type_url); + // Constructs a list of resource names of a given type for an ADS + // request. Also starts the timer for each resource if needed. + std::vector ResourceNamesForRequest(const XdsResourceType* type); // The owning RetryableCall<>. RefCountedPtr> parent_; @@ -375,10 +359,10 @@ class XdsClient::ChannelState::AdsCallState grpc_closure on_status_received_; // Resource types for which requests need to be sent. - std::set buffered_requests_; + std::set buffered_requests_; // State for each resource type. - std::map state_map_; + std::map state_map_; }; // Contains an LRS call to the xds server. @@ -612,8 +596,8 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() { client_channel->RemoveConnectivityWatcher(watcher_); } -void XdsClient::ChannelState::SubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& name) { +void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name) { if (ads_calld_ == nullptr) { // Start the ADS call if this is the first request. ads_calld_.reset(new RetryableCall( @@ -627,16 +611,16 @@ void XdsClient::ChannelState::SubscribeLocked( // because when the call is restarted it will resend all necessary requests. if (ads_calld() == nullptr) return; // Subscribe to this resource if the ADS call is active. - ads_calld()->SubscribeLocked(type_url, name); + ads_calld()->SubscribeLocked(type, name); } -void XdsClient::ChannelState::UnsubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& name, - bool delay_unsubscription) { +void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name, + bool delay_unsubscription) { if (ads_calld_ != nullptr) { auto* calld = ads_calld_->calld(); if (calld != nullptr) { - calld->UnsubscribeLocked(type_url, name, delay_unsubscription); + calld->UnsubscribeLocked(type, name, delay_unsubscription); if (!calld->HasSubscribedResources()) { ads_calld_.reset(); } @@ -747,6 +731,170 @@ void XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked( GRPC_ERROR_UNREF(error); } +// +// XdsClient::ChannelState::AdsCallState::AdsResponseParser +// + +absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser:: + ProcessAdsResponseFields(AdsResponseFields fields) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] received ADS response: type_url=%s, " + "version=%s, nonce=%s, num_resources=%" PRIuPTR, + ads_call_state_->xds_client(), fields.type_url.c_str(), + fields.version.c_str(), fields.nonce.c_str(), fields.num_resources); + } + result_.type = + XdsResourceTypeRegistry::GetOrCreate()->GetType(fields.type_url); + if (result_.type == nullptr) { + return absl::InvalidArgumentError( + absl::StrCat("unknown resource type ", fields.type_url)); + } + result_.type_url = std::move(fields.type_url); + result_.version = std::move(fields.version); + result_.nonce = std::move(fields.nonce); + return absl::OkStatus(); +} + +namespace { + +// Build a resource metadata struct for ADS result accepting methods and CSDS. +XdsApi::ResourceMetadata CreateResourceMetadataAcked( + std::string serialized_proto, std::string version, + grpc_millis update_time) { + XdsApi::ResourceMetadata resource_metadata; + resource_metadata.serialized_proto = std::move(serialized_proto); + resource_metadata.update_time = update_time; + resource_metadata.version = std::move(version); + resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED; + return resource_metadata; +} + +// Update resource_metadata for NACK. +void UpdateResourceMetadataNacked(const std::string& version, + const std::string& details, + grpc_millis update_time, + XdsApi::ResourceMetadata* resource_metadata) { + resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED; + resource_metadata->failed_version = version; + resource_metadata->failed_details = details; + resource_metadata->failed_update_time = update_time; +} + +} // namespace + +void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource( + const XdsEncodingContext& context, size_t idx, absl::string_view type_url, + absl::string_view serialized_resource) { + // Check the type_url of the resource. + bool is_v2 = false; + if (!result_.type->IsType(type_url, &is_v2)) { + result_.errors.emplace_back( + absl::StrCat("resource index ", idx, ": incorrect resource type ", + type_url, " (should be ", result_.type_url, ")")); + return; + } + // Parse the resource. + absl::StatusOr result = + result_.type->Decode(context, serialized_resource, is_v2); + if (!result.ok()) { + result_.errors.emplace_back( + absl::StrCat("resource index ", idx, ": ", result.status().ToString())); + return; + } + // Check the resource name. + auto resource_name = + XdsClient::ParseXdsResourceName(result->name, result_.type); + if (!resource_name.ok()) { + result_.errors.emplace_back(absl::StrCat( + "resource index ", idx, ": Cannot parse xDS resource name \"", + result->name, "\"")); + return; + } + // Cancel resource-does-not-exist timer, if needed. + auto timer_it = ads_call_state_->state_map_.find(result_.type); + if (timer_it != ads_call_state_->state_map_.end()) { + auto it = + timer_it->second.subscribed_resources.find(resource_name->authority); + if (it != timer_it->second.subscribed_resources.end()) { + auto res_it = it->second.find(resource_name->id); + if (res_it != it->second.end()) { + res_it->second->MaybeCancelTimer(); + } + } + } + // Lookup the authority in the cache. + auto authority_it = + xds_client()->authority_state_map_.find(resource_name->authority); + if (authority_it == xds_client()->authority_state_map_.end()) { + return; // Skip resource -- we don't have a subscription for it. + } + // Found authority, so look up type. + AuthorityState& authority_state = authority_it->second; + auto type_it = authority_state.resource_map.find(result_.type); + if (type_it == authority_state.resource_map.end()) { + return; // Skip resource -- we don't have a subscription for it. + } + auto& type_map = type_it->second; + // Found type, so look up resource id. + auto it = type_map.find(resource_name->id); + if (it == type_map.end()) { + return; // Skip resource -- we don't have a subscription for it. + } + ResourceState& resource_state = it->second; + // If needed, record that we've seen this resource. + if (result_.type->AllResourcesRequiredInSotW()) { + result_.resources_seen[resource_name->authority].insert(resource_name->id); + } + // Update resource state based on whether the resource is valid. + if (!result->resource.ok()) { + result_.errors.emplace_back(absl::StrCat( + "resource index ", idx, ": ", result->name, + ": validation error: ", result->resource.status().ToString())); + Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( + xds_client(), resource_state.watchers, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( + "invalid resource: ", result->resource.status().ToString())), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + DEBUG_LOCATION); + UpdateResourceMetadataNacked(result_.version, + result->resource.status().ToString(), + update_time_, &resource_state.meta); + return; + } + // Resource is valid. + result_.have_valid_resources = true; + // If it didn't change, ignore it. + if (resource_state.resource != nullptr && + result_.type->ResourcesEqual(resource_state.resource.get(), + result->resource->get())) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] %s resource %s identical to current, ignoring.", + xds_client(), result_.type_url.c_str(), result->name.c_str()); + } + return; + } + // Update the resource state. + resource_state.resource = std::move(*result->resource); + resource_state.meta = CreateResourceMetadataAcked( + std::string(serialized_resource), result_.version, update_time_); + // Notify watchers. + auto& watchers_list = resource_state.watchers; + auto* value = + result_.type->CopyResource(resource_state.resource.get()).release(); + xds_client()->work_serializer_.Schedule( + [watchers_list, value]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { + for (const auto& p : watchers_list) { + p.first->OnResourceChanged(value); + } + delete value; + }, + DEBUG_LOCATION); +} + // // XdsClient::ChannelState::AdsCallState // @@ -804,21 +952,12 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( const std::string& authority = a.first; // Skip authorities that are not using this xDS channel. if (a.second.channel_state != chand()) continue; - for (const auto& l : a.second.listener_map) { - const std::string& listener_name = l.first; - SubscribeLocked(XdsApi::kLdsTypeUrl, {authority, listener_name}); - } - for (const auto& r : a.second.route_config_map) { - const std::string& route_config_name = r.first; - SubscribeLocked(XdsApi::kRdsTypeUrl, {authority, route_config_name}); - } - for (const auto& c : a.second.cluster_map) { - const std::string& cluster_name = c.first; - SubscribeLocked(XdsApi::kCdsTypeUrl, {authority, cluster_name}); - } - for (const auto& e : a.second.endpoint_map) { - const std::string& endpoint_name = e.first; - SubscribeLocked(XdsApi::kEdsTypeUrl, {authority, endpoint_name}); + for (const auto& t : a.second.resource_map) { + const XdsResourceType* type = t.first; + for (const auto& r : t.second) { + const std::string& resource_id = r.first; + SubscribeLocked(type, {authority, resource_id}); + } } } // Op: recv initial metadata. @@ -883,33 +1022,28 @@ void XdsClient::ChannelState::AdsCallState::Orphan() { } void XdsClient::ChannelState::AdsCallState::SendMessageLocked( - const std::string& type_url) + const XdsResourceType* type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) { // Buffer message sending if an existing message is in flight. if (send_message_payload_ != nullptr) { - buffered_requests_.insert(type_url); + buffered_requests_.insert(type); return; } - auto& state = state_map_[type_url]; + auto& state = state_map_[type]; grpc_slice request_payload_slice; - std::map> - resource_map = ResourceNamesForRequest(type_url); request_payload_slice = xds_client()->api_.CreateAdsRequest( - chand()->server_, type_url, resource_map, - chand()->resource_type_version_map_[type_url], state.nonce, - GRPC_ERROR_REF(state.error), !sent_initial_message_); - if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl && - type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) { - state_map_.erase(type_url); - } + chand()->server_, + chand()->server_.ShouldUseV3() ? type->type_url() : type->v2_type_url(), + chand()->resource_type_version_map_[type], state.nonce, + ResourceNamesForRequest(type), GRPC_ERROR_REF(state.error), + !sent_initial_message_); sent_initial_message_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s " "error=%s", - xds_client(), type_url.c_str(), - chand()->resource_type_version_map_[type_url].c_str(), + xds_client(), std::string(type->type_url()).c_str(), + chand()->resource_type_version_map_[type].c_str(), state.nonce.c_str(), grpc_error_std_string(state.error).c_str()); } GRPC_ERROR_UNREF(state.error); @@ -937,25 +1071,24 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( } void XdsClient::ChannelState::AdsCallState::SubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& name) { - auto& state = - state_map_[type_url].subscribed_resources[name.authority][name.id]; + const XdsResourceType* type, const XdsResourceName& name) { + auto& state = state_map_[type].subscribed_resources[name.authority][name.id]; if (state == nullptr) { - state = MakeOrphanable(type_url, name); - SendMessageLocked(type_url); + state = MakeOrphanable(type, name); + SendMessageLocked(type); } } void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked( - const std::string& type_url, const XdsApi::ResourceName& name, + const XdsResourceType* type, const XdsResourceName& name, bool delay_unsubscription) { - auto& type_state_map = state_map_[type_url]; + auto& type_state_map = state_map_[type]; auto& authority_map = type_state_map.subscribed_resources[name.authority]; authority_map.erase(name.id); if (authority_map.empty()) { type_state_map.subscribed_resources.erase(name.authority); } - if (!delay_unsubscription) SendMessageLocked(type_url); + if (!delay_unsubscription) SendMessageLocked(type); } bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { @@ -965,395 +1098,6 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { return false; } -namespace { - -// Build a resource metadata struct for ADS result accepting methods and CSDS. -XdsApi::ResourceMetadata CreateResourceMetadataAcked( - std::string serialized_proto, std::string version, - grpc_millis update_time) { - XdsApi::ResourceMetadata resource_metadata; - resource_metadata.serialized_proto = std::move(serialized_proto); - resource_metadata.update_time = update_time; - resource_metadata.version = std::move(version); - resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED; - return resource_metadata; -} - -} // namespace - -void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked( - std::string version, grpc_millis update_time, - XdsApi::LdsUpdateMap lds_update_map, - const std::set& resource_names_failed) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] LDS update received containing %" PRIuPTR - " resources", - xds_client(), lds_update_map.size()); - } - auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; - for (auto& p : lds_update_map) { - const XdsApi::ResourceName& name = p.first; - XdsListenerResource& lds_update = p.second.resource; - auto it = lds_state.subscribed_resources.find(name.authority); - if (it != lds_state.subscribed_resources.end()) { - auto res_it = it->second.find(name.id); - if (res_it != it->second.end()) { - res_it->second->MaybeCancelTimer(); - } - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(), - XdsApi::ConstructFullResourceName(name.authority, - XdsApi::kLdsTypeUrl, name.id) - .c_str(), - lds_update.ToString().c_str()); - } - ListenerState& listener_state = xds_client() - ->authority_state_map_[name.authority] - .listener_map[name.id]; - // Ignore identical update. - if (listener_state.update.has_value() && - *listener_state.update == lds_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] LDS update for %s identical to current, " - "ignoring.", - xds_client(), - XdsApi::ConstructFullResourceName(name.authority, - XdsApi::kLdsTypeUrl, name.id) - .c_str()); - } - continue; - } - // Update the listener state. - listener_state.update = std::move(lds_update); - listener_state.meta = CreateResourceMetadataAcked( - std::move(p.second.serialized_proto), version, update_time); - // Notify watchers. - auto& watchers_list = listener_state.watchers; - auto& value = listener_state.update.value(); - xds_client()->work_serializer_.Schedule( - [watchers_list, value]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { - for (const auto& p : watchers_list) { - p.first->OnListenerChanged(value); - } - }, - DEBUG_LOCATION); - } - // For invalid resources in the update, if they are already in the - // cache, pretend that they are present in the update, so that we - // don't incorrectly consider them deleted below. - for (const auto& name : resource_names_failed) { - auto& listener_map = - xds_client()->authority_state_map_[name.authority].listener_map; - auto it = listener_map.find(name.id); - if (it != listener_map.end()) { - auto& update = it->second.update; - if (!update.has_value()) continue; - lds_update_map[name]; - } - } - // For any subscribed resource that is not present in the update, - // remove it from the cache and notify watchers that it does not exist. - for (const auto& a : lds_state.subscribed_resources) { - const std::string& authority_name = a.first; - for (const auto& p : a.second) { - const std::string& listener_name = p.first; - if (lds_update_map.find({authority_name, listener_name}) == - lds_update_map.end()) { - ListenerState& listener_state = - xds_client() - ->authority_state_map_[authority_name] - .listener_map[listener_name]; - // If the resource was newly requested but has not yet been received, - // we don't want to generate an error for the watchers, because this LDS - // response may be in reaction to an earlier request that did not yet - // request the new resource, so its absence from the response does not - // necessarily indicate that the resource does not exist. - // For that case, we rely on the request timeout instead. - if (!listener_state.update.has_value()) continue; - listener_state.update.reset(); - Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( - xds_client(), listener_state.watchers, DEBUG_LOCATION); - } - } - } -} - -void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked( - std::string version, grpc_millis update_time, - XdsApi::RdsUpdateMap rds_update_map) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] RDS update received containing %" PRIuPTR - " resources", - xds_client(), rds_update_map.size()); - } - auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; - for (auto& p : rds_update_map) { - const XdsApi::ResourceName& name = p.first; - XdsRouteConfigResource& rds_update = p.second.resource; - auto it = rds_state.subscribed_resources.find(name.authority); - if (it != rds_state.subscribed_resources.end()) { - auto res_it = it->second.find(name.id); - if (res_it != it->second.end()) { - res_it->second->MaybeCancelTimer(); - } - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(), - rds_update.ToString().c_str()); - } - RouteConfigState& route_config_state = - xds_client() - ->authority_state_map_[name.authority] - .route_config_map[name.id]; - // Ignore identical update. - if (route_config_state.update.has_value() && - *route_config_state.update == rds_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] RDS resource identical to current, ignoring", - xds_client()); - } - continue; - } - // Update the cache. - route_config_state.update = std::move(rds_update); - route_config_state.meta = CreateResourceMetadataAcked( - std::move(p.second.serialized_proto), version, update_time); - // Notify all watchers. - auto& watchers_list = route_config_state.watchers; - auto& value = route_config_state.update.value(); - xds_client()->work_serializer_.Schedule( - [watchers_list, value]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { - for (const auto& p : watchers_list) { - p.first->OnRouteConfigChanged(value); - } - }, - DEBUG_LOCATION); - } -} - -void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked( - std::string version, grpc_millis update_time, - XdsApi::CdsUpdateMap cds_update_map, - const std::set& resource_names_failed) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] CDS update received containing %" PRIuPTR - " resources", - xds_client(), cds_update_map.size()); - } - auto& cds_state = state_map_[XdsApi::kCdsTypeUrl]; - for (auto& p : cds_update_map) { - const XdsApi::ResourceName& name = p.first; - XdsClusterResource& cds_update = p.second.resource; - auto it = cds_state.subscribed_resources.find(name.authority); - if (it != cds_state.subscribed_resources.end()) { - auto res_it = it->second.find(name.id); - if (res_it != it->second.end()) { - res_it->second->MaybeCancelTimer(); - } - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(), - XdsApi::ConstructFullResourceName(name.authority, - XdsApi::kCdsTypeUrl, name.id) - .c_str(), - cds_update.ToString().c_str()); - } - ClusterState& cluster_state = - xds_client()->authority_state_map_[name.authority].cluster_map[name.id]; - // Ignore identical update. - if (cluster_state.update.has_value() && - *cluster_state.update == cds_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] CDS update identical to current, ignoring.", - xds_client()); - } - continue; - } - // Update the cluster state. - cluster_state.update = std::move(cds_update); - cluster_state.meta = CreateResourceMetadataAcked( - std::move(p.second.serialized_proto), version, update_time); - // Notify all watchers. - auto& watchers_list = cluster_state.watchers; - auto& value = cluster_state.update.value(); - xds_client()->work_serializer_.Schedule( - [watchers_list, value]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { - for (const auto& p : watchers_list) { - p.first->OnClusterChanged(value); - } - }, - DEBUG_LOCATION); - } - // For invalid resources in the update, if they are already in the - // cache, pretend that they are present in the update, so that we - // don't incorrectly consider them deleted below. - for (const auto& name : resource_names_failed) { - auto& cluster_map = - xds_client()->authority_state_map_[name.authority].cluster_map; - auto it = cluster_map.find(name.id); - if (it != cluster_map.end()) { - auto& update = it->second.update; - if (!update.has_value()) continue; - cds_update_map[name]; - } - } - // For any subscribed resource that is not present in the update, - // remove it from the cache and notify watchers that it does not exist. - for (const auto& a : cds_state.subscribed_resources) { - const std::string& authority = a.first; - for (const auto& p : a.second) { - const std::string& cluster_name = p.first; - if (cds_update_map.find({authority, cluster_name}) == - cds_update_map.end()) { - ClusterState& cluster_state = xds_client() - ->authority_state_map_[authority] - .cluster_map[cluster_name]; - // If the resource was newly requested but has not yet been received, - // we don't want to generate an error for the watchers, because this CDS - // response may be in reaction to an earlier request that did not yet - // request the new resource, so its absence from the response does not - // necessarily indicate that the resource does not exist. - // For that case, we rely on the request timeout instead. - if (!cluster_state.update.has_value()) continue; - cluster_state.update.reset(); - Notifier::ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( - xds_client(), cluster_state.watchers, DEBUG_LOCATION); - } - } - } -} - -void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked( - std::string version, grpc_millis update_time, - XdsApi::EdsUpdateMap eds_update_map) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] EDS update received containing %" PRIuPTR - " resources", - xds_client(), eds_update_map.size()); - } - auto& eds_state = state_map_[XdsApi::kEdsTypeUrl]; - for (auto& p : eds_update_map) { - const XdsApi::ResourceName& name = p.first; - XdsEndpointResource& eds_update = p.second.resource; - auto it = eds_state.subscribed_resources.find(name.authority); - if (it != eds_state.subscribed_resources.end()) { - auto res_it = it->second.find(name.id); - if (res_it != it->second.end()) { - res_it->second->MaybeCancelTimer(); - } - } - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(), - XdsApi::ConstructFullResourceName(name.authority, - XdsApi::kCdsTypeUrl, name.id) - .c_str(), - eds_update.ToString().c_str()); - } - EndpointState& endpoint_state = xds_client() - ->authority_state_map_[name.authority] - .endpoint_map[name.id]; - // Ignore identical update. - if (endpoint_state.update.has_value() && - *endpoint_state.update == eds_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] EDS update identical to current, ignoring.", - xds_client()); - } - continue; - } - // Update the cluster state. - endpoint_state.update = std::move(eds_update); - endpoint_state.meta = CreateResourceMetadataAcked( - std::move(p.second.serialized_proto), version, update_time); - // Notify all watchers. - auto& watchers_list = endpoint_state.watchers; - auto& value = endpoint_state.update.value(); - xds_client()->work_serializer_.Schedule( - [watchers_list, value]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) { - for (const auto& p : watchers_list) { - p.first->OnEndpointChanged(value); - } - }, - DEBUG_LOCATION); - } -} - -namespace { - -// Update resource_metadata for NACK. -void UpdateResourceMetadataNacked(const std::string& version, - const std::string& details, - grpc_millis update_time, - XdsApi::ResourceMetadata* resource_metadata) { - resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED; - resource_metadata->failed_version = version; - resource_metadata->failed_details = details; - resource_metadata->failed_update_time = update_time; -} - -} // namespace - -template -void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateHelperLocked( - const std::string& resource_name, grpc_millis update_time, - const XdsApi::AdsParseResult& result, const std::string& error_details, - StateMap* state_map) { - auto it = state_map->find(resource_name); - if (it == state_map->end()) return; - auto& state = it->second; - Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer( - xds_client(), state.watchers, GRPC_ERROR_REF(result.parse_error), - DEBUG_LOCATION); - UpdateResourceMetadataNacked(result.version, error_details, update_time, - &state.meta); -} - -void XdsClient::ChannelState::AdsCallState::RejectAdsUpdateLocked( - grpc_millis update_time, const XdsApi::AdsParseResult& result) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] %s update NACKed containing %" PRIuPTR - " invalid resources", - xds_client(), result.type_url.c_str(), - result.resource_names_failed.size()); - } - std::string details = grpc_error_std_string(result.parse_error); - for (auto& resource : result.resource_names_failed) { - auto authority_it = - xds_client()->authority_state_map_.find(resource.authority); - if (authority_it == xds_client()->authority_state_map_.end()) continue; - AuthorityState& authority_state = authority_it->second; - if (result.type_url == XdsApi::kLdsTypeUrl) { - RejectAdsUpdateHelperLocked(resource.id, update_time, result, details, - &authority_state.listener_map); - } else if (result.type_url == XdsApi::kRdsTypeUrl) { - RejectAdsUpdateHelperLocked(resource.id, update_time, result, details, - &authority_state.route_config_map); - } else if (result.type_url == XdsApi::kCdsTypeUrl) { - RejectAdsUpdateHelperLocked(resource.id, update_time, result, details, - &authority_state.cluster_map); - } else if (result.type_url == XdsApi::kEdsTypeUrl) { - RejectAdsUpdateHelperLocked(resource.id, update_time, result, details, - &authority_state.endpoint_map); - } else { - GPR_ASSERT(0); - } - } -} - void XdsClient::ChannelState::AdsCallState::OnRequestSent( void* arg, grpc_error_handle error) { AdsCallState* ads_calld = static_cast(arg); @@ -1413,63 +1157,72 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { grpc_byte_buffer_destroy(recv_message_payload_); recv_message_payload_ = nullptr; // Parse and validate the response. - XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse( - chand()->server_, response_slice, - ResourceNamesForRequest(XdsApi::kLdsTypeUrl), - ResourceNamesForRequest(XdsApi::kRdsTypeUrl), - ResourceNamesForRequest(XdsApi::kCdsTypeUrl), - ResourceNamesForRequest(XdsApi::kEdsTypeUrl)); + AdsResponseParser parser(this); + absl::Status status = xds_client()->api_.ParseAdsResponse( + chand()->server_, response_slice, &parser); grpc_slice_unref_internal(response_slice); - if (result.type_url.empty()) { + if (!status.ok()) { // Ignore unparsable response. gpr_log(GPR_ERROR, "[xds_client %p] Error parsing ADS response (%s) -- ignoring", - xds_client(), grpc_error_std_string(result.parse_error).c_str()); - GRPC_ERROR_UNREF(result.parse_error); + xds_client(), status.ToString().c_str()); } else { - grpc_millis update_time = ExecCtx::Get()->Now(); + AdsResponseParser::Result result = parser.TakeResult(); // Update nonce. - auto& state = state_map_[result.type_url]; - state.nonce = std::move(result.nonce); - // If we got an error, we'll NACK the update. - if (result.parse_error != GRPC_ERROR_NONE) { + auto& state = state_map_[result.type]; + state.nonce = result.nonce; + // If we got an error, set state.error so that we'll NACK the update. + if (!result.errors.empty()) { + std::string error = absl::StrJoin(result.errors, "; "); gpr_log(GPR_ERROR, "[xds_client %p] ADS response invalid for resource type %s " "version %s, will NACK: nonce=%s error=%s", xds_client(), result.type_url.c_str(), result.version.c_str(), - state.nonce.c_str(), - grpc_error_std_string(result.parse_error).c_str()); - result.parse_error = - grpc_error_set_int(result.parse_error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); + state.nonce.c_str(), error.c_str()); GRPC_ERROR_UNREF(state.error); - state.error = result.parse_error; - RejectAdsUpdateLocked(update_time, result); - } - // Process any valid resources. - bool have_valid_resources = false; - if (result.type_url == XdsApi::kLdsTypeUrl) { - have_valid_resources = !result.lds_update_map.empty(); - AcceptLdsUpdateLocked(result.version, update_time, - std::move(result.lds_update_map), - result.resource_names_failed); - } else if (result.type_url == XdsApi::kRdsTypeUrl) { - have_valid_resources = !result.rds_update_map.empty(); - AcceptRdsUpdateLocked(result.version, update_time, - std::move(result.rds_update_map)); - } else if (result.type_url == XdsApi::kCdsTypeUrl) { - have_valid_resources = !result.cds_update_map.empty(); - AcceptCdsUpdateLocked(result.version, update_time, - std::move(result.cds_update_map), - result.resource_names_failed); - } else if (result.type_url == XdsApi::kEdsTypeUrl) { - have_valid_resources = !result.eds_update_map.empty(); - AcceptEdsUpdateLocked(result.version, update_time, - std::move(result.eds_update_map)); + state.error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_CPP_STRING(std::move(error)), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + } + // Delete resources not seen in update if needed. + if (result.type->AllResourcesRequiredInSotW()) { + for (auto& a : xds_client()->authority_state_map_) { + const std::string& authority = a.first; + AuthorityState& authority_state = a.second; + // Skip authorities that are not using this xDS channel. + if (authority_state.channel_state != chand()) continue; + auto seen_authority_it = result.resources_seen.find(authority); + // Find this resource type. + auto type_it = authority_state.resource_map.find(result.type); + if (type_it == authority_state.resource_map.end()) continue; + // Iterate over resource ids. + for (auto& r : type_it->second) { + const std::string& resource_id = r.first; + ResourceState& resource_state = r.second; + if (seen_authority_it == result.resources_seen.end() || + seen_authority_it->second.find(resource_id) == + seen_authority_it->second.end()) { + // If the resource was newly requested but has not yet been + // received, we don't want to generate an error for the watchers, + // because this ADS response may be in reaction to an earlier + // request that did not yet request the new resource, so its absence + // from the response does not necessarily indicate that the resource + // does not exist. For that case, we rely on the request timeout + // instead. + if (resource_state.resource == nullptr) continue; + resource_state.resource.reset(); + Notifier:: + ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer( + xds_client(), resource_state.watchers, DEBUG_LOCATION); + } + } + } } - if (have_valid_resources) { + // If we had valid resources, update the version. + if (result.have_valid_resources) { seen_response_ = true; - chand()->resource_type_version_map_[result.type_url] = result.version; + chand()->resource_type_version_map_[result.type] = + std::move(result.version); // Start load reporting if needed. auto& lrs_call = chand()->lrs_calld_; if (lrs_call != nullptr) { @@ -1478,7 +1231,7 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { } } // Send ACK or NACK. - SendMessageLocked(result.type_url); + SendMessageLocked(result.type); } if (xds_client()->shutting_down_) return true; // Keep listening for updates. @@ -1536,23 +1289,24 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { return this == chand()->ads_calld_->calld(); } -std::map> +std::vector XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( - const std::string& type_url) { - std::map> - resource_map; - auto it = state_map_.find(type_url); + const XdsResourceType* type) { + std::vector resource_names; + auto it = state_map_.find(type); if (it != state_map_.end()) { for (auto& a : it->second.subscribed_resources) { + const std::string& authority = a.first; for (auto& p : a.second) { - resource_map[a.first].insert(p.first); - OrphanablePtr& state = p.second; - state->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceState")); + const std::string& resource_id = p.first; + resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName( + authority, type->type_url(), resource_id)); + OrphanablePtr& resource_timer = p.second; + resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer")); } } } - return resource_map; + return resource_names; } // @@ -2055,12 +1809,9 @@ void XdsClient::Orphan() { { MutexLock lock(&mu_); shutting_down_ = true; + // Clear cache and any remaining watchers that may not have been cancelled. authority_state_map_.clear(); - // We clear these invalid resource watchers as cancel never came. - invalid_listener_watchers_.clear(); - invalid_route_config_watchers_.clear(); - invalid_cluster_watchers_.clear(); - invalid_endpoint_watchers_.clear(); + invalid_watchers_.clear(); } } @@ -2077,19 +1828,18 @@ RefCountedPtr XdsClient::GetOrCreateChannelStateLocked( return channel_state; } -void XdsClient::WatchListenerData( - absl::string_view listener_name, - RefCountedPtr watcher) { - std::string listener_name_str = std::string(listener_name); - ListenerWatcherInterface* w = watcher.get(); - auto resource = XdsApi::ParseResourceName(listener_name, XdsApi::IsLds); - if (!resource.ok()) { +void XdsClient::WatchResource(const XdsResourceType* type, + absl::string_view name, + RefCountedPtr watcher) { + ResourceWatcherInterface* w = watcher.get(); + auto resource_name = ParseXdsResourceName(name, type); + if (!resource_name.ok()) { { MutexLock lock(&mu_); - invalid_listener_watchers_[w] = watcher; + invalid_watchers_[w] = watcher; } - grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( - "Unable to parse resource name for listener %s", listener_name)); + grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING( + absl::StrFormat("Unable to parse resource name %s", name)); work_serializer_.Run( // TODO(yashykt): When we move to C++14, capture watcher using // std::move() @@ -2101,23 +1851,26 @@ void XdsClient::WatchListenerData( } { MutexLock lock(&mu_); - AuthorityState& authority_state = authority_state_map_[resource->authority]; - ListenerState& listener_state = authority_state.listener_map[resource->id]; - listener_state.watchers[w] = watcher; - // If we've already received an LDS update, notify the new watcher - // immediately. - if (listener_state.update.has_value()) { + // TODO(donnadionne): If we get a request for an authority that is not + // configured in the bootstrap file, reject it. + AuthorityState& authority_state = + authority_state_map_[resource_name->authority]; + ResourceState& resource_state = + authority_state.resource_map[type][resource_name->id]; + resource_state.watchers[w] = watcher; + // If we already have a cached value for the resource, notify the new + // watcher immediately. + if (resource_state.resource != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s", this, - listener_name_str.c_str()); + std::string(name).c_str()); } - auto& value = listener_state.update.value(); + auto* value = type->CopyResource(resource_state.resource.get()).release(); work_serializer_.Schedule( - // TODO(yashykt): When we move to C++14, capture watcher using - // std::move() - [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { - watcher->OnListenerChanged(value); + [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { + watcher->OnResourceChanged(value); + delete value; }, DEBUG_LOCATION); } @@ -2127,266 +1880,144 @@ void XdsClient::WatchListenerData( authority_state.channel_state = GetOrCreateChannelStateLocked(bootstrap_->server()); } - authority_state.channel_state->SubscribeLocked(XdsApi::kLdsTypeUrl, - *resource); + authority_state.channel_state->SubscribeLocked(type, *resource_name); } work_serializer_.DrainQueue(); } -void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, - ListenerWatcherInterface* watcher, - bool delay_unsubscription) { +void XdsClient::CancelResourceWatch(const XdsResourceType* type, + absl::string_view name, + ResourceWatcherInterface* watcher, + bool delay_unsubscription) { + auto resource_name = ParseXdsResourceName(name, type); MutexLock lock(&mu_); - if (shutting_down_) return; - auto resource = XdsApi::ParseResourceName(listener_name, XdsApi::IsLds); - if (!resource.ok()) return; - auto& authority_state = authority_state_map_[resource->authority]; - ListenerState& listener_state = authority_state.listener_map[resource->id]; - auto it = listener_state.watchers.find(watcher); - if (it == listener_state.watchers.end()) { - invalid_listener_watchers_.erase(watcher); + if (!resource_name.ok()) { + invalid_watchers_.erase(watcher); return; } - listener_state.watchers.erase(it); - if (!listener_state.watchers.empty()) return; - authority_state.listener_map.erase(resource->id); - xds_server_channel_map_[bootstrap_->server()]->UnsubscribeLocked( - XdsApi::kLdsTypeUrl, *resource, delay_unsubscription); - if (!authority_state.HasSubscribedResources()) { - authority_state.channel_state.reset(); + if (shutting_down_) return; + // Find authority. + auto authority_it = authority_state_map_.find(resource_name->authority); + if (authority_it == authority_state_map_.end()) return; + AuthorityState& authority_state = authority_it->second; + // Find type map. + auto type_it = authority_state.resource_map.find(type); + if (type_it == authority_state.resource_map.end()) return; + auto& type_map = type_it->second; + // Find resource id. + auto resource_it = type_map.find(resource_name->id); + if (resource_it == type_map.end()) return; + ResourceState& resource_state = resource_it->second; + // Remove watcher. + resource_state.watchers.erase(watcher); + authority_state.channel_state->UnsubscribeLocked(type, *resource_name, + delay_unsubscription); + // Clean up empty map entries, if any. + if (resource_state.watchers.empty()) { + type_map.erase(resource_it); + if (type_map.empty()) { + authority_state.resource_map.erase(type_it); + if (authority_state.resource_map.empty()) { + authority_state.channel_state.reset(); + } + } } } +void XdsClient::WatchListenerData( + absl::string_view listener_name, + RefCountedPtr watcher) { + WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kLdsTypeUrl), + listener_name, std::move(watcher)); +} + +void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, + ListenerWatcherInterface* watcher, + bool delay_unsubscription) { + CancelResourceWatch( + XdsResourceTypeRegistry::GetOrCreate()->GetType(kLdsTypeUrl), + listener_name, watcher, delay_unsubscription); +} + void XdsClient::WatchRouteConfigData( absl::string_view route_config_name, RefCountedPtr watcher) { - std::string route_config_name_str = std::string(route_config_name); - RouteConfigWatcherInterface* w = watcher.get(); - auto resource = XdsApi::ParseResourceName(route_config_name, XdsApi::IsRds); - if (!resource.ok()) { - { - MutexLock lock(&mu_); - invalid_route_config_watchers_[w] = watcher; - } - grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING( - absl::StrFormat("Unable to parse resource name for route config %s", - route_config_name)); - work_serializer_.Run( - // TODO(yashykt): When we move to C++14, capture watcher using - // std::move() - [watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) { - watcher->OnError(error); - }, - DEBUG_LOCATION); - return; - } - { - MutexLock lock(&mu_); - auto& authority_state = authority_state_map_[resource->authority]; - RouteConfigState& route_config_state = - authority_state.route_config_map[resource->id]; - route_config_state.watchers[w] = watcher; - // If we've already received an RDS update, notify the new watcher - // immediately. - if (route_config_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] returning cached route config data for %s", - this, route_config_name_str.c_str()); - } - auto& value = route_config_state.update.value(); - work_serializer_.Schedule( - [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { - watcher->OnRouteConfigChanged(value); - }, - DEBUG_LOCATION); - } - // If the authority doesn't yet have a channel, set it, creating it if - // needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); - } - authority_state.channel_state->SubscribeLocked(XdsApi::kRdsTypeUrl, - *resource); - } - work_serializer_.DrainQueue(); + WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kRdsTypeUrl), + route_config_name, std::move(watcher)); } void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, RouteConfigWatcherInterface* watcher, bool delay_unsubscription) { - MutexLock lock(&mu_); - if (shutting_down_) return; - auto resource = XdsApi::ParseResourceName(route_config_name, XdsApi::IsRds); - if (!resource.ok()) return; - auto& authority_state = authority_state_map_[resource->authority]; - RouteConfigState& route_config_state = - authority_state.route_config_map[resource->id]; - auto it = route_config_state.watchers.find(watcher); - if (it == route_config_state.watchers.end()) { - invalid_route_config_watchers_.erase(watcher); - return; - } - route_config_state.watchers.erase(it); - if (!route_config_state.watchers.empty()) return; - authority_state.route_config_map.erase(resource->id); - xds_server_channel_map_[bootstrap_->server()]->UnsubscribeLocked( - XdsApi::kRdsTypeUrl, *resource, delay_unsubscription); - if (!authority_state.HasSubscribedResources()) { - authority_state.channel_state.reset(); - } + CancelResourceWatch( + XdsResourceTypeRegistry::GetOrCreate()->GetType(kRdsTypeUrl), + route_config_name, watcher, delay_unsubscription); } void XdsClient::WatchClusterData( absl::string_view cluster_name, RefCountedPtr watcher) { - std::string cluster_name_str = std::string(cluster_name); - ClusterWatcherInterface* w = watcher.get(); - auto resource = XdsApi::ParseResourceName(cluster_name, XdsApi::IsCds); - if (!resource.ok()) { - { - MutexLock lock(&mu_); - invalid_cluster_watchers_[w] = watcher; - } - grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat( - "Unable to parse resource name for cluster %s", cluster_name)); - work_serializer_.Run([watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( - work_serializer_) { watcher->OnError(error); }, - DEBUG_LOCATION); - return; - } - { - MutexLock lock(&mu_); - auto& authority_state = authority_state_map_[resource->authority]; - ClusterState& cluster_state = authority_state.cluster_map[resource->id]; - cluster_state.watchers[w] = watcher; - // If we've already received a CDS update, notify the new watcher - // immediately. - if (cluster_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] returning cached cluster data for %s", this, - cluster_name_str.c_str()); - } - auto& value = cluster_state.update.value(); - work_serializer_.Schedule( - // TODO(yashykt): When we move to C++14, capture watcher using - // std::move() - [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { - watcher->OnClusterChanged(value); - }, - DEBUG_LOCATION); - } - // If the authority doesn't yet have a channel, set it, creating it if - // needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); - } - authority_state.channel_state->SubscribeLocked(XdsApi::kCdsTypeUrl, - *resource); - } - work_serializer_.DrainQueue(); + WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl), + cluster_name, std::move(watcher)); } void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name, ClusterWatcherInterface* watcher, bool delay_unsubscription) { - MutexLock lock(&mu_); - if (shutting_down_) return; - auto resource = XdsApi::ParseResourceName(cluster_name, XdsApi::IsCds); - if (!resource.ok()) return; - auto& authority_state = authority_state_map_[resource->authority]; - ClusterState& cluster_state = authority_state.cluster_map[resource->id]; - auto it = cluster_state.watchers.find(watcher); - if (it == cluster_state.watchers.end()) { - invalid_cluster_watchers_.erase(watcher); - return; - } - cluster_state.watchers.erase(it); - if (!cluster_state.watchers.empty()) return; - authority_state.cluster_map.erase(resource->id); - xds_server_channel_map_[bootstrap_->server()]->UnsubscribeLocked( - XdsApi::kCdsTypeUrl, *resource, delay_unsubscription); - if (!authority_state.HasSubscribedResources()) { - authority_state.channel_state.reset(); - } + CancelResourceWatch( + XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl), + cluster_name, watcher, delay_unsubscription); } void XdsClient::WatchEndpointData( absl::string_view eds_service_name, RefCountedPtr watcher) { - std::string eds_service_name_str = std::string(eds_service_name); - EndpointWatcherInterface* w = watcher.get(); - auto resource = XdsApi::ParseResourceName(eds_service_name, XdsApi::IsEds); - if (!resource.ok()) { - { - MutexLock lock(&mu_); - invalid_endpoint_watchers_[w] = watcher; - } - grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING( - absl::StrFormat("Unable to parse resource name for endpoint service %s", - eds_service_name)); - work_serializer_.Run([watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( - work_serializer_) { watcher->OnError(error); }, - DEBUG_LOCATION); - return; - } - { - MutexLock lock(&mu_); - auto& authority_state = authority_state_map_[resource->authority]; - EndpointState& endpoint_state = authority_state.endpoint_map[resource->id]; - endpoint_state.watchers[w] = watcher; - // If we've already received an EDS update, notify the new watcher - // immediately. - if (endpoint_state.update.has_value()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] returning cached endpoint data for %s", this, - eds_service_name_str.c_str()); - } - auto& value = endpoint_state.update.value(); - work_serializer_.Schedule( - [watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { - watcher->OnEndpointChanged(value); - }, - DEBUG_LOCATION); - } - // If the authority doesn't yet have a channel, set it, creating it if - // needed. - if (authority_state.channel_state == nullptr) { - authority_state.channel_state = - GetOrCreateChannelStateLocked(bootstrap_->server()); - } - authority_state.channel_state->SubscribeLocked(XdsApi::kEdsTypeUrl, - *resource); - } - work_serializer_.DrainQueue(); + WatchResource(XdsResourceTypeRegistry::GetOrCreate()->GetType(kEdsTypeUrl), + eds_service_name, std::move(watcher)); } void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name, EndpointWatcherInterface* watcher, bool delay_unsubscription) { - MutexLock lock(&mu_); - if (shutting_down_) return; - auto resource = XdsApi::ParseResourceName(eds_service_name, XdsApi::IsEds); - if (!resource.ok()) return; - auto& authority_state = authority_state_map_[resource->authority]; - EndpointState& endpoint_state = authority_state.endpoint_map[resource->id]; - auto it = endpoint_state.watchers.find(watcher); - if (it == endpoint_state.watchers.end()) { - invalid_endpoint_watchers_.erase(watcher); - return; - } - endpoint_state.watchers.erase(it); - if (!endpoint_state.watchers.empty()) return; - authority_state.endpoint_map.erase(resource->id); - xds_server_channel_map_[bootstrap_->server()]->UnsubscribeLocked( - XdsApi::kEdsTypeUrl, *resource, delay_unsubscription); - if (!authority_state.HasSubscribedResources()) { - authority_state.channel_state.reset(); - } + CancelResourceWatch( + XdsResourceTypeRegistry::GetOrCreate()->GetType(kEdsTypeUrl), + eds_service_name, watcher, delay_unsubscription); +} + +absl::StatusOr XdsClient::ParseXdsResourceName( + absl::string_view name, const XdsResourceType* type) { + // Old-style names use the empty string for authority. + // authority is prefixed with "old:" to indicate that it's an old-style name. + if (!absl::StartsWith(name, "xdstp:")) { + return XdsResourceName{"old:", std::string(name)}; + } + // New style name. Parse URI. + auto uri = URI::Parse(name); + if (!uri.ok()) return uri.status(); + // Split the resource type off of the path to get the id. + std::pair path_parts = + absl::StrSplit(uri->path(), absl::MaxSplits('/', 1)); + if (!type->IsType(path_parts.first, nullptr)) { + return absl::InvalidArgumentError( + "xdstp URI path must indicate valid xDS resource type"); + } + std::vector> query_parameters( + uri->query_parameter_map().begin(), uri->query_parameter_map().end()); + std::sort(query_parameters.begin(), query_parameters.end()); + return XdsResourceName{ + absl::StrCat("xdstp:", uri->authority()), + absl::StrCat( + path_parts.second, (query_parameters.empty() ? "?" : ""), + absl::StrJoin(query_parameters, "&", absl::PairFormatter("=")))}; +} + +std::string XdsClient::ConstructFullXdsResourceName( + absl::string_view authority, absl::string_view resource_type, + absl::string_view id) { + if (absl::ConsumePrefix(&authority, "xdstp:")) { + return absl::StrCat("xdstp://", authority, "/", resource_type, "/", id); + } + return std::string(id); } RefCountedPtr XdsClient::AddClusterDropStats( @@ -2419,9 +2050,11 @@ RefCountedPtr XdsClient::AddClusterDropStats( it->first.second /*eds_service_name*/); load_report_state.drop_stats = cluster_drop_stats.get(); } - auto resource = XdsApi::ParseResourceName(cluster_name, XdsApi::IsCds); - GPR_ASSERT(resource.ok()); - auto a = authority_state_map_.find(resource->authority); + auto resource_name = ParseXdsResourceName( + cluster_name, + XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl)); + GPR_ASSERT(resource_name.ok()); + auto a = authority_state_map_.find(resource_name->authority); if (a != authority_state_map_.end()) { a->second.channel_state->MaybeStartLrsCall(); } @@ -2481,9 +2114,11 @@ RefCountedPtr XdsClient::AddClusterLocalityStats( std::move(locality)); locality_state.locality_stats = cluster_locality_stats.get(); } - auto resource = XdsApi::ParseResourceName(cluster_name, XdsApi::IsCds); - GPR_ASSERT(resource.ok()); - auto a = authority_state_map_.find(resource->authority); + auto resource_name = ParseXdsResourceName( + cluster_name, + XdsResourceTypeRegistry::GetOrCreate()->GetType(kCdsTypeUrl)); + GPR_ASSERT(resource_name.ok()); + auto a = authority_state_map_.find(resource_name->authority); if (a != authority_state_map_.end()) { a->second.channel_state->MaybeStartLrsCall(); } @@ -2522,56 +2157,25 @@ void XdsClient::ResetBackoff() { } void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) { - std::set> listener_watchers; - std::set> route_config_watchers; - std::set> cluster_watchers; - std::set> endpoint_watchers; - for (const auto& a : authority_state_map_) { - for (const auto& p : a.second.listener_map) { - const ListenerState& listener_state = p.second; - for (const auto& q : listener_state.watchers) { - listener_watchers.insert(q.second); - } - } - for (const auto& p : a.second.route_config_map) { - const RouteConfigState& route_config_state = p.second; - for (const auto& q : route_config_state.watchers) { - route_config_watchers.insert(q.second); - } - } - for (const auto& p : a.second.cluster_map) { - const ClusterState& cluster_state = p.second; - for (const auto& q : cluster_state.watchers) { - cluster_watchers.insert(q.second); - } - } - for (const auto& p : a.second.endpoint_map) { - const EndpointState& endpoint_state = p.second; - for (const auto& q : endpoint_state.watchers) { - endpoint_watchers.insert(q.second); + std::set> watchers; + for (const auto& a : authority_state_map_) { // authority + for (const auto& t : a.second.resource_map) { // type + for (const auto& r : t.second) { // resource id + for (const auto& w : r.second.watchers) { // watchers + watchers.insert(w.second); + } } } } work_serializer_.Schedule( - // TODO(yashykt): When we move to C++14, capture *_watchers using + // TODO(yashykt): When we move to C++14, capture watchers using // std::move() - [listener_watchers, route_config_watchers, cluster_watchers, - endpoint_watchers, error]() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { - for (const auto& watcher : listener_watchers) { - watcher->OnError(GRPC_ERROR_REF(error)); - } - for (const auto& watcher : route_config_watchers) { - watcher->OnError(GRPC_ERROR_REF(error)); - } - for (const auto& watcher : cluster_watchers) { - watcher->OnError(GRPC_ERROR_REF(error)); - } - for (const auto& watcher : endpoint_watchers) { - watcher->OnError(GRPC_ERROR_REF(error)); - } - GRPC_ERROR_UNREF(error); - }, + [watchers, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) { + for (const auto& watcher : watchers) { + watcher->OnError(GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); + }, DEBUG_LOCATION); } @@ -2659,35 +2263,18 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( std::string XdsClient::DumpClientConfigBinary() { MutexLock lock(&mu_); XdsApi::ResourceTypeMetadataMap resource_type_metadata_map; - auto& lds_map = resource_type_metadata_map[XdsApi::kLdsTypeUrl]; - auto& rds_map = resource_type_metadata_map[XdsApi::kRdsTypeUrl]; - auto& cds_map = resource_type_metadata_map[XdsApi::kCdsTypeUrl]; - auto& eds_map = resource_type_metadata_map[XdsApi::kEdsTypeUrl]; - for (auto& a : authority_state_map_) { + for (const auto& a : authority_state_map_) { // authority const std::string& authority = a.first; - // Collect resource metadata from listeners - for (auto& p : a.second.listener_map) { - const std::string& listener_name = p.first; - lds_map[XdsApi::ConstructFullResourceName( - authority, XdsApi::kLdsTypeUrl, listener_name)] = &p.second.meta; - } - // Collect resource metadata from route configs - for (auto& p : a.second.route_config_map) { - const std::string& route_config_name = p.first; - rds_map[XdsApi::ConstructFullResourceName( - authority, XdsApi::kRdsTypeUrl, route_config_name)] = &p.second.meta; - } - // Collect resource metadata from clusters - for (auto& p : a.second.cluster_map) { - const std::string& cluster_name = p.first; - cds_map[XdsApi::ConstructFullResourceName(authority, XdsApi::kCdsTypeUrl, - cluster_name)] = &p.second.meta; - } - // Collect resource metadata from endpoints - for (auto& p : a.second.endpoint_map) { - const std::string& endpoint_name = p.first; - eds_map[XdsApi::ConstructFullResourceName( - authority, XdsApi::kEdsTypeUrl, endpoint_name)] = &p.second.meta; + for (const auto& t : a.second.resource_map) { // type + const XdsResourceType* type = t.first; + auto& resource_metadata_map = + resource_type_metadata_map[type->type_url()]; + for (const auto& r : t.second) { // resource id + const std::string& resource_id = r.first; + const ResourceState& resource_state = r.second; + resource_metadata_map[ConstructFullXdsResourceName( + authority, type->type_url(), resource_id)] = &resource_state.meta; + } } } // Assemble config dump messages @@ -2698,9 +2285,23 @@ std::string XdsClient::DumpClientConfigBinary() { // accessors for global state // +namespace { + +void InitResourceTypeRegistry() { + auto* registry = XdsResourceTypeRegistry::GetOrCreate(); + registry->RegisterType(absl::make_unique()); + registry->RegisterType(absl::make_unique()); + registry->RegisterType(absl::make_unique()); + registry->RegisterType(absl::make_unique()); +} + +} // namespace + void XdsClientGlobalInit() { g_mu = new Mutex; XdsHttpFilterRegistry::Init(); + static gpr_once once = GPR_ONCE_INIT; + gpr_once_init(&once, InitResourceTypeRegistry); } // TODO(roth): Find a better way to clear the fallback config that does diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 53400697dfa..143f0927588 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -28,6 +28,10 @@ #include "src/core/ext/xds/xds_api.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_client_stats.h" +#include "src/core/ext/xds/xds_cluster.h" +#include "src/core/ext/xds/xds_endpoint.h" +#include "src/core/ext/xds/xds_listener.h" +#include "src/core/ext/xds/xds_route_config.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/memory.h" @@ -43,10 +47,11 @@ extern TraceFlag grpc_xds_client_refcount_trace; class XdsClient : public DualRefCounted { public: - // Listener data watcher interface. Implemented by callers. - class ListenerWatcherInterface : public RefCounted { + // Resource watcher interface. Implemented by callers. + class ResourceWatcherInterface : public RefCounted { public: - virtual void OnListenerChanged(XdsListenerResource listener) + virtual void OnResourceChanged( + const XdsResourceType::ResourceData* resource) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; virtual void OnError(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; @@ -54,38 +59,65 @@ class XdsClient : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; }; + // TODO(roth): Consider removing these resource-type-specific APIs in + // favor of some mechanism for automatic type-deduction for the generic + // API. + + // Listener data watcher interface. Implemented by callers. + class ListenerWatcherInterface : public ResourceWatcherInterface { + public: + virtual void OnListenerChanged(XdsListenerResource listener) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnListenerChanged( + static_cast(resource) + ->resource); + } + }; + // RouteConfiguration data watcher interface. Implemented by callers. - class RouteConfigWatcherInterface - : public RefCounted { + class RouteConfigWatcherInterface : public ResourceWatcherInterface { public: - virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; - virtual void OnError(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; - virtual void OnResourceDoesNotExist() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnRouteConfigChanged(XdsRouteConfigResource route_config) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnRouteConfigChanged( + static_cast( + resource) + ->resource); + } }; // Cluster data watcher interface. Implemented by callers. - class ClusterWatcherInterface : public RefCounted { + class ClusterWatcherInterface : public ResourceWatcherInterface { public: - virtual void OnClusterChanged(XdsClusterResource cluster_data) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; - virtual void OnError(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; - virtual void OnResourceDoesNotExist() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnClusterChanged(XdsClusterResource cluster_data) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnClusterChanged( + static_cast(resource) + ->resource); + } }; // Endpoint data watcher interface. Implemented by callers. - class EndpointWatcherInterface : public RefCounted { + class EndpointWatcherInterface : public ResourceWatcherInterface { public: - virtual void OnEndpointChanged(XdsEndpointResource update) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; - virtual void OnError(grpc_error_handle error) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; - virtual void OnResourceDoesNotExist() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; + virtual void OnEndpointChanged(XdsEndpointResource update) = 0; + + private: + void OnResourceChanged( + const XdsResourceType::ResourceData* resource) override { + OnEndpointChanged( + static_cast(resource) + ->resource); + } }; // Factory function to get or create the global XdsClient instance. @@ -113,6 +145,20 @@ class XdsClient : public DualRefCounted { void Orphan() override; + // Start and cancel watch for a resource. + // The XdsClient takes ownership of the watcher, but the caller may + // keep a raw pointer to the watcher, which may be used only for + // cancellation. (Because the caller does not own the watcher, the + // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. + void WatchResource(const XdsResourceType* type, absl::string_view name, + RefCountedPtr watcher); + void CancelResourceWatch(const XdsResourceType* type, + absl::string_view listener_name, + ResourceWatcherInterface* watcher, + bool delay_unsubscription = false); + // Start and cancel listener data watch for a listener. // The XdsClient takes ownership of the watcher, but the caller may // keep a raw pointer to the watcher, which may be used only for @@ -204,6 +250,11 @@ class XdsClient : public DualRefCounted { const grpc_channel_args& args); private: + struct XdsResourceName { + std::string authority; + std::string id; + }; + // Contains a channel to the xds server and all the data related to the // channel. Holds a ref to the xds client object. class ChannelState : public DualRefCounted { @@ -234,11 +285,11 @@ class XdsClient : public DualRefCounted { void StartConnectivityWatchLocked(); void CancelConnectivityWatchLocked(); - void SubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& name) + void SubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); - void UnsubscribeLocked(const std::string& type_url, - const XdsApi::ResourceName& name, + void UnsubscribeLocked(const XdsResourceType* type, + const XdsResourceName& name, bool delay_unsubscription) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); @@ -260,55 +311,23 @@ class XdsClient : public DualRefCounted { OrphanablePtr> lrs_calld_; // Stores the most recent accepted resource version for each resource type. - std::map + std::map resource_type_version_map_; }; - struct ListenerState { - std::map> + struct ResourceState { + std::map> watchers; - // The latest data seen from LDS. - absl::optional update; - XdsApi::ResourceMetadata meta; - }; - - struct RouteConfigState { - std::map> - watchers; - // The latest data seen from RDS. - absl::optional update; - XdsApi::ResourceMetadata meta; - }; - - struct ClusterState { - std::map> - watchers; - // The latest data seen from CDS. - absl::optional update; - XdsApi::ResourceMetadata meta; - }; - - struct EndpointState { - std::map> - watchers; - // The latest data seen from EDS. - absl::optional update; + // The latest data seen for the resource. + std::unique_ptr resource; XdsApi::ResourceMetadata meta; }; struct AuthorityState { RefCountedPtr channel_state; - std::map listener_map; - std::map - route_config_map; - std::map cluster_map; - std::map endpoint_map; - - bool HasSubscribedResources() { - return !listener_map.empty() || !route_config_map.empty() || - !cluster_map.empty() || !endpoint_map.empty(); - } + std::map> + resource_map; }; struct LoadReportState { @@ -331,6 +350,12 @@ class XdsClient : public DualRefCounted { void NotifyOnErrorLocked(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + static absl::StatusOr ParseXdsResourceName( + absl::string_view name, const XdsResourceType* type); + static std::string ConstructFullXdsResourceName( + absl::string_view authority, absl::string_view resource_type, + absl::string_view id); + XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( bool send_all_clusters, const std::set& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -363,15 +388,8 @@ class XdsClient : public DualRefCounted { // Stores started watchers whose resource name was not parsed successfully, // waiting to be cancelled or reset in Orphan(). - std::map> - invalid_listener_watchers_ ABSL_GUARDED_BY(mu_); - std::map> - invalid_route_config_watchers_ ABSL_GUARDED_BY(mu_); - std::map> - invalid_cluster_watchers_ ABSL_GUARDED_BY(mu_); - std::map> - invalid_endpoint_watchers_ ABSL_GUARDED_BY(mu_); + std::map> + invalid_watchers_ ABSL_GUARDED_BY(mu_); bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; }; diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc index 328cab95503..5e7f2662441 100644 --- a/src/core/ext/xds/xds_cluster.cc +++ b/src/core/ext/xds/xds_cluster.cc @@ -420,7 +420,7 @@ absl::StatusOr XdsClusterResourceType::Decode( auto* resource = envoy_config_cluster_v3_Cluster_parse( serialized_resource.data(), serialized_resource.size(), context.arena); if (resource == nullptr) { - return absl::InvalidArgumentError("Can't parse Listener resource."); + return absl::InvalidArgumentError("Can't parse Cluster resource."); } MaybeLogCluster(context, resource); // Validate resource. @@ -431,9 +431,18 @@ absl::StatusOr XdsClusterResourceType::Decode( grpc_error_handle error = CdsResourceParse(context, resource, is_v2, &cluster_data->resource); if (error != GRPC_ERROR_NONE) { - result.resource = absl::InvalidArgumentError(grpc_error_std_string(error)); + std::string error_str = grpc_error_std_string(error); GRPC_ERROR_UNREF(error); + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_ERROR, "[xds_client %p] invalid Cluster %s: %s", + context.client, result.name.c_str(), error_str.c_str()); + } + result.resource = absl::InvalidArgumentError(error_str); } else { + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_INFO, "[xds_client %p] parsed Cluster %s: %s", context.client, + result.name.c_str(), cluster_data->resource.ToString().c_str()); + } result.resource = std::move(cluster_data); } return std::move(result); diff --git a/src/core/ext/xds/xds_cluster.h b/src/core/ext/xds/xds_cluster.h index de195113b69..3071340b13e 100644 --- a/src/core/ext/xds/xds_cluster.h +++ b/src/core/ext/xds/xds_cluster.h @@ -23,6 +23,9 @@ #include #include "absl/types/optional.h" +#include "envoy/config/cluster/v3/cluster.upbdefs.h" +#include "envoy/extensions/clusters/aggregate/v3/cluster.upbdefs.h" +#include "envoy/extensions/transport_sockets/tls/v3/tls.upbdefs.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_resource_type.h" @@ -94,6 +97,29 @@ class XdsClusterResourceType : public XdsResourceType { absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const override; + + bool ResourcesEqual(const ResourceData* r1, + const ResourceData* r2) const override { + return static_cast(r1)->resource == + static_cast(r2)->resource; + } + + std::unique_ptr CopyResource( + const ResourceData* resource) const override { + auto* resource_copy = new ClusterData(); + resource_copy->resource = + static_cast(resource)->resource; + return std::unique_ptr(resource_copy); + } + + bool AllResourcesRequiredInSotW() const override { return true; } + + void InitUpbSymtab(upb_symtab* symtab) const override { + envoy_config_cluster_v3_Cluster_getmsgdef(symtab); + envoy_extensions_clusters_aggregate_v3_ClusterConfig_getmsgdef(symtab); + envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_getmsgdef( + symtab); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc index 1b438c2ea23..3d37c1b0e42 100644 --- a/src/core/ext/xds/xds_endpoint.cc +++ b/src/core/ext/xds/xds_endpoint.cc @@ -330,7 +330,8 @@ absl::StatusOr XdsEndpointResourceType::Decode( auto* resource = envoy_config_endpoint_v3_ClusterLoadAssignment_parse( serialized_resource.data(), serialized_resource.size(), context.arena); if (resource == nullptr) { - return absl::InvalidArgumentError("Can't parse Listener resource."); + return absl::InvalidArgumentError( + "Can't parse ClusterLoadAssignment resource."); } MaybeLogClusterLoadAssignment(context, resource); // Validate resource. @@ -341,9 +342,19 @@ absl::StatusOr XdsEndpointResourceType::Decode( grpc_error_handle error = EdsResourceParse(context, resource, is_v2, &endpoint_data->resource); if (error != GRPC_ERROR_NONE) { - result.resource = absl::InvalidArgumentError(grpc_error_std_string(error)); + std::string error_str = grpc_error_std_string(error); GRPC_ERROR_UNREF(error); + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s", + context.client, result.name.c_str(), error_str.c_str()); + } + result.resource = absl::InvalidArgumentError(error_str); } else { + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s", + context.client, result.name.c_str(), + endpoint_data->resource.ToString().c_str()); + } result.resource = std::move(endpoint_data); } return std::move(result); diff --git a/src/core/ext/xds/xds_endpoint.h b/src/core/ext/xds/xds_endpoint.h index 089d3e1b096..9c230acdc05 100644 --- a/src/core/ext/xds/xds_endpoint.h +++ b/src/core/ext/xds/xds_endpoint.h @@ -24,6 +24,7 @@ #include #include "absl/container/inlined_vector.h" +#include "envoy/config/endpoint/v3/endpoint.upbdefs.h" #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/xds/xds_client_stats.h" @@ -125,6 +126,24 @@ class XdsEndpointResourceType : public XdsResourceType { absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const override; + + bool ResourcesEqual(const ResourceData* r1, + const ResourceData* r2) const override { + return static_cast(r1)->resource == + static_cast(r2)->resource; + } + + std::unique_ptr CopyResource( + const ResourceData* resource) const override { + auto* resource_copy = new EndpointData(); + resource_copy->resource = + static_cast(resource)->resource; + return std::unique_ptr(resource_copy); + } + + void InitUpbSymtab(upb_symtab* symtab) const override { + envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(symtab); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_listener.cc b/src/core/ext/xds/xds_listener.cc index e51d2dec7c1..75d65b40d1e 100644 --- a/src/core/ext/xds/xds_listener.cc +++ b/src/core/ext/xds/xds_listener.cc @@ -1001,9 +1001,19 @@ absl::StatusOr XdsListenerResourceType::Decode( grpc_error_handle error = LdsResourceParse(context, resource, is_v2, &listener_data->resource); if (error != GRPC_ERROR_NONE) { - result.resource = absl::InvalidArgumentError(grpc_error_std_string(error)); + std::string error_str = grpc_error_std_string(error); GRPC_ERROR_UNREF(error); + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_ERROR, "[xds_client %p] invalid Listener %s: %s", + context.client, result.name.c_str(), error_str.c_str()); + } + result.resource = absl::InvalidArgumentError(error_str); } else { + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_INFO, "[xds_client %p] parsed Listener %s: %s", + context.client, result.name.c_str(), + listener_data->resource.ToString().c_str()); + } result.resource = std::move(listener_data); } return std::move(result); diff --git a/src/core/ext/xds/xds_listener.h b/src/core/ext/xds/xds_listener.h index d701de66154..ee211c77726 100644 --- a/src/core/ext/xds/xds_listener.h +++ b/src/core/ext/xds/xds_listener.h @@ -27,6 +27,8 @@ #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "envoy/config/listener/v3/listener.upbdefs.h" +#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.upbdefs.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_http_filters.h" @@ -203,6 +205,29 @@ class XdsListenerResourceType : public XdsResourceType { absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const override; + + bool ResourcesEqual(const ResourceData* r1, + const ResourceData* r2) const override { + return static_cast(r1)->resource == + static_cast(r2)->resource; + } + + std::unique_ptr CopyResource( + const ResourceData* resource) const override { + auto* resource_copy = new ListenerData(); + resource_copy->resource = + static_cast(resource)->resource; + return std::unique_ptr(resource_copy); + } + + bool AllResourcesRequiredInSotW() const override { return true; } + + void InitUpbSymtab(upb_symtab* symtab) const override { + envoy_config_listener_v3_Listener_getmsgdef(symtab); + envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_getmsgdef( + symtab); + XdsHttpFilterRegistry::PopulateSymtab(symtab); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_resource_type.cc b/src/core/ext/xds/xds_resource_type.cc new file mode 100644 index 00000000000..d7497101835 --- /dev/null +++ b/src/core/ext/xds/xds_resource_type.cc @@ -0,0 +1,71 @@ +// +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "src/core/ext/xds/xds_resource_type.h" + +#include + +#include "absl/strings/str_join.h" +#include "absl/strings/str_split.h" + +#include "src/core/lib/uri/uri_parser.h" + +namespace grpc_core { + +bool XdsResourceType::IsType(absl::string_view resource_type, + bool* is_v2) const { + if (resource_type == type_url()) return true; + if (resource_type == v2_type_url()) { + if (is_v2 != nullptr) *is_v2 = true; + return true; + } + return false; +} + +XdsResourceTypeRegistry* XdsResourceTypeRegistry::GetOrCreate() { + static XdsResourceTypeRegistry* registry = new XdsResourceTypeRegistry(); + return registry; +} + +const XdsResourceType* XdsResourceTypeRegistry::GetType( + absl::string_view resource_type) { + auto it = resource_types_.find(resource_type); + if (it != resource_types_.end()) return it->second.get(); + auto it2 = v2_resource_types_.find(resource_type); + if (it2 != v2_resource_types_.end()) return it2->second; + return nullptr; +} + +void XdsResourceTypeRegistry::RegisterType( + std::unique_ptr resource_type) { + GPR_ASSERT(resource_types_.find(resource_type->type_url()) == + resource_types_.end()); + GPR_ASSERT(v2_resource_types_.find(resource_type->v2_type_url()) == + v2_resource_types_.end()); + v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type.get()); + resource_types_.emplace(resource_type->type_url(), std::move(resource_type)); +} + +void XdsResourceTypeRegistry::ForEach( + std::function func) { + for (const auto& p : resource_types_) { + func(p.second.get()); + } +} + +} // namespace grpc_core diff --git a/src/core/ext/xds/xds_resource_type.h b/src/core/ext/xds/xds_resource_type.h index d7819832159..cdeb0cca8c6 100644 --- a/src/core/ext/xds/xds_resource_type.h +++ b/src/core/ext/xds/xds_resource_type.h @@ -16,6 +16,7 @@ #include +#include #include #include @@ -29,6 +30,8 @@ namespace grpc_core { +// Interface for an xDS resource type. +// Used to inject type-specific logic into XdsClient. class XdsResourceType { public: // A base type for resource data. @@ -61,15 +64,58 @@ class XdsResourceType { const XdsEncodingContext& context, absl::string_view serialized_resource, bool is_v2) const = 0; - // Convenient method for checking if a resource type matches this type. - bool IsType(absl::string_view resource_type, bool* is_v2) const { - if (resource_type == type_url()) return true; - if (resource_type == v2_type_url()) { - if (is_v2 != nullptr) *is_v2 = true; - return true; - } - return false; - } + // Returns true if r1 and r2 are equal. + // Must be invoked only on resources returned by this object's Decode() + // method. + virtual bool ResourcesEqual(const ResourceData* r1, + const ResourceData* r2) const = 0; + + // Returns a copy of resource. + // Must be invoked only on resources returned by this object's Decode() + // method. + virtual std::unique_ptr CopyResource( + const ResourceData* resource) const = 0; + + // Indicates whether the resource type requires that all resources must + // be present in every SotW response from the server. If true, a + // response that does not include a previously seen resource will be + // interpreted as a deletion of that resource. + virtual bool AllResourcesRequiredInSotW() const { return false; } + + // Populate upb symtab with xDS proto messages that we want to print + // properly in logs. + // Note: This won't actually work properly until upb adds support for + // Any fields in textproto printing (internal b/178821188). + virtual void InitUpbSymtab(upb_symtab* symtab) const = 0; + + // Convenience method for checking if resource_type matches this type. + // Checks against both type_url() and v2_type_url(). + // If is_v2 is non-null, it will be set to true if matching v2_type_url(). + bool IsType(absl::string_view resource_type, bool* is_v2) const; +}; + +// Global registry of xDS resource types. +class XdsResourceTypeRegistry { + public: + // Gets the global registry, creating it if needed. + static XdsResourceTypeRegistry* GetOrCreate(); + + // Gets the type for resource_type, or null if the type is unknown. + const XdsResourceType* GetType(absl::string_view resource_type); + + // Registers a resource type. + // All types must be registered before they can be used in the XdsClient. + void RegisterType(std::unique_ptr resource_type); + + // Calls func for each resource type. + void ForEach(std::function func); + + private: + std::map> + resource_types_; + std::map + v2_resource_types_; }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_route_config.cc b/src/core/ext/xds/xds_route_config.cc index a9120b21b8b..618772c5d6a 100644 --- a/src/core/ext/xds/xds_route_config.cc +++ b/src/core/ext/xds/xds_route_config.cc @@ -960,7 +960,8 @@ XdsRouteConfigResourceType::Decode(const XdsEncodingContext& context, auto* resource = envoy_config_route_v3_RouteConfiguration_parse( serialized_resource.data(), serialized_resource.size(), context.arena); if (resource == nullptr) { - return absl::InvalidArgumentError("Can't parse Listener resource."); + return absl::InvalidArgumentError( + "Can't parse RouteConfiguration resource."); } MaybeLogRouteConfiguration(context, resource); // Validate resource. @@ -971,9 +972,19 @@ XdsRouteConfigResourceType::Decode(const XdsEncodingContext& context, grpc_error_handle error = XdsRouteConfigResource::Parse( context, resource, &route_config_data->resource); if (error != GRPC_ERROR_NONE) { - result.resource = absl::InvalidArgumentError(grpc_error_std_string(error)); + std::string error_str = grpc_error_std_string(error); GRPC_ERROR_UNREF(error); + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_ERROR, "[xds_client %p] invalid RouteConfiguration %s: %s", + context.client, result.name.c_str(), error_str.c_str()); + } + result.resource = absl::InvalidArgumentError(error_str); } else { + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) { + gpr_log(GPR_INFO, "[xds_client %p] parsed RouteConfiguration %s: %s", + context.client, result.name.c_str(), + route_config_data->resource.ToString().c_str()); + } result.resource = std::move(route_config_data); } return std::move(result); diff --git a/src/core/ext/xds/xds_route_config.h b/src/core/ext/xds/xds_route_config.h index 7adac7676f8..54f610253b7 100644 --- a/src/core/ext/xds/xds_route_config.h +++ b/src/core/ext/xds/xds_route_config.h @@ -26,6 +26,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" #include "envoy/config/route/v3/route.upb.h" +#include "envoy/config/route/v3/route.upbdefs.h" #include "re2/re2.h" #include "src/core/ext/xds/xds_common_types.h" @@ -204,6 +205,24 @@ class XdsRouteConfigResourceType : public XdsResourceType { absl::StatusOr Decode(const XdsEncodingContext& context, absl::string_view serialized_resource, bool /*is_v2*/) const override; + + bool ResourcesEqual(const ResourceData* r1, + const ResourceData* r2) const override { + return static_cast(r1)->resource == + static_cast(r2)->resource; + } + + std::unique_ptr CopyResource( + const ResourceData* resource) const override { + auto* resource_copy = new RouteConfigData(); + resource_copy->resource = + static_cast(resource)->resource; + return std::unique_ptr(resource_copy); + } + + void InitUpbSymtab(upb_symtab* symtab) const override { + envoy_config_route_v3_RouteConfiguration_getmsgdef(symtab); + } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_routing.h b/src/core/ext/xds/xds_routing.h index ba1fce3ceff..18df4ce44d9 100644 --- a/src/core/ext/xds/xds_routing.h +++ b/src/core/ext/xds/xds_routing.h @@ -27,7 +27,8 @@ #include -#include "src/core/ext/xds/xds_api.h" +#include "src/core/ext/xds/xds_listener.h" +#include "src/core/ext/xds/xds_route_config.h" #include "src/core/lib/matchers/matchers.h" #include "src/core/lib/transport/metadata_batch.h" diff --git a/src/core/lib/security/credentials/xds/xds_credentials.h b/src/core/lib/security/credentials/xds/xds_credentials.h index ee2e71c5837..a7525e69309 100644 --- a/src/core/lib/security/credentials/xds/xds_credentials.h +++ b/src/core/lib/security/credentials/xds/xds_credentials.h @@ -23,7 +23,7 @@ #include -#include "src/core/ext/xds/xds_api.h" +#include "src/core/lib/matchers/matchers.h" #include "src/core/lib/security/credentials/credentials.h" namespace grpc_core { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index e053ec1bf11..91a14dde9ce 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -346,6 +346,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/xds/xds_http_fault_filter.cc', 'src/core/ext/xds/xds_http_filters.cc', 'src/core/ext/xds/xds_listener.cc', + 'src/core/ext/xds/xds_resource_type.cc', 'src/core/ext/xds/xds_route_config.cc', 'src/core/ext/xds/xds_routing.cc', 'src/core/ext/xds/xds_server_config_fetcher.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index d073abb62e1..65164414dd1 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1732,6 +1732,7 @@ src/core/ext/xds/xds_http_filters.cc \ src/core/ext/xds/xds_http_filters.h \ src/core/ext/xds/xds_listener.cc \ src/core/ext/xds/xds_listener.h \ +src/core/ext/xds/xds_resource_type.cc \ src/core/ext/xds/xds_resource_type.h \ src/core/ext/xds/xds_route_config.cc \ src/core/ext/xds/xds_route_config.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index aa0ec8593a0..a205835de3a 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1526,6 +1526,7 @@ src/core/ext/xds/xds_http_filters.cc \ src/core/ext/xds/xds_http_filters.h \ src/core/ext/xds/xds_listener.cc \ src/core/ext/xds/xds_listener.h \ +src/core/ext/xds/xds_resource_type.cc \ src/core/ext/xds/xds_resource_type.h \ src/core/ext/xds/xds_route_config.cc \ src/core/ext/xds/xds_route_config.h \