From e7536952a38fef8f281e8e2f9562103fb438a16e Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 10 Mar 2021 21:30:37 -0800 Subject: [PATCH] Filters parsing logic for servers (#25609) * Filters parsing logic for servers --- .../resolver/xds/xds_resolver.cc | 29 +- src/core/ext/xds/xds_api.cc | 260 +++++++++++------- src/core/ext/xds/xds_api.h | 61 ++-- src/core/ext/xds/xds_client.cc | 5 +- src/core/ext/xds/xds_http_fault_filter.h | 4 + src/core/ext/xds/xds_http_filters.cc | 4 + src/core/ext/xds/xds_http_filters.h | 6 + src/proto/grpc/testing/xds/v3/listener.proto | 21 ++ test/cpp/end2end/xds_end2end_test.cc | 246 ++++++++++++++++- 9 files changed, 505 insertions(+), 131 deletions(-) diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index db579d0e22a..32bac37e948 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -234,7 +234,7 @@ XdsResolver::Notifier::Notifier(RefCountedPtr resolver, XdsResolver::Notifier::Notifier(RefCountedPtr resolver, XdsApi::RdsUpdate update) : resolver_(std::move(resolver)), type_(kRdsUpdate) { - update_.rds_update = std::move(update); + update_.http_connection_manager.rds_update = std::move(update); GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } @@ -270,7 +270,8 @@ void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) { resolver_->OnListenerUpdate(std::move(update_)); break; case kRdsUpdate: - resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update)); + resolver_->OnRouteConfigUpdate( + std::move(*update_.http_connection_manager.rds_update)); break; case kError: resolver_->OnError(error); @@ -338,7 +339,8 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector( // one. if (!route.max_stream_duration.has_value()) { route_entry.route.max_stream_duration = - resolver_->current_listener_.http_max_stream_duration; + resolver_->current_listener_.http_connection_manager + .http_max_stream_duration; } if (route.weighted_clusters.empty()) { *error = CreateMethodConfig(route_entry.route, nullptr, @@ -363,7 +365,8 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector( // Populate filter list. if (XdsFaultInjectionEnabled()) { bool found_router = false; - for (const auto& http_filter : resolver_->current_listener_.http_filters) { + for (const auto& http_filter : + resolver_->current_listener_.http_connection_manager.http_filters) { // Stop at the router filter. It's a no-op for us, and we ignore // anything that may come after it, for compatibility with Envoy. if (http_filter.config.config_proto_type_name == @@ -437,7 +440,8 @@ grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig( // Handle xDS HTTP filters. std::map> per_filter_configs; grpc_channel_args* args = grpc_channel_args_copy(resolver_->args_); - for (const auto& http_filter : resolver_->current_listener_.http_filters) { + for (const auto& http_filter : + resolver_->current_listener_.http_connection_manager.http_filters) { // Stop at the router filter. It's a no-op for us, and we ignore // anything that may come after it, for compatibility with Envoy. if (http_filter.config.config_proto_type_name == @@ -700,14 +704,17 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this); } - if (listener.route_config_name != route_config_name_) { + if (listener.http_connection_manager.route_config_name != + route_config_name_) { if (route_config_watcher_ != nullptr) { xds_client_->CancelRouteConfigDataWatch( route_config_name_, route_config_watcher_, - /*delay_unsubscription=*/!listener.route_config_name.empty()); + /*delay_unsubscription=*/ + !listener.http_connection_manager.route_config_name.empty()); route_config_watcher_ = nullptr; } - route_config_name_ = std::move(listener.route_config_name); + route_config_name_ = + std::move(listener.http_connection_manager.route_config_name); if (!route_config_name_.empty()) { current_virtual_host_.routes.clear(); auto watcher = absl::make_unique(Ref()); @@ -717,8 +724,10 @@ void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) { } current_listener_ = std::move(listener); if (route_config_name_.empty()) { - GPR_ASSERT(current_listener_.rds_update.has_value()); - OnRouteConfigUpdate(std::move(*current_listener_.rds_update)); + GPR_ASSERT( + current_listener_.http_connection_manager.rds_update.has_value()); + OnRouteConfigUpdate( + std::move(*current_listener_.http_connection_manager.rds_update)); } else { // HCM may contain newer filter config. We need to propagate the update as // config selector to the channel diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index b21b5d9ccb5..f9a59aa2a3c 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -439,11 +439,38 @@ bool XdsApi::DownstreamTlsContext::Empty() const { return common_tls_context.Empty(); } +// +// XdsApi::LdsUpdate::HttpConnectionManager +// + +std::string XdsApi::LdsUpdate::HttpConnectionManager::ToString() const { + absl::InlinedVector contents; + contents.push_back(absl::StrFormat( + "route_config_name=%s", + !route_config_name.empty() ? route_config_name.c_str() : "")); + contents.push_back(absl::StrFormat("http_max_stream_duration=%s", + http_max_stream_duration.ToString())); + if (rds_update.has_value()) { + contents.push_back( + absl::StrFormat("rds_update=%s", rds_update->ToString())); + } + if (!http_filters.empty()) { + std::vector filter_strings; + for (const auto& http_filter : http_filters) { + filter_strings.push_back(http_filter.ToString()); + } + contents.push_back(absl::StrCat("http_filters=[", + absl::StrJoin(filter_strings, ", "), "]")); + } + return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); +} + // // XdsApi::LdsUpdate::HttpFilter // -std::string XdsApi::LdsUpdate::HttpFilter::ToString() const { +std::string XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter::ToString() + const { return absl::StrCat("{name=", name, ", config=", config.ToString(), "}"); } @@ -514,9 +541,11 @@ std::string XdsApi::LdsUpdate::FilterChain::FilterChainMatch::ToString() const { // std::string XdsApi::LdsUpdate::FilterChain::ToString() const { - return absl::StrFormat("{filter_chain_match=%s, downstream_tls_context=%s}", - filter_chain_match.ToString(), - downstream_tls_context.ToString()); + return absl::StrFormat( + "{filter_chain_match=%s, downstream_tls_context=%s, " + "http_connection_manager=%s}", + filter_chain_match.ToString(), downstream_tls_context.ToString(), + http_connection_manager.ToString()); } // @@ -538,23 +567,8 @@ std::string XdsApi::LdsUpdate::ToString() const { default_filter_chain->ToString())); } } else if (type == ListenerType::kHttpApiListener) { - contents.push_back(absl::StrFormat( - "route_config_name=%s", - !route_config_name.empty() ? route_config_name.c_str() : "")); - contents.push_back(absl::StrFormat("http_max_stream_duration=%s", - http_max_stream_duration.ToString())); - if (rds_update.has_value()) { - contents.push_back( - absl::StrFormat("rds_update=%s", rds_update->ToString())); - } - } - if (!http_filters.empty()) { - std::vector filter_strings; - for (const auto& http_filter : http_filters) { - filter_strings.push_back(http_filter.ToString()); - } - contents.push_back(absl::StrCat("http_filters=[", - absl::StrJoin(filter_strings, ", "), "]")); + contents.push_back(absl::StrFormat("http_connection_manager=%s", + http_connection_manager.ToString())); } return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); } @@ -1646,33 +1660,24 @@ grpc_error* CommonTlsContextParse( return GRPC_ERROR_NONE; } -grpc_error* LdsResponseParseClient( - const EncodingContext& context, - const envoy_config_listener_v3_ApiListener* api_listener, - XdsApi::LdsUpdate* lds_update) { - lds_update->type = XdsApi::LdsUpdate::ListenerType::kHttpApiListener; - const upb_strview encoded_api_listener = google_protobuf_Any_value( - envoy_config_listener_v3_ApiListener_api_listener(api_listener)); - const auto* http_connection_manager = - envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_parse( - encoded_api_listener.data, encoded_api_listener.size, context.arena); - if (http_connection_manager == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Could not parse HttpConnectionManager config from ApiListener"); - } - MaybeLogHttpConnectionManager(context, http_connection_manager); +grpc_error* HttpConnectionManagerParse( + bool is_client, const EncodingContext& context, + const envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager* + http_connection_manager_proto, + XdsApi::LdsUpdate::HttpConnectionManager* http_connection_manager) { + MaybeLogHttpConnectionManager(context, http_connection_manager_proto); if (XdsTimeoutEnabled()) { // Obtain max_stream_duration from Http Protocol Options. const envoy_config_core_v3_HttpProtocolOptions* options = envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_common_http_protocol_options( - http_connection_manager); + http_connection_manager_proto); if (options != nullptr) { const google_protobuf_Duration* duration = envoy_config_core_v3_HttpProtocolOptions_max_stream_duration(options); if (duration != nullptr) { - lds_update->http_max_stream_duration.seconds = + http_connection_manager->http_max_stream_duration.seconds = google_protobuf_Duration_seconds(duration); - lds_update->http_max_stream_duration.nanos = + http_connection_manager->http_max_stream_duration.nanos = google_protobuf_Duration_nanos(duration); } } @@ -1682,7 +1687,7 @@ grpc_error* LdsResponseParseClient( size_t num_filters = 0; const auto* http_filters = envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_http_filters( - http_connection_manager, &num_filters); + http_connection_manager_proto, &num_filters); std::set names_seen; for (size_t i = 0; i < num_filters; ++i) { const auto* http_filter = http_filters[i]; @@ -1721,6 +1726,14 @@ grpc_error* LdsResponseParseClient( absl::StrCat("no filter registered for config type ", filter_type) .c_str()); } + if ((is_client && !filter_impl->IsSupportedOnClients()) || + (!is_client && !filter_impl->IsSupportedOnServers())) { + if (is_optional) continue; + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrFormat("Filter %s is not supported on %s", filter_type, + is_client ? "clients" : "servers") + .c_str()); + } absl::StatusOr filter_config = filter_impl->GenerateFilterConfig(google_protobuf_Any_value(any), context.arena); @@ -1731,47 +1744,69 @@ grpc_error* LdsResponseParseClient( " failed to parse: ", filter_config.status().ToString()) .c_str()); } - lds_update->http_filters.emplace_back(XdsApi::LdsUpdate::HttpFilter{ - std::string(name), std::move(*filter_config)}); + http_connection_manager->http_filters.emplace_back( + XdsApi::LdsUpdate::HttpConnectionManager::HttpFilter{ + std::string(name), std::move(*filter_config)}); + } + } + if (is_client) { + // Found inlined route_config. Parse it to find the cluster_name. + if (envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_has_route_config( + http_connection_manager_proto)) { + const envoy_config_route_v3_RouteConfiguration* route_config = + envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_route_config( + http_connection_manager_proto); + XdsApi::RdsUpdate rds_update; + grpc_error* error = RouteConfigParse(context, route_config, &rds_update); + if (error != GRPC_ERROR_NONE) return error; + http_connection_manager->rds_update = std::move(rds_update); + return GRPC_ERROR_NONE; } + // Validate that RDS must be used to get the route_config dynamically. + const envoy_extensions_filters_network_http_connection_manager_v3_Rds* rds = + envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_rds( + http_connection_manager_proto); + if (rds == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "HttpConnectionManager neither has inlined route_config nor RDS."); + } + // Check that the ConfigSource specifies ADS. + const envoy_config_core_v3_ConfigSource* config_source = + envoy_extensions_filters_network_http_connection_manager_v3_Rds_config_source( + rds); + if (config_source == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "HttpConnectionManager missing config_source for RDS."); + } + if (!envoy_config_core_v3_ConfigSource_has_ads(config_source)) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "HttpConnectionManager ConfigSource for RDS does not specify ADS."); + } + // Get the route_config_name. + http_connection_manager->route_config_name = UpbStringToStdString( + envoy_extensions_filters_network_http_connection_manager_v3_Rds_route_config_name( + rds)); } - // Found inlined route_config. Parse it to find the cluster_name. - if (envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_has_route_config( - http_connection_manager)) { - const envoy_config_route_v3_RouteConfiguration* route_config = - envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_route_config( - http_connection_manager); - XdsApi::RdsUpdate rds_update; - grpc_error* error = RouteConfigParse(context, route_config, &rds_update); - if (error != GRPC_ERROR_NONE) return error; - lds_update->rds_update = std::move(rds_update); - return GRPC_ERROR_NONE; - } - // Validate that RDS must be used to get the route_config dynamically. - const envoy_extensions_filters_network_http_connection_manager_v3_Rds* rds = - envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_rds( - http_connection_manager); - if (rds == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "HttpConnectionManager neither has inlined route_config nor RDS."); - } - // Check that the ConfigSource specifies ADS. - const envoy_config_core_v3_ConfigSource* config_source = - envoy_extensions_filters_network_http_connection_manager_v3_Rds_config_source( - rds); - if (config_source == nullptr) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "HttpConnectionManager missing config_source for RDS."); - } - if (!envoy_config_core_v3_ConfigSource_has_ads(config_source)) { + return GRPC_ERROR_NONE; +} + +grpc_error* LdsResponseParseClient( + const EncodingContext& context, + const envoy_config_listener_v3_ApiListener* api_listener, + XdsApi::LdsUpdate* lds_update) { + lds_update->type = XdsApi::LdsUpdate::ListenerType::kHttpApiListener; + const upb_strview encoded_api_listener = google_protobuf_Any_value( + envoy_config_listener_v3_ApiListener_api_listener(api_listener)); + const auto* http_connection_manager = + envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_parse( + encoded_api_listener.data, encoded_api_listener.size, context.arena); + if (http_connection_manager == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "HttpConnectionManager ConfigSource for RDS does not specify ADS."); + "Could not parse HttpConnectionManager config from ApiListener"); } - // Get the route_config_name. - lds_update->route_config_name = UpbStringToStdString( - envoy_extensions_filters_network_http_connection_manager_v3_Rds_route_config_name( - rds)); - return GRPC_ERROR_NONE; + return HttpConnectionManagerParse(true /* is_client */, context, + http_connection_manager, + &lds_update->http_connection_manager); } XdsApi::LdsUpdate::FilterChain::FilterChainMatch::CidrRange CidrRangeParse( @@ -1889,28 +1924,67 @@ grpc_error* DownstreamTlsContextParse( return GRPC_ERROR_NONE; } -XdsApi::LdsUpdate::FilterChain FilterChainParse( +grpc_error* FilterChainParse( const EncodingContext& context, const envoy_config_listener_v3_FilterChain* filter_chain_proto, - grpc_error** error) { - XdsApi::LdsUpdate::FilterChain filter_chain; + XdsApi::LdsUpdate::FilterChain* filter_chain) { + grpc_error* error = GRPC_ERROR_NONE; auto* filter_chain_match = envoy_config_listener_v3_FilterChain_filter_chain_match( filter_chain_proto); if (filter_chain_match != nullptr) { - filter_chain.filter_chain_match = FilterChainMatchParse(filter_chain_match); + filter_chain->filter_chain_match = + FilterChainMatchParse(filter_chain_match); + } + // Parse the filters list. Currently we only support HttpConnectionManager. + size_t size = 0; + auto* filters = + envoy_config_listener_v3_FilterChain_filters(filter_chain_proto, &size); + if (size != 1) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "FilterChain should have exactly one filter: HttpConnectionManager; no " + "other filter is supported at the moment"); + } + auto* typed_config = envoy_config_listener_v3_Filter_typed_config(filters[0]); + if (typed_config == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No typed_config found in filter."); + } + absl::string_view type_url = + UpbStringToAbsl(google_protobuf_Any_type_url(typed_config)); + if (type_url != + "type.googleapis.com/" + "envoy.extensions.filters.network.http_connection_manager.v3." + "HttpConnectionManager") { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Unsupported filter type ", type_url).c_str()); } + const upb_strview encoded_http_connection_manager = + google_protobuf_Any_value(typed_config); + const auto* http_connection_manager = + envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_parse( + encoded_http_connection_manager.data, + encoded_http_connection_manager.size, context.arena); + if (http_connection_manager == nullptr) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Could not parse HttpConnectionManager config from filter " + "typed_config"); + } + error = HttpConnectionManagerParse(false /* is_client */, context, + http_connection_manager, + &filter_chain->http_connection_manager); + if (error != GRPC_ERROR_NONE) return error; // Get the DownstreamTlsContext for the filter chain if (XdsSecurityEnabled()) { auto* transport_socket = envoy_config_listener_v3_FilterChain_transport_socket( filter_chain_proto); if (transport_socket != nullptr) { - *error = DownstreamTlsContextParse(context, transport_socket, - &filter_chain.downstream_tls_context); + error = DownstreamTlsContextParse(context, transport_socket, + &filter_chain->downstream_tls_context); } } - return filter_chain; + return error; } grpc_error* AddressParse(const envoy_config_core_v3_Address* address_proto, @@ -1960,20 +2034,18 @@ grpc_error* LdsResponseParseServer( } lds_update->filter_chains.reserve(size); for (size_t i = 0; i < size; i++) { - lds_update->filter_chains.push_back( - FilterChainParse(context, filter_chains[0], &error)); - if (error != GRPC_ERROR_NONE) { - return error; - } + XdsApi::LdsUpdate::FilterChain filter_chain; + error = FilterChainParse(context, filter_chains[0], &filter_chain); + if (error != GRPC_ERROR_NONE) return error; + lds_update->filter_chains.push_back(std::move(filter_chain)); } auto* default_filter_chain = envoy_config_listener_v3_Listener_default_filter_chain(listener); if (default_filter_chain != nullptr) { - lds_update->default_filter_chain = - FilterChainParse(context, default_filter_chain, &error); - if (error != GRPC_ERROR_NONE) { - return error; - } + XdsApi::LdsUpdate::FilterChain filter_chain; + error = FilterChainParse(context, default_filter_chain, &filter_chain); + if (error != GRPC_ERROR_NONE) return error; + lds_update->default_filter_chain = std::move(filter_chain); } if (size == 0 && default_filter_chain == nullptr) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No filter chain provided."); diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index eb1d9435433..f936c192ec2 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -219,28 +219,39 @@ class XdsApi { kTcpListener = 0, kHttpApiListener, } type; - // The name to use in the RDS request. - std::string route_config_name; - // Storing the Http Connection Manager Common Http Protocol Option - // max_stream_duration - Duration http_max_stream_duration; - // The RouteConfiguration to use for this listener. - // Present only if it is inlined in the LDS response. - absl::optional rds_update; - - struct HttpFilter { - std::string name; - XdsHttpFilterImpl::FilterConfig config; - bool operator==(const HttpFilter& other) const { - return name == other.name && config == other.config; + struct HttpConnectionManager { + // The name to use in the RDS request. + std::string route_config_name; + // Storing the Http Connection Manager Common Http Protocol Option + // max_stream_duration + Duration http_max_stream_duration; + // The RouteConfiguration to use for this listener. + // Present only if it is inlined in the LDS response. + absl::optional rds_update; + + struct HttpFilter { + std::string name; + XdsHttpFilterImpl::FilterConfig config; + + bool operator==(const HttpFilter& other) const { + return name == other.name && config == other.config; + } + + std::string ToString() const; + }; + std::vector http_filters; + + bool operator==(const HttpConnectionManager& other) const { + return route_config_name == other.route_config_name && + http_max_stream_duration == other.http_max_stream_duration && + rds_update == other.rds_update && + http_filters == other.http_filters; } std::string ToString() const; }; - std::vector http_filters; - struct FilterChain { struct FilterChainMatch { uint32_t destination_port = 0; @@ -287,25 +298,31 @@ class XdsApi { DownstreamTlsContext downstream_tls_context; + // This is in principle the filter list. + // We currently require exactly one filter, which is the HCM. + HttpConnectionManager http_connection_manager; + bool operator==(const FilterChain& other) const { return filter_chain_match == other.filter_chain_match && - downstream_tls_context == other.downstream_tls_context; + downstream_tls_context == other.downstream_tls_context && + http_connection_manager == other.http_connection_manager; } std::string ToString() const; }; + // Populated for type=kHttpApiListener. + HttpConnectionManager http_connection_manager; + + // Populated for type=kTcpListener. // host:port listening_address set when type is kTcpListener std::string address; std::vector filter_chains; absl::optional default_filter_chain; bool operator==(const LdsUpdate& other) const { - return route_config_name == other.route_config_name && - rds_update == other.rds_update && - http_max_stream_duration == other.http_max_stream_duration && - http_filters == other.http_filters && address == other.address && - filter_chains == other.filter_chains && + return http_connection_manager == other.http_connection_manager && + address == other.address && filter_chains == other.filter_chains && default_filter_chain == other.default_filter_chain; } diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index fddf9f201b2..52b0f22250f 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -887,8 +887,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( listener_name.c_str(), lds_update.ToString().c_str()); } // Record the RDS resource names seen. - if (!lds_update.route_config_name.empty()) { - rds_resource_names_seen.insert(lds_update.route_config_name); + if (!lds_update.http_connection_manager.route_config_name.empty()) { + rds_resource_names_seen.insert( + lds_update.http_connection_manager.route_config_name); } // Ignore identical update. ListenerState& listener_state = xds_client()->listener_map_[listener_name]; diff --git a/src/core/ext/xds/xds_http_fault_filter.h b/src/core/ext/xds/xds_http_fault_filter.h index 61be7612f52..60c49524328 100644 --- a/src/core/ext/xds/xds_http_fault_filter.h +++ b/src/core/ext/xds/xds_http_fault_filter.h @@ -52,6 +52,10 @@ class XdsHttpFaultFilter : public XdsHttpFilterImpl { absl::StatusOr GenerateServiceConfig( const FilterConfig& hcm_filter_config, const FilterConfig* filter_config_override) const override; + + bool IsSupportedOnClients() const override { return true; } + + bool IsSupportedOnServers() const override { return false; } }; } // namespace grpc_core diff --git a/src/core/ext/xds/xds_http_filters.cc b/src/core/ext/xds/xds_http_filters.cc index 9acbd9b6d14..9bd4858b2cb 100644 --- a/src/core/ext/xds/xds_http_filters.cc +++ b/src/core/ext/xds/xds_http_filters.cc @@ -61,6 +61,10 @@ class XdsHttpRouterFilter : public XdsHttpFilterImpl { const FilterConfig* /*filter_config_override*/) const override { return absl::UnimplementedError("router filter should never be called"); } + + bool IsSupportedOnClients() const override { return true; } + + bool IsSupportedOnServers() const override { return true; } }; using FilterOwnerList = std::vector>; diff --git a/src/core/ext/xds/xds_http_filters.h b/src/core/ext/xds/xds_http_filters.h index 61e96cde665..332419681ed 100644 --- a/src/core/ext/xds/xds_http_filters.h +++ b/src/core/ext/xds/xds_http_filters.h @@ -101,6 +101,12 @@ class XdsHttpFilterImpl { virtual absl::StatusOr GenerateServiceConfig( const FilterConfig& hcm_filter_config, const FilterConfig* filter_config_override) const = 0; + + // Returns true if the filter is supported on clients; false otherwise + virtual bool IsSupportedOnClients() const = 0; + + // Returns true if the filter is supported on servers; false otherwise + virtual bool IsSupportedOnServers() const = 0; }; class XdsHttpFilterRegistry { diff --git a/src/proto/grpc/testing/xds/v3/listener.proto b/src/proto/grpc/testing/xds/v3/listener.proto index 40ce59cbe09..dd307340b1b 100644 --- a/src/proto/grpc/testing/xds/v3/listener.proto +++ b/src/proto/grpc/testing/xds/v3/listener.proto @@ -40,6 +40,21 @@ message ApiListener { google.protobuf.Any api_listener = 1; } +message Filter { + reserved 3; + + // The name of the filter to instantiate. The name must match a + // :ref:`supported filter `. + string name = 1; + + // [#extension-category: envoy.filters.network] + oneof config_type { + // Filter specific configuration which depends on the filter being + // instantiated. See the supported filters for further documentation. + google.protobuf.Any typed_config = 4; + } +} + message FilterChainMatch { // If non-empty, a list of application protocols (e.g. ALPN for TLS protocol) to consider when // determining a filter chain match. Those values will be compared against the application @@ -70,6 +85,12 @@ message FilterChain { // The criteria to use when matching a connection to this filter chain. FilterChainMatch filter_chain_match = 1; + // A list of individual network filters that make up the filter chain for + // connections established with the listener. Order matters as the filters are + // processed sequentially as connection events happen. Note: If the filter + // list is empty, the connection will close by default. + repeated Filter filters = 3; + // Optional custom transport socket implementation to use for downstream connections. // To setup TLS, set a transport socket with name `tls` and // :ref:`DownstreamTlsContext ` in the `typed_config`. diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 7726b5c679c..d099f4c6240 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -1525,6 +1525,48 @@ std::shared_ptr CreateTlsFallbackCredentials() { return channel_creds; } +// A No-op HTTP filter used for verifying parsing logic. +class NoOpHttpFilter : public grpc_core::XdsHttpFilterImpl { + public: + NoOpHttpFilter(std::string name, bool supported_on_clients, + bool supported_on_servers) + : name_(std::move(name)), + supported_on_clients_(supported_on_clients), + supported_on_servers_(supported_on_servers) {} + + void PopulateSymtab(upb_symtab* /* symtab */) const override {} + + absl::StatusOr + GenerateFilterConfig(upb_strview /* serialized_filter_config */, + upb_arena* /* arena */) const override { + return grpc_core::XdsHttpFilterImpl::FilterConfig{name_, grpc_core::Json()}; + } + + absl::StatusOr + GenerateFilterConfigOverride(upb_strview /*serialized_filter_config*/, + upb_arena* /*arena*/) const override { + return grpc_core::XdsHttpFilterImpl::FilterConfig{name_, grpc_core::Json()}; + } + + const grpc_channel_filter* channel_filter() const override { return nullptr; } + + absl::StatusOr + GenerateServiceConfig( + const FilterConfig& /*hcm_filter_config*/, + const FilterConfig* /*filter_config_override*/) const override { + return grpc_core::XdsHttpFilterImpl::ServiceConfigJsonEntry{name_, ""}; + } + + bool IsSupportedOnClients() const override { return supported_on_clients_; } + + bool IsSupportedOnServers() const override { return supported_on_servers_; } + + private: + const std::string name_; + const bool supported_on_clients_; + const bool supported_on_servers_; +}; + namespace { void* response_generator_arg_copy(void* p) { @@ -3479,6 +3521,65 @@ TEST_P(LdsTest, RejectsUnparseableHttpFilterType) { gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"); } +// Test that we NACK HTTP filters unsupported on client-side. +TEST_P(LdsTest, RejectsHttpFiltersNotSupportedOnClients) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION", "true"); + auto listener = default_listener_; + HttpConnectionManager http_connection_manager; + listener.mutable_api_listener()->mutable_api_listener()->UnpackTo( + &http_connection_manager); + auto* filter = http_connection_manager.add_http_filters(); + filter->set_name("grpc.testing.server_only_http_filter"); + filter->mutable_typed_config()->set_type_url( + "grpc.testing.server_only_http_filter"); + listener.mutable_api_listener()->mutable_api_listener()->PackFrom( + http_connection_manager); + SetListenerAndRouteConfiguration(0, listener, default_route_config_); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Wait until xDS server sees NACK. + do { + CheckRpcSendFailure(); + } while (balancers_[0]->ads_service()->lds_response_state().state == + AdsServiceImpl::ResponseState::SENT); + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT( + response_state.error_message, + ::testing::HasSubstr("Filter grpc.testing.server_only_http_filter is not " + "supported on clients")); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"); +} + +// Test that we ignore optional HTTP filters unsupported on client-side. +TEST_P(LdsTest, IgnoresOptionalHttpFiltersNotSupportedOnClients) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION", "true"); + auto listener = default_listener_; + HttpConnectionManager http_connection_manager; + listener.mutable_api_listener()->mutable_api_listener()->UnpackTo( + &http_connection_manager); + auto* filter = http_connection_manager.add_http_filters(); + filter->set_name("grpc.testing.server_only_http_filter"); + filter->mutable_typed_config()->set_type_url( + "grpc.testing.server_only_http_filter"); + filter->set_is_optional(true); + listener.mutable_api_listener()->mutable_api_listener()->PackFrom( + http_connection_manager); + SetListenerAndRouteConfiguration(0, listener, default_route_config_); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + WaitForBackend(0); + EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state().state, + AdsServiceImpl::ResponseState::ACKED); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"); +} + using LdsRdsTest = BasicTest; // Tests that LDS client should send an ACK upon correct LDS response (with @@ -7221,7 +7322,8 @@ TEST_P(XdsEnabledServerTest, Basic) { ipv6_only_ ? "::1" : "127.0.0.1"); listener.mutable_address()->mutable_socket_address()->set_port_value( backends_[0]->port()); - listener.add_filter_chains(); + listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom( + HttpConnectionManager()); balancers_[0]->ads_service()->SetLdsResource(listener); WaitForBackend(0); CheckRpcSendOk(); @@ -7232,7 +7334,8 @@ TEST_P(XdsEnabledServerTest, BadLdsUpdateNoApiListenerNorAddress) { listener.set_name( absl::StrCat("grpc/server?xds.resource.listening_address=", ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port())); - listener.add_filter_chains(); + listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom( + HttpConnectionManager()); balancers_[0]->ads_service()->SetLdsResource(listener); CheckRpcSendFailure(1, RpcOptions().set_wait_for_ready(true)); const auto response_state = @@ -7254,6 +7357,8 @@ TEST_P(XdsEnabledServerTest, BadLdsUpdateBothApiListenerAndAddress) { listener.mutable_address()->mutable_socket_address()->set_port_value( backends_[0]->port()); auto* filter_chain = listener.add_filter_chains(); + filter_chain->add_filters()->mutable_typed_config()->PackFrom( + HttpConnectionManager()); auto* transport_socket = filter_chain->mutable_transport_socket(); transport_socket->set_name("envoy.transport_sockets.tls"); listener.mutable_api_listener(); @@ -7267,6 +7372,128 @@ TEST_P(XdsEnabledServerTest, BadLdsUpdateBothApiListenerAndAddress) { ::testing::HasSubstr("Listener has both address and ApiListener")); } +TEST_P(XdsEnabledServerTest, UnsupportedL4Filter) { + Listener listener; + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=", + ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port())); + balancers_[0]->ads_service()->SetLdsResource(listener); + listener.mutable_address()->mutable_socket_address()->set_address( + ipv6_only_ ? "::1" : "127.0.0.1"); + listener.mutable_address()->mutable_socket_address()->set_port_value( + backends_[0]->port()); + auto* filter_chain = listener.add_filter_chains(); + filter_chain->add_filters()->mutable_typed_config()->PackFrom(default_listener_ /* any proto object other than HttpConnectionManager */); + auto* transport_socket = filter_chain->mutable_transport_socket(); + transport_socket->set_name("envoy.transport_sockets.tls"); + balancers_[0]->ads_service()->SetLdsResource(listener); + CheckRpcSendFailure(1, RpcOptions().set_wait_for_ready(true)); + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr("Unsupported filter type")); +} + +TEST_P(XdsEnabledServerTest, UnsupportedHttpFilter) { + // Set env var to enable filters parsing. + gpr_setenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION", "true"); + Listener listener; + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=", + ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port())); + listener.mutable_address()->mutable_socket_address()->set_address( + ipv6_only_ ? "::1" : "127.0.0.1"); + listener.mutable_address()->mutable_socket_address()->set_port_value( + backends_[0]->port()); + HttpConnectionManager http_connection_manager; + auto* http_filter = http_connection_manager.add_http_filters(); + http_filter->set_name("grpc.testing.unsupported_http_filter"); + http_filter->mutable_typed_config()->set_type_url( + "grpc.testing.unsupported_http_filter"); + listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom( + http_connection_manager); + balancers_[0]->ads_service()->SetLdsResource(listener); + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=[::1]:", + backends_[0]->port())); + balancers_[0]->ads_service()->SetLdsResource(listener); + CheckRpcSendFailure(1, RpcOptions().set_wait_for_ready(true)); + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr("no filter registered for config type " + "grpc.testing.unsupported_http_filter")); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"); +} + +TEST_P(XdsEnabledServerTest, HttpFilterNotSupportedOnServer) { + // Set env var to enable filters parsing. + gpr_setenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION", "true"); + Listener listener; + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=", + ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port())); + listener.mutable_address()->mutable_socket_address()->set_address( + ipv6_only_ ? "::1" : "127.0.0.1"); + listener.mutable_address()->mutable_socket_address()->set_port_value( + backends_[0]->port()); + HttpConnectionManager http_connection_manager; + auto* http_filter = http_connection_manager.add_http_filters(); + http_filter->set_name("grpc.testing.client_only_http_filter"); + http_filter->mutable_typed_config()->set_type_url( + "grpc.testing.client_only_http_filter"); + listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom( + http_connection_manager); + balancers_[0]->ads_service()->SetLdsResource(listener); + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=[::1]:", + backends_[0]->port())); + balancers_[0]->ads_service()->SetLdsResource(listener); + CheckRpcSendFailure(1, RpcOptions().set_wait_for_ready(true)); + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT( + response_state.error_message, + ::testing::HasSubstr("Filter grpc.testing.client_only_http_filter is not " + "supported on servers")); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"); +} + +TEST_P(XdsEnabledServerTest, + HttpFilterNotSupportedOnServerIgnoredWhenOptional) { + // Set env var to enable filters parsing. + gpr_setenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION", "true"); + Listener listener; + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=", + ipv6_only_ ? "[::1]:" : "127.0.0.1:", backends_[0]->port())); + listener.mutable_address()->mutable_socket_address()->set_address( + ipv6_only_ ? "::1" : "127.0.0.1"); + listener.mutable_address()->mutable_socket_address()->set_port_value( + backends_[0]->port()); + HttpConnectionManager http_connection_manager; + auto* http_filter = http_connection_manager.add_http_filters(); + http_filter->set_name("grpc.testing.client_only_http_filter"); + http_filter->mutable_typed_config()->set_type_url( + "grpc.testing.client_only_http_filter"); + http_filter->set_is_optional(true); + listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom( + http_connection_manager); + balancers_[0]->ads_service()->SetLdsResource(listener); + listener.set_name( + absl::StrCat("grpc/server?xds.resource.listening_address=[::1]:", + backends_[0]->port())); + balancers_[0]->ads_service()->SetLdsResource(listener); + WaitForBackend(0); + const auto response_state = + balancers_[0]->ads_service()->lds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"); +} + // Verify that a mismatch of listening address results in "not serving" status. TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) { Listener listener; @@ -7277,7 +7504,8 @@ TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) { ipv6_only_ ? "::1" : "127.0.0.1"); listener.mutable_address()->mutable_socket_address()->set_port_value( backends_[0]->port()); - listener.add_filter_chains(); + listener.add_filter_chains()->add_filters()->mutable_typed_config()->PackFrom( + HttpConnectionManager()); balancers_[0]->ads_service()->SetLdsResource(listener); WaitForBackend(0); CheckRpcSendOk(); @@ -7354,6 +7582,8 @@ class XdsServerSecurityTest : public XdsEnd2endTest { listener.mutable_address()->mutable_socket_address()->set_port_value( backends_[0]->port()); auto* filter_chain = listener.add_filter_chains(); + filter_chain->add_filters()->mutable_typed_config()->PackFrom( + HttpConnectionManager()); if (!identity_instance_name.empty()) { auto* transport_socket = filter_chain->mutable_transport_socket(); transport_socket->set_name("envoy.transport_sockets.tls"); @@ -7541,6 +7771,8 @@ TEST_P(XdsServerSecurityTest, TlsConfigurationWithoutRootProviderInstance) { socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1"); socket_address->set_port_value(backends_[0]->port()); auto* filter_chain = listener.add_filter_chains(); + filter_chain->add_filters()->mutable_typed_config()->PackFrom( + HttpConnectionManager()); auto* transport_socket = filter_chain->mutable_transport_socket(); transport_socket->set_name("envoy.transport_sockets.tls"); DownstreamTlsContext downstream_tls_context; @@ -9829,6 +10061,14 @@ int main(int argc, char** argv) { absl::make_unique( "fake2", &grpc::testing::g_fake2_cert_data_map)); grpc_init(); + grpc_core::XdsHttpFilterRegistry::RegisterFilter( + absl::make_unique( + "grpc.testing.client_only_http_filter", true, false), + {"grpc.testing.client_only_http_filter"}); + grpc_core::XdsHttpFilterRegistry::RegisterFilter( + absl::make_unique( + "grpc.testing.server_only_http_filter", false, true), + {"grpc.testing.server_only_http_filter"}); const auto result = RUN_ALL_TESTS(); grpc_shutdown(); return result;