From e468b00c5682741379f8c79b7e85a3781ed27b55 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 8 Sep 2021 07:57:38 -0700 Subject: [PATCH] refactor xDS response parsing (#27272) * refactor xDS response parsing * fix build --- src/core/ext/xds/xds_api.cc | 1031 +++++++++++--------------- test/cpp/end2end/xds_end2end_test.cc | 33 +- 2 files changed, 431 insertions(+), 633 deletions(-) diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index ecaf22ab64e..67bf1ed69f3 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -847,15 +847,15 @@ bool IsLds(absl::string_view type_url, bool* is_v2 = nullptr) { return false; } -bool IsRds(absl::string_view type_url) { +bool IsRds(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { return type_url == XdsApi::kRdsTypeUrl || type_url == kRdsV2TypeUrl; } -bool IsCds(absl::string_view type_url) { +bool IsCds(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { return type_url == XdsApi::kCdsTypeUrl || type_url == kCdsV2TypeUrl; } -bool IsEds(absl::string_view type_url) { +bool IsEds(absl::string_view type_url, bool* /*is_v2*/ = nullptr) { return type_url == XdsApi::kEdsTypeUrl || type_url == kEdsV2TypeUrl; } @@ -1207,6 +1207,18 @@ void MaybeLogDiscoveryResponse( } } +void MaybeLogListener(const EncodingContext& context, + const envoy_config_listener_v3_Listener* listener) { + if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) && + gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { + const upb_msgdef* msg_type = + envoy_config_listener_v3_Listener_getmsgdef(context.symtab); + char buf[10240]; + upb_text_encode(listener, msg_type, nullptr, 0, buf, sizeof(buf)); + gpr_log(GPR_DEBUG, "[xds_client %p] Listener: %s", context.client, buf); + } +} + void MaybeLogHttpConnectionManager( const EncodingContext& context, const envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager* @@ -1828,7 +1840,7 @@ grpc_error_handle RouteActionParse(const EncodingContext& context, grpc_error_handle RouteConfigParse( const EncodingContext& context, const envoy_config_route_v3_RouteConfiguration* route_config, - XdsApi::RdsUpdate* rds_update) { + bool /*is_v2*/, XdsApi::RdsUpdate* rds_update) { MaybeLogRouteConfiguration(context, route_config); // Get the virtual hosts. size_t num_virtual_hosts; @@ -2214,7 +2226,7 @@ grpc_error_handle HttpConnectionManagerParse( http_connection_manager_proto); XdsApi::RdsUpdate rds_update; grpc_error_handle error = - RouteConfigParse(context, route_config, &rds_update); + RouteConfigParse(context, route_config, is_v2, &rds_update); if (error != GRPC_ERROR_NONE) return error; http_connection_manager->rds_update = std::move(rds_update); return GRPC_ERROR_NONE; @@ -2247,7 +2259,7 @@ grpc_error_handle HttpConnectionManagerParse( return GRPC_ERROR_NONE; } -grpc_error_handle LdsResponseParseClient( +grpc_error_handle LdsResourceParseClient( const EncodingContext& context, const envoy_config_listener_v3_ApiListener* api_listener, bool is_v2, XdsApi::LdsUpdate* lds_update) { @@ -2714,7 +2726,7 @@ grpc_error_handle BuildFilterChainMap( return GRPC_ERROR_NONE; } -grpc_error_handle LdsResponseParseServer( +grpc_error_handle LdsResourceParseServer( const EncodingContext& context, const envoy_config_listener_v3_Listener* listener, bool is_v2, XdsApi::LdsUpdate* lds_update) { @@ -2763,166 +2775,31 @@ grpc_error_handle LdsResponseParseServer( return GRPC_ERROR_NONE; } -grpc_error_handle LdsResponseParse( +grpc_error_handle LdsResourceParse( const EncodingContext& context, - const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_listener_names, - XdsApi::LdsUpdateMap* lds_update_map, - std::set* resource_names_failed) { - std::vector errors; - // Get the resources from the response. - size_t size; - const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - for (size_t i = 0; i < size; ++i) { - // Check the type_url of the resource. - absl::string_view type_url = - UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - bool is_v2 = false; - if (!IsLds(type_url, &is_v2)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not LDS.") - .c_str())); - continue; - } - // Decode the listener. - const upb_strview encoded_listener = - google_protobuf_Any_value(resources[i]); - const envoy_config_listener_v3_Listener* listener = - envoy_config_listener_v3_Listener_parse( - encoded_listener.data, encoded_listener.size, context.arena); - if (listener == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Can't decode listener.") - .c_str())); - continue; - } - // Check listener name. Ignore unexpected listeners. - std::string listener_name = - UpbStringToStdString(envoy_config_listener_v3_Listener_name(listener)); - if (expected_listener_names.find(listener_name) == - expected_listener_names.end()) { - continue; - } - // Fail if listener name is duplicated. - if (lds_update_map->find(listener_name) != lds_update_map->end()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate listener name \"", listener_name, "\"") - .c_str())); - resource_names_failed->insert(listener_name); - continue; - } - // Serialize into JSON and store it in the LdsUpdateMap - XdsApi::LdsResourceData& lds_resource_data = - (*lds_update_map)[listener_name]; - XdsApi::LdsUpdate& lds_update = lds_resource_data.resource; - lds_resource_data.serialized_proto = UpbStringToStdString(encoded_listener); - // Check whether it's a client or server listener. - const envoy_config_listener_v3_ApiListener* api_listener = - envoy_config_listener_v3_Listener_api_listener(listener); - const envoy_config_core_v3_Address* address = - envoy_config_listener_v3_Listener_address(listener); - if (api_listener != nullptr && address != nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(listener_name, - ": Listener has both address and ApiListener") - .c_str())); - resource_names_failed->insert(listener_name); - continue; - } - if (api_listener == nullptr && address == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(listener_name, - ": Listener has neither address nor ApiListener") - .c_str())); - resource_names_failed->insert(listener_name); - continue; - } - grpc_error_handle error = GRPC_ERROR_NONE; - if (api_listener != nullptr) { - error = LdsResponseParseClient(context, api_listener, is_v2, &lds_update); - } else { - error = LdsResponseParseServer(context, listener, is_v2, &lds_update); - } - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(listener_name, ": validation error").c_str()), - error)); - resource_names_failed->insert(listener_name); - } + const envoy_config_listener_v3_Listener* listener, bool is_v2, + XdsApi::LdsUpdate* lds_update) { + // Check whether it's a client or server listener. + const envoy_config_listener_v3_ApiListener* api_listener = + envoy_config_listener_v3_Listener_api_listener(listener); + const envoy_config_core_v3_Address* address = + envoy_config_listener_v3_Listener_address(listener); + if (api_listener != nullptr && address != nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Listener has both address and ApiListener"); } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing LDS response", &errors); -} - -grpc_error_handle RdsResponseParse( - const EncodingContext& context, - const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_route_configuration_names, - XdsApi::RdsUpdateMap* rds_update_map, - std::set* resource_names_failed) { - std::vector errors; - // Get the resources from the response. - size_t size; - const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - for (size_t i = 0; i < size; ++i) { - // Check the type_url of the resource. - absl::string_view type_url = - UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - if (!IsRds(type_url)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not RDS.") - .c_str())); - continue; - } - // Decode the route_config. - const upb_strview encoded_route_config = - google_protobuf_Any_value(resources[i]); - const envoy_config_route_v3_RouteConfiguration* route_config = - envoy_config_route_v3_RouteConfiguration_parse( - encoded_route_config.data, encoded_route_config.size, - context.arena); - if (route_config == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Can't decode route_config.") - .c_str())); - continue; - } - // Check route_config_name. Ignore unexpected route_config. - std::string route_config_name = UpbStringToStdString( - envoy_config_route_v3_RouteConfiguration_name(route_config)); - if (expected_route_configuration_names.find(route_config_name) == - expected_route_configuration_names.end()) { - continue; - } - // Fail if route config name is duplicated. - if (rds_update_map->find(route_config_name) != rds_update_map->end()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate route config name \"", route_config_name, - "\"") - .c_str())); - resource_names_failed->insert(route_config_name); - continue; - } - // Serialize into JSON and store it in the RdsUpdateMap - XdsApi::RdsResourceData& rds_resource_data = - (*rds_update_map)[route_config_name]; - XdsApi::RdsUpdate& rds_update = rds_resource_data.resource; - rds_resource_data.serialized_proto = - UpbStringToStdString(encoded_route_config); - // Parse the route_config. - grpc_error_handle error = - RouteConfigParse(context, route_config, &rds_update); - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(route_config_name, ": validation error").c_str()), - error)); - resource_names_failed->insert(route_config_name); - } + if (api_listener == nullptr && address == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Listener has neither address nor ApiListener"); + } + // Validate Listener fields. + grpc_error_handle error = GRPC_ERROR_NONE; + if (api_listener != nullptr) { + error = LdsResourceParseClient(context, api_listener, is_v2, lds_update); + } else { + error = LdsResourceParseServer(context, listener, is_v2, lds_update); } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing RDS response", &errors); + return error; } grpc_error_handle UpstreamTlsContextParse( @@ -2973,380 +2850,258 @@ grpc_error_handle UpstreamTlsContextParse( return GRPC_ERROR_NONE; } -grpc_error_handle CdsResponseParse( +grpc_error_handle CdsLogicalDnsParse( + const envoy_config_cluster_v3_Cluster* cluster, + XdsApi::CdsUpdate* cds_update) { + const auto* load_assignment = + envoy_config_cluster_v3_Cluster_load_assignment(cluster); + if (load_assignment == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "load_assignment not present for LOGICAL_DNS cluster"); + } + size_t num_localities; + const auto* const* localities = + envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(load_assignment, + &num_localities); + if (num_localities != 1) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("load_assignment for LOGICAL_DNS cluster must have " + "exactly one locality, found ", + num_localities) + .c_str()); + } + size_t num_endpoints; + const auto* const* endpoints = + envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(localities[0], + &num_endpoints); + if (num_endpoints != 1) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("locality for LOGICAL_DNS cluster must have " + "exactly one endpoint, found ", + num_endpoints) + .c_str()); + } + const auto* endpoint = + envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]); + if (endpoint == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LbEndpoint endpoint field not set"); + } + const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint); + if (address == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Endpoint address field not set"); + } + const auto* socket_address = + envoy_config_core_v3_Address_socket_address(address); + if (socket_address == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Address socket_address field not set"); + } + if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size != + 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LOGICAL_DNS clusters must NOT have a custom resolver name set"); + } + absl::string_view address_str = UpbStringToAbsl( + envoy_config_core_v3_SocketAddress_address(socket_address)); + if (address_str.empty()) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "SocketAddress address field not set"); + } + if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "SocketAddress port_value field not set"); + } + cds_update->dns_hostname = JoinHostPort( + address_str, + envoy_config_core_v3_SocketAddress_port_value(socket_address)); + return GRPC_ERROR_NONE; +} + +grpc_error_handle CdsResourceParse( const EncodingContext& context, - const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_cluster_names, - XdsApi::CdsUpdateMap* cds_update_map, - std::set* resource_names_failed) { + const envoy_config_cluster_v3_Cluster* cluster, bool /*is_v2*/, + XdsApi::CdsUpdate* cds_update) { std::vector errors; - // Get the resources from the response. - size_t size; - const google_protobuf_Any* const* resources = - envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); - // Parse all the resources in the CDS response. - for (size_t i = 0; i < size; ++i) { - // Check the type_url of the resource. - absl::string_view type_url = - UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - if (!IsCds(type_url)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not CDS.") - .c_str())); - continue; - } - // Decode the cluster. - const upb_strview encoded_cluster = google_protobuf_Any_value(resources[i]); - const envoy_config_cluster_v3_Cluster* cluster = - envoy_config_cluster_v3_Cluster_parse( - encoded_cluster.data, encoded_cluster.size, context.arena); - if (cluster == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Can't decode cluster.") - .c_str())); - continue; - } - MaybeLogCluster(context, cluster); - // Ignore unexpected cluster names. - std::string cluster_name = - UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(cluster)); - if (expected_cluster_names.find(cluster_name) == - expected_cluster_names.end()) { - continue; - } - // Fail on duplicate resources. - if (cds_update_map->find(cluster_name) != cds_update_map->end()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate resource name \"", cluster_name, "\"") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - // Add the cluster to cds_update_map. - XdsApi::CdsResourceData& cds_resource_data = - (*cds_update_map)[cluster_name]; - XdsApi::CdsUpdate& cds_update = cds_resource_data.resource; - // Store serialized proto. - cds_resource_data.serialized_proto = UpbStringToStdString(encoded_cluster); - // Check the cluster_discovery_type. - if (!envoy_config_cluster_v3_Cluster_has_type(cluster) && - !envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType not found.").c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (envoy_config_cluster_v3_Cluster_type(cluster) == - envoy_config_cluster_v3_Cluster_EDS) { - cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::EDS; - // Check the EDS config source. - const envoy_config_cluster_v3_Cluster_EdsClusterConfig* - eds_cluster_config = - envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); - const envoy_config_core_v3_ConfigSource* eds_config = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( - eds_cluster_config); - if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": EDS ConfigSource is not ADS.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - // Record EDS service_name (if any). - upb_strview service_name = - envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( - eds_cluster_config); - if (service_name.size != 0) { - cds_update.eds_service_name = UpbStringToStdString(service_name); - } - } else if (!XdsAggregateAndLogicalDnsClusterEnabled()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType is not valid.").c_str())); - resource_names_failed->insert(cluster_name); - continue; - } else if (envoy_config_cluster_v3_Cluster_type(cluster) == - envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { - cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS; - const auto* load_assignment = - envoy_config_cluster_v3_Cluster_load_assignment(cluster); - if (load_assignment == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": load_assignment not present for LOGICAL_DNS cluster") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - size_t num_localities; - const auto* const* localities = - envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( - load_assignment, &num_localities); - if (num_localities != 1) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": load_assignment for LOGICAL_DNS cluster must have " - "exactly one locality, found ", - num_localities) - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - size_t num_endpoints; - const auto* const* endpoints = - envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints( - localities[0], &num_endpoints); - if (num_endpoints != 1) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": locality for LOGICAL_DNS cluster must have " - "exactly one endpoint, found ", - num_endpoints) - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const auto* endpoint = - envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]); - if (endpoint == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LbEndpoint endpoint field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint); - if (address == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": Endpoint address field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const auto* socket_address = - envoy_config_core_v3_Address_socket_address(address); - if (socket_address == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": Address socket_address field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address) - .size != 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": LOGICAL_DNS clusters must NOT have a custom resolver " - "name set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - absl::string_view address_str = UpbStringToAbsl( - envoy_config_core_v3_SocketAddress_address(socket_address)); - if (address_str.empty()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": SocketAddress address field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": SocketAddress port_value field not set") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - cds_update.dns_hostname = JoinHostPort( - address_str, - envoy_config_core_v3_SocketAddress_port_value(socket_address)); + // Check the cluster_discovery_type. + if (!envoy_config_cluster_v3_Cluster_has_type(cluster) && + !envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.")); + } else if (envoy_config_cluster_v3_Cluster_type(cluster) == + envoy_config_cluster_v3_Cluster_EDS) { + cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::EDS; + // Check the EDS config source. + const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config = + envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster); + const envoy_config_core_v3_ConfigSource* eds_config = + envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config( + eds_cluster_config); + if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config)) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS ConfigSource is not ADS.")); + } + // Record EDS service_name (if any). + upb_strview service_name = + envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name( + eds_cluster_config); + if (service_name.size != 0) { + cds_update->eds_service_name = UpbStringToStdString(service_name); + } + } else if (!XdsAggregateAndLogicalDnsClusterEnabled()) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not valid.")); + } else if (envoy_config_cluster_v3_Cluster_type(cluster) == + envoy_config_cluster_v3_Cluster_LOGICAL_DNS) { + cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS; + grpc_error_handle error = CdsLogicalDnsParse(cluster, cds_update); + if (error != GRPC_ERROR_NONE) errors.push_back(error); + } else { + if (!envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not valid.")); } else { - if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) { - const envoy_config_cluster_v3_Cluster_CustomClusterType* - custom_cluster_type = - envoy_config_cluster_v3_Cluster_cluster_type(cluster); - upb_strview type_name = - envoy_config_cluster_v3_Cluster_CustomClusterType_name( + const envoy_config_cluster_v3_Cluster_CustomClusterType* + custom_cluster_type = + envoy_config_cluster_v3_Cluster_cluster_type(cluster); + upb_strview type_name = + envoy_config_cluster_v3_Cluster_CustomClusterType_name( + custom_cluster_type); + if (UpbStringToAbsl(type_name) != "envoy.clusters.aggregate") { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "DiscoveryType is not valid.")); + } else { + cds_update->cluster_type = XdsApi::CdsUpdate::ClusterType::AGGREGATE; + // Retrieve aggregate clusters. + const google_protobuf_Any* typed_config = + envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config( custom_cluster_type); - if (UpbStringToAbsl(type_name) == "envoy.clusters.aggregate") { - cds_update.cluster_type = XdsApi::CdsUpdate::ClusterType::AGGREGATE; - // Retrieve aggregate clusters. - const google_protobuf_Any* typed_config = - envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config( - custom_cluster_type); - const upb_strview aggregate_cluster_config_upb_strview = - google_protobuf_Any_value(typed_config); - const envoy_extensions_clusters_aggregate_v3_ClusterConfig* - aggregate_cluster_config = - envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse( - aggregate_cluster_config_upb_strview.data, - aggregate_cluster_config_upb_strview.size, context.arena); - if (aggregate_cluster_config == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": Can't parse aggregate cluster.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } + const upb_strview aggregate_cluster_config_upb_strview = + google_protobuf_Any_value(typed_config); + const envoy_extensions_clusters_aggregate_v3_ClusterConfig* + aggregate_cluster_config = + envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse( + aggregate_cluster_config_upb_strview.data, + aggregate_cluster_config_upb_strview.size, context.arena); + if (aggregate_cluster_config == nullptr) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Can't parse aggregate cluster.")); + } else { size_t size; const upb_strview* clusters = envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters( aggregate_cluster_config, &size); for (size_t i = 0; i < size; ++i) { const upb_strview cluster = clusters[i]; - cds_update.prioritized_cluster_names.emplace_back( + cds_update->prioritized_cluster_names.emplace_back( UpbStringToStdString(cluster)); } - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType is not valid.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; } - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": DiscoveryType is not valid.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; } } - // Check the LB policy. - if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == - envoy_config_cluster_v3_Cluster_ROUND_ROBIN) { - cds_update.lb_policy = "ROUND_ROBIN"; - } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == - envoy_config_cluster_v3_Cluster_RING_HASH) { - cds_update.lb_policy = "RING_HASH"; - // Record ring hash lb config - auto* ring_hash_config = - envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); - if (ring_hash_config != nullptr) { - const google_protobuf_UInt64Value* max_ring_size = - envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( - ring_hash_config); - if (max_ring_size != nullptr) { - cds_update.max_ring_size = - google_protobuf_UInt64Value_value(max_ring_size); - if (cds_update.max_ring_size > 8388608 || - cds_update.max_ring_size == 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": max_ring_size is not in the range of 1 to 8388608.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } + } + // Check the LB policy. + if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == + envoy_config_cluster_v3_Cluster_ROUND_ROBIN) { + cds_update->lb_policy = "ROUND_ROBIN"; + } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) == + envoy_config_cluster_v3_Cluster_RING_HASH) { + cds_update->lb_policy = "RING_HASH"; + // Record ring hash lb config + auto* ring_hash_config = + envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); + if (ring_hash_config != nullptr) { + const google_protobuf_UInt64Value* max_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( + ring_hash_config); + if (max_ring_size != nullptr) { + cds_update->max_ring_size = + google_protobuf_UInt64Value_value(max_ring_size); + if (cds_update->max_ring_size > 8388608 || + cds_update->max_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "max_ring_size is not in the range of 1 to 8388608.")); } - const google_protobuf_UInt64Value* min_ring_size = - envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( - ring_hash_config); - if (min_ring_size != nullptr) { - cds_update.min_ring_size = - google_protobuf_UInt64Value_value(min_ring_size); - if (cds_update.min_ring_size > 8388608 || - cds_update.min_ring_size == 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": min_ring_size is not in the range of 1 to 8388608.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (cds_update.min_ring_size > cds_update.max_ring_size) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": min_ring_size cannot be greater than max_ring_size.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } + } + const google_protobuf_UInt64Value* min_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( + ring_hash_config); + if (min_ring_size != nullptr) { + cds_update->min_ring_size = + google_protobuf_UInt64Value_value(min_ring_size); + if (cds_update->min_ring_size > 8388608 || + cds_update->min_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "min_ring_size is not in the range of 1 to 8388608.")); } - if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( - ring_hash_config) != - envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": ring hash lb config has invalid hash function.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; + if (cds_update->min_ring_size > cds_update->max_ring_size) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "min_ring_size cannot be greater than max_ring_size.")); } } - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LB policy is not supported.").c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - if (XdsSecurityEnabled()) { - auto* transport_socket = - envoy_config_cluster_v3_Cluster_transport_socket(cluster); - if (transport_socket != nullptr) { - grpc_error_handle error = UpstreamTlsContextParse( - context, transport_socket, &cds_update.common_tls_context); - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - "Error parsing security configuration for cluster: ", - cluster_name) - .c_str()), - error)); - resource_names_failed->insert(cluster_name); - continue; - } + if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( + ring_hash_config) != + envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "ring hash lb config has invalid hash function.")); } } - // Record LRS server name (if any). - const envoy_config_core_v3_ConfigSource* lrs_server = - envoy_config_cluster_v3_Cluster_lrs_server(cluster); - if (lrs_server != nullptr) { - if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, ": LRS ConfigSource is not self.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; + } else { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("LB policy is not supported.")); + } + if (XdsSecurityEnabled()) { + auto* transport_socket = + envoy_config_cluster_v3_Cluster_transport_socket(cluster); + if (transport_socket != nullptr) { + grpc_error_handle error = UpstreamTlsContextParse( + context, transport_socket, &cds_update->common_tls_context); + if (error != GRPC_ERROR_NONE) { + errors.push_back( + grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Error parsing security configuration"), + error)); } - cds_update.lrs_load_reporting_server_name.emplace(""); } - // The Cluster resource encodes the circuit breaking parameters in a list of - // Thresholds messages, where each message specifies the parameters for a - // particular RoutingPriority. we will look only at the first entry in the - // list for priority DEFAULT and default to 1024 if not found. - if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) { - const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers = - envoy_config_cluster_v3_Cluster_circuit_breakers(cluster); - size_t num_thresholds; - const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const* - thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds( - circuit_breakers, &num_thresholds); - for (size_t i = 0; i < num_thresholds; ++i) { - const auto* threshold = thresholds[i]; - if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority( - threshold) == envoy_config_core_v3_DEFAULT) { - const google_protobuf_UInt32Value* max_requests = - envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests( - threshold); - if (max_requests != nullptr) { - cds_update.max_concurrent_requests = - google_protobuf_UInt32Value_value(max_requests); - } - break; + } + // Record LRS server name (if any). + const envoy_config_core_v3_ConfigSource* lrs_server = + envoy_config_cluster_v3_Cluster_lrs_server(cluster); + if (lrs_server != nullptr) { + if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) { + errors.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + ": LRS ConfigSource is not self.")); + } + cds_update->lrs_load_reporting_server_name.emplace(""); + } + // The Cluster resource encodes the circuit breaking parameters in a list of + // Thresholds messages, where each message specifies the parameters for a + // particular RoutingPriority. we will look only at the first entry in the + // list for priority DEFAULT and default to 1024 if not found. + if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) { + const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers = + envoy_config_cluster_v3_Cluster_circuit_breakers(cluster); + size_t num_thresholds; + const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const* + thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds( + circuit_breakers, &num_thresholds); + for (size_t i = 0; i < num_thresholds; ++i) { + const auto* threshold = thresholds[i]; + if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority( + threshold) == envoy_config_core_v3_DEFAULT) { + const google_protobuf_UInt32Value* max_requests = + envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests( + threshold); + if (max_requests != nullptr) { + cds_update->max_concurrent_requests = + google_protobuf_UInt32Value_value(max_requests); } + break; } } } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing CDS response", &errors); + return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing CDS resource", &errors); } grpc_error_handle ServerAddressParseAndAppend( @@ -3480,12 +3235,79 @@ grpc_error_handle DropParseAndAppend( return GRPC_ERROR_NONE; } -grpc_error_handle EdsResponseParse( - const EncodingContext& context, +grpc_error_handle EdsResourceParse( + const EncodingContext& /*context*/, + const envoy_config_endpoint_v3_ClusterLoadAssignment* + cluster_load_assignment, + bool /*is_v2*/, XdsApi::EdsUpdate* eds_update) { + std::vector errors; + // Get the endpoints. + size_t locality_size; + const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints = + envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( + cluster_load_assignment, &locality_size); + for (size_t j = 0; j < locality_size; ++j) { + size_t priority; + XdsApi::EdsUpdate::Priority::Locality locality; + grpc_error_handle error = LocalityParse(endpoints[j], &locality, &priority); + if (error != GRPC_ERROR_NONE) { + errors.push_back(error); + continue; + } + // Filter out locality with weight 0. + if (locality.lb_weight == 0) continue; + // Make sure prorities is big enough. Note that they might not + // arrive in priority order. + while (eds_update->priorities.size() < priority + 1) { + eds_update->priorities.emplace_back(); + } + eds_update->priorities[priority].localities.emplace(locality.name.get(), + std::move(locality)); + } + for (const auto& priority : eds_update->priorities) { + if (priority.localities.empty()) { + errors.push_back( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("sparse priority list")); + } + } + // Get the drop config. + eds_update->drop_config = MakeRefCounted(); + const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy* policy = + envoy_config_endpoint_v3_ClusterLoadAssignment_policy( + cluster_load_assignment); + if (policy != nullptr) { + size_t drop_size; + const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload* const* + drop_overload = + envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads( + policy, &drop_size); + for (size_t j = 0; j < drop_size; ++j) { + grpc_error_handle error = + DropParseAndAppend(drop_overload[j], eds_update->drop_config.get()); + if (error != GRPC_ERROR_NONE) { + errors.push_back( + grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "drop config validation error"), + error)); + } + } + } + return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing EDS resource", &errors); +} + +template +grpc_error_handle AdsResponseParse( + const EncodingContext& context, ProtoParseFunction proto_parse_function, + ProtoResourceNameFunction proto_resource_name_function, + ResourceTypeSelectorFunction resource_type_selector_function, + ProtoLogFunction proto_log_function, + ResourceParseFunction resource_parse_function, const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::set& expected_eds_service_names, - XdsApi::EdsUpdateMap* eds_update_map, - std::set* resource_names_failed) { + const char* resource_type_string, + const std::set& expected_resource_names, + UpdateMap* update_map, std::set* resource_names_failed) { std::vector errors; // Get the resources from the response. size_t size; @@ -3495,115 +3317,60 @@ grpc_error_handle EdsResponseParse( // Check the type_url of the resource. absl::string_view type_url = UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); - if (!IsEds(type_url)) { + bool is_v2 = false; + if (!resource_type_selector_function(type_url, &is_v2)) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, ": Resource is not EDS.") + absl::StrCat("resource index ", i, ": Resource is not ", + resource_type_string, ".") .c_str())); continue; } - // Get the cluster_load_assignment. - upb_strview encoded_cluster_load_assignment = - google_protobuf_Any_value(resources[i]); - envoy_config_endpoint_v3_ClusterLoadAssignment* cluster_load_assignment = - envoy_config_endpoint_v3_ClusterLoadAssignment_parse( - encoded_cluster_load_assignment.data, - encoded_cluster_load_assignment.size, context.arena); - if (cluster_load_assignment == nullptr) { + // Parse the resource. + upb_strview serialized_resource = google_protobuf_Any_value(resources[i]); + auto* resource = proto_parse_function( + serialized_resource.data, serialized_resource.size, context.arena); + if (resource == nullptr) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("resource index ", i, - ": Can't parse cluster_load_assignment.") + absl::StrCat("resource index ", i, ": Can't parse ", + resource_type_string, " resource.") .c_str())); continue; } - MaybeLogClusterLoadAssignment(context, cluster_load_assignment); - // Check the EDS service name. Ignore unexpected names. - std::string eds_service_name = UpbStringToStdString( - envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name( - cluster_load_assignment)); - if (expected_eds_service_names.find(eds_service_name) == - expected_eds_service_names.end()) { + proto_log_function(context, resource); + // Check the resource name. Ignore unexpected names. + std::string resource_name = + UpbStringToStdString(proto_resource_name_function(resource)); + if (expected_resource_names.find(resource_name) == + expected_resource_names.end()) { continue; } // Fail on duplicate resources. - if (eds_update_map->find(eds_service_name) != eds_update_map->end()) { + if (update_map->find(resource_name) != update_map->end()) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat("duplicate resource name \"", eds_service_name, "\"") + absl::StrCat("duplicate resource name \"", resource_name, "\"") .c_str())); - resource_names_failed->insert(eds_service_name); + resource_names_failed->insert(resource_name); continue; } - // Serialize into JSON and store it in the EdsUpdateMap - XdsApi::EdsResourceData& eds_resource_data = - (*eds_update_map)[eds_service_name]; - XdsApi::EdsUpdate& eds_update = eds_resource_data.resource; - eds_resource_data.serialized_proto = - UpbStringToStdString(encoded_cluster_load_assignment); - // Get the endpoints. - size_t locality_size; - const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints = - envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints( - cluster_load_assignment, &locality_size); - grpc_error_handle error = GRPC_ERROR_NONE; - for (size_t j = 0; j < locality_size; ++j) { - size_t priority; - XdsApi::EdsUpdate::Priority::Locality locality; - error = LocalityParse(endpoints[j], &locality, &priority); - if (error != GRPC_ERROR_NONE) break; - // Filter out locality with weight 0. - if (locality.lb_weight == 0) continue; - // Make sure prorities is big enough. Note that they might not - // arrive in priority order. - while (eds_update.priorities.size() < priority + 1) { - eds_update.priorities.emplace_back(); - } - eds_update.priorities[priority].localities.emplace(locality.name.get(), - std::move(locality)); - } + // Validate resource. + decltype(UpdateMap::mapped_type::resource) update; + grpc_error_handle error = + resource_parse_function(context, resource, is_v2, &update); if (error != GRPC_ERROR_NONE) { errors.push_back(grpc_error_add_child( GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(eds_service_name, ": locality validation error") - .c_str()), + absl::StrCat(resource_name, ": validation error").c_str()), error)); - resource_names_failed->insert(eds_service_name); - continue; - } - for (const auto& priority : eds_update.priorities) { - if (priority.localities.empty()) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(eds_service_name, ": sparse priority list").c_str())); - resource_names_failed->insert(eds_service_name); - continue; - } - } - // Get the drop config. - eds_update.drop_config = MakeRefCounted(); - const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy* policy = - envoy_config_endpoint_v3_ClusterLoadAssignment_policy( - cluster_load_assignment); - if (policy != nullptr) { - size_t drop_size; - const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload* const* - drop_overload = - envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads( - policy, &drop_size); - for (size_t j = 0; j < drop_size; ++j) { - error = - DropParseAndAppend(drop_overload[j], eds_update.drop_config.get()); - if (error != GRPC_ERROR_NONE) break; - } - if (error != GRPC_ERROR_NONE) { - errors.push_back(grpc_error_add_child( - GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(eds_service_name, ": drop config validation error") - .c_str()), - error)); - resource_names_failed->insert(eds_service_name); - continue; - } + resource_names_failed->insert(resource_name); + } else { + // Store result in update map, in both validated and serialized form. + auto& resource_data = (*update_map)[resource_name]; + resource_data.resource = std::move(update); + resource_data.serialized_proto = + UpbStringToStdString(serialized_resource); } } - return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing EDS response", &errors); + return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing ADS response", &errors); } std::string TypeUrlInternalToExternal(absl::string_view type_url) { @@ -3619,6 +3386,27 @@ std::string TypeUrlInternalToExternal(absl::string_view type_url) { return std::string(type_url); } +upb_strview LdsResourceName( + const envoy_config_listener_v3_Listener* lds_resource) { + return envoy_config_listener_v3_Listener_name(lds_resource); +} + +upb_strview RdsResourceName( + const envoy_config_route_v3_RouteConfiguration* rds_resource) { + return envoy_config_route_v3_RouteConfiguration_name(rds_resource); +} + +upb_strview CdsResourceName( + const envoy_config_cluster_v3_Cluster* cds_resource) { + return envoy_config_cluster_v3_Cluster_name(cds_resource); +} + +upb_strview EdsResourceName( + const envoy_config_endpoint_v3_ClusterLoadAssignment* eds_resource) { + return envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name( + eds_resource); +} + template void MoveUpdatesToFailedSet(UpdateMap* update_map, std::set* resource_names_failed) { @@ -3664,34 +3452,45 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( result.nonce = UpbStringToStdString( envoy_service_discovery_v3_DiscoveryResponse_nonce(response)); // Parse the response according to the resource type. + // TODO(roth): When we have time, consider defining an interface for the + // methods of each resource type, so that we don't have to pass + // individual functions into each call to AdsResponseParse(). if (IsLds(result.type_url)) { - result.parse_error = - LdsResponseParse(context, response, expected_listener_names, - &result.lds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_listener_v3_Listener_parse, LdsResourceName, + IsLds, MaybeLogListener, LdsResourceParse, response, "LDS", + expected_listener_names, &result.lds_update_map, + &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.lds_update_map, &result.resource_names_failed); } } else if (IsRds(result.type_url)) { - result.parse_error = - RdsResponseParse(context, response, expected_route_configuration_names, - &result.rds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_route_v3_RouteConfiguration_parse, + RdsResourceName, IsRds, MaybeLogRouteConfiguration, RouteConfigParse, + response, "RDS", expected_route_configuration_names, + &result.rds_update_map, &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.rds_update_map, &result.resource_names_failed); } } else if (IsCds(result.type_url)) { - result.parse_error = - CdsResponseParse(context, response, expected_cluster_names, - &result.cds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_cluster_v3_Cluster_parse, CdsResourceName, IsCds, + MaybeLogCluster, CdsResourceParse, response, "CDS", + expected_cluster_names, &result.cds_update_map, + &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.cds_update_map, &result.resource_names_failed); } } else if (IsEds(result.type_url)) { - result.parse_error = - EdsResponseParse(context, response, expected_eds_service_names, - &result.eds_update_map, &result.resource_names_failed); + result.parse_error = AdsResponseParse( + context, envoy_config_endpoint_v3_ClusterLoadAssignment_parse, + EdsResourceName, IsEds, MaybeLogClusterLoadAssignment, EdsResourceParse, + response, "EDS", expected_eds_service_names, &result.eds_update_map, + &result.resource_names_failed); if (result.parse_error != GRPC_ERROR_NONE) { MoveUpdatesToFailedSet(&result.eds_update_map, &result.resource_names_failed); diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index ccf2b3e9c56..345c1a04be4 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -3417,16 +3417,14 @@ TEST_P(GlobalXdsClientTest, MultipleBadResources) { const auto response_state = balancers_[0]->ads_service()->lds_response_state(); return response_state.state != AdsServiceImpl::ResponseState::NACKED || - !absl::StrContains( - response_state.error_message, - absl::StrCat( - kServerName, - ": Listener has neither address nor ApiListener")) || - !absl::StrContains( - response_state.error_message, - absl::StrCat( - kServerName2, - ": Listener has neither address nor ApiListener")); + ::testing::Matches(::testing::ContainsRegex(absl::StrCat( + kServerName, + ": validation error.*" + "Listener has neither address nor ApiListener.*", + kServerName2, + ": validation error.*" + "Listener has neither address nor ApiListener")))( + response_state.error_message); })); ASSERT_FALSE(timed_out); } @@ -7255,12 +7253,14 @@ TEST_P(CdsTest, MultipleBadResources) { const auto response_state = balancers_[0]->ads_service()->cds_response_state(); EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); - EXPECT_THAT(response_state.error_message, - ::testing::AllOf( - ::testing::HasSubstr(absl::StrCat( - kDefaultClusterName, ": DiscoveryType is not valid.")), - ::testing::HasSubstr(absl::StrCat( - kClusterName2, ": DiscoveryType is not valid.")))); + EXPECT_THAT( + response_state.error_message, + ::testing::ContainsRegex(absl::StrCat(kDefaultClusterName, + ": validation error.*" + "DiscoveryType is not valid.*", + kClusterName2, + ": validation error.*" + "DiscoveryType is not valid"))); } // Tests that CDS client should send a NACK if the eds_config in CDS response @@ -12904,7 +12904,6 @@ TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEndpointError) { CheckRpcSendOk(); for (int o = 0; o < kFetchConfigRetries; o++) { auto csds_response = FetchCsdsResponse(); - // Check if error state is propagated bool ok = ::testing::Value( csds_response.config(0).xds_config(),