From 5acae9fc0717e4b8041a22c39d99f19c752fe19d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 15 Sep 2020 07:45:31 -0700 Subject: [PATCH] Change XdsClient to support multiple LDS and RDS watchers. --- .../filters/client_channel/client_channel.cc | 4 + .../client_channel/lb_policy/xds/cds.cc | 5 +- .../client_channel/lb_policy/xds/eds.cc | 16 +- .../resolver/xds/xds_resolver.cc | 182 ++++++--- .../client_channel/resolving_lb_policy.cc | 4 + src/core/ext/xds/xds_api.cc | 83 ++-- src/core/ext/xds/xds_api.h | 15 +- src/core/ext/xds/xds_client.cc | 366 +++++++++++------- src/core/ext/xds/xds_client.h | 80 +++- 9 files changed, 524 insertions(+), 231 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f3d13330e61..24237d8fb29 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1443,6 +1443,10 @@ ChannelData::ChannelConfigHelper::ApplyServiceConfig( // If resolver did not return a service config or returned an invalid service // config, we need a fallback service config. if (result.service_config_error != GRPC_ERROR_NONE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s", + chand_, grpc_error_string(result.service_config_error)); + } // If the service config was invalid, then fallback to the saved service // config. If there is no saved config either, use the default service // config. diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index ca618d081d6..5f991f90910 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -288,7 +288,6 @@ CdsLb::~CdsLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this); } - grpc_channel_args_destroy(args_); } void CdsLb::ShutdownLocked() { @@ -305,8 +304,10 @@ void CdsLb::ShutdownLocked() { } xds_client_->CancelClusterDataWatch(config_->cluster(), cluster_watcher_); } - xds_client_.reset(); + xds_client_.reset(DEBUG_LOCATION, "CdsLb"); } + grpc_channel_args_destroy(args_); + args_ = nullptr; } void CdsLb::MaybeDestroyChildPolicyLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 406e0f05cba..dfa32f26f37 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -400,7 +400,6 @@ EdsLb::~EdsLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { gpr_log(GPR_INFO, "[edslb %p] destroying xds LB policy", this); } - grpc_channel_args_destroy(args_); } void EdsLb::ShutdownLocked() { @@ -426,9 +425,15 @@ void EdsLb::ShutdownLocked() { xds_client()->CancelEndpointDataWatch(GetEdsResourceName(), endpoint_watcher_); } - xds_client_from_channel_.reset(); + xds_client_from_channel_.reset(DEBUG_LOCATION, "EdsLb"); + } + if (xds_client_ != nullptr) { + grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), + interested_parties()); + xds_client_.reset(); } - xds_client_.reset(); + grpc_channel_args_destroy(args_); + args_ = nullptr; } void EdsLb::MaybeDestroyChildPolicyLocked() { @@ -456,11 +461,12 @@ void EdsLb::UpdateLocked(UpdateArgs args) { if (xds_client_from_channel_ == nullptr) { grpc_error* error = GRPC_ERROR_NONE; xds_client_ = MakeOrphanable( - work_serializer(), interested_parties(), GetEdsResourceName(), - nullptr /* service config watcher */, *args_, &error); + work_serializer(), GetEdsResourceName(), *args_, &error); // TODO(roth): If we decide that we care about EDS-only mode, add // proper error handling here. GPR_ASSERT(error == GRPC_ERROR_NONE); + grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), + interested_parties()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { gpr_log(GPR_INFO, "[edslb %p] Created xds client %p", this, xds_client_.get()); diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 2ce29e9d070..e961c3b4b84 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -66,19 +66,26 @@ class XdsResolver : public Resolver { void StartLocked() override; - void ShutdownLocked() override { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { - gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this); - } - xds_client_.reset(); - } + void ShutdownLocked() override; private: class ListenerWatcher : public XdsClient::ListenerWatcherInterface { public: explicit ListenerWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} - void OnListenerChanged(std::vector routes) override; + void OnListenerChanged(XdsApi::LdsUpdate listener) override; + void OnError(grpc_error* error) override; + void OnResourceDoesNotExist() override; + + private: + RefCountedPtr resolver_; + }; + + class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface { + public: + explicit RouteConfigWatcher(RefCountedPtr resolver) + : resolver_(std::move(resolver)) {} + void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override; void OnError(grpc_error* error) override; void OnResourceDoesNotExist() override; @@ -125,16 +132,21 @@ class XdsResolver : public Resolver { std::map> clusters_; }; - void OnListenerChanged(std::vector routes); - grpc_error* CreateServiceConfig(RefCountedPtr* service_config); + void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update); void OnError(grpc_error* error); - void PropagateUpdate(); + void OnResourceDoesNotExist(); + + grpc_error* CreateServiceConfig(RefCountedPtr* service_config); + void GenerateResult(); void MaybeRemoveUnusedClusters(); std::string server_name_; const grpc_channel_args* args_; grpc_pollset_set* interested_parties_; OrphanablePtr xds_client_; + XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr; + std::string route_config_name_; + XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr; ClusterState::ClusterStateMap cluster_state_map_; std::vector current_update_; }; @@ -144,34 +156,69 @@ class XdsResolver : public Resolver { // void XdsResolver::ListenerWatcher::OnListenerChanged( - std::vector routes) { + XdsApi::LdsUpdate listener) { if (resolver_->xds_client_ == nullptr) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", resolver_.get()); } - resolver_->OnListenerChanged(std::move(routes)); + if (listener.route_config_name != resolver_->route_config_name_) { + if (resolver_->route_config_watcher_ != nullptr) { + resolver_->xds_client_->CancelRouteConfigDataWatch( + resolver_->route_config_name_, resolver_->route_config_watcher_, + /*delay_unsubscription=*/!listener.route_config_name.empty()); + resolver_->route_config_watcher_ = nullptr; + } + resolver_->route_config_name_ = std::move(listener.route_config_name); + if (!resolver_->route_config_name_.empty()) { + auto watcher = absl::make_unique(resolver_->Ref()); + resolver_->route_config_watcher_ = watcher.get(); + resolver_->xds_client_->WatchRouteConfigData( + resolver_->route_config_name_, std::move(watcher)); + } + } + if (resolver_->route_config_name_.empty()) { + GPR_ASSERT(listener.rds_update.has_value()); + resolver_->OnRouteConfigUpdate(std::move(*listener.rds_update)); + } } void XdsResolver::ListenerWatcher::OnError(grpc_error* error) { if (resolver_->xds_client_ == nullptr) return; - gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(), - grpc_error_string(error)); + gpr_log(GPR_ERROR, "[xds_resolver %p] received listener error: %s", + resolver_.get(), grpc_error_string(error)); resolver_->OnError(error); } void XdsResolver::ListenerWatcher::OnResourceDoesNotExist() { if (resolver_->xds_client_ == nullptr) return; - gpr_log(GPR_ERROR, - "[xds_resolver %p] LDS/RDS resource does not exist -- returning " - "empty service config", - resolver_.get()); - Result result; - result.service_config = - ServiceConfig::Create("{}", &result.service_config_error); - GPR_ASSERT(result.service_config != nullptr); - result.args = grpc_channel_args_copy(resolver_->args_); - resolver_->result_handler()->ReturnResult(std::move(result)); + resolver_->OnResourceDoesNotExist(); +} + +// +// XdsResolver::RouteConfigWatcher +// + +void XdsResolver::RouteConfigWatcher::OnRouteConfigChanged( + XdsApi::RdsUpdate route_config) { + if (resolver_->xds_client_ == nullptr) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config data", + resolver_.get()); + } + resolver_->OnRouteConfigUpdate(std::move(route_config)); +} + +void XdsResolver::RouteConfigWatcher::OnError(grpc_error* error) { + if (resolver_->xds_client_ == nullptr) return; + gpr_log(GPR_ERROR, "[xds_resolver %p] received route config error: %s", + resolver_.get(), grpc_error_string(error)); + resolver_->OnError(error); +} + +void XdsResolver::RouteConfigWatcher::OnResourceDoesNotExist() { + if (resolver_->xds_client_ == nullptr) return; + resolver_->OnResourceDoesNotExist(); } // @@ -420,24 +467,78 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( void XdsResolver::StartLocked() { grpc_error* error = GRPC_ERROR_NONE; - xds_client_ = MakeOrphanable( - work_serializer(), interested_parties_, server_name_, - absl::make_unique(Ref()), *args_, &error); + xds_client_ = MakeOrphanable(work_serializer(), server_name_, + *args_, &error); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to create xds client -- channel will remain in " "TRANSIENT_FAILURE: %s", grpc_error_string(error)); result_handler()->ReturnError(error); + return; + } + grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), + interested_parties_); + auto watcher = absl::make_unique(Ref()); + listener_watcher_ = watcher.get(); + xds_client_->WatchListenerData(server_name_, std::move(watcher)); +} + +void XdsResolver::ShutdownLocked() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { + gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this); + } + if (xds_client_ != nullptr) { + if (listener_watcher_ != nullptr) { + xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_, + /*delay_unsubscription=*/false); + } + if (route_config_watcher_ != nullptr) { + xds_client_->CancelRouteConfigDataWatch( + server_name_, route_config_watcher_, /*delay_unsubscription=*/false); + } + grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), + interested_parties_); + xds_client_.reset(); + } +} + +void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { + // Find the relevant VirtualHost from the RouteConfiguration. + XdsApi::RdsUpdate::VirtualHost* vhost = + rds_update.FindVirtualHostForDomain(server_name_); + if (vhost == nullptr) { + OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("could not find VirtualHost for ", server_name_, + " in RouteConfiguration") + .c_str())); + return; } + // Save the list of routes in the resolver. + current_update_ = std::move(vhost->routes); + // Send a new result to the channel. + GenerateResult(); +} + +void XdsResolver::OnError(grpc_error* error) { + grpc_arg xds_client_arg = xds_client_->MakeChannelArg(); + Result result; + result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1); + result.service_config_error = error; + result_handler()->ReturnResult(std::move(result)); } -void XdsResolver::OnListenerChanged(std::vector routes) { - // Save the update in the resolver. - current_update_ = std::move(routes); - // Propagate the update by creating XdsConfigSelector, CreateServiceConfig, - // and ReturnResult. - PropagateUpdate(); +void XdsResolver::OnResourceDoesNotExist() { + gpr_log(GPR_ERROR, + "[xds_resolver %p] LDS/RDS resource does not exist -- returning " + "empty service config", + this); + Result result; + result.service_config = + ServiceConfig::Create("{}", &result.service_config_error); + GPR_ASSERT(result.service_config != nullptr); + result.args = grpc_channel_args_copy(args_); + result_handler()->ReturnResult(std::move(result)); } grpc_error* XdsResolver::CreateServiceConfig( @@ -472,15 +573,7 @@ grpc_error* XdsResolver::CreateServiceConfig( return error; } -void XdsResolver::OnError(grpc_error* error) { - grpc_arg xds_client_arg = xds_client_->MakeChannelArg(); - Result result; - result.args = grpc_channel_args_copy_and_add(args_, &xds_client_arg, 1); - result.service_config_error = error; - result_handler()->ReturnResult(std::move(result)); -} - -void XdsResolver::PropagateUpdate() { +void XdsResolver::GenerateResult() { // First create XdsConfigSelector, which may add new entries to the cluster // state map, and then CreateServiceConfig for LB policies. auto config_selector = @@ -516,9 +609,8 @@ void XdsResolver::MaybeRemoveUnusedClusters() { } } if (update_needed && xds_client_ != nullptr) { - // Propagate the update by creating XdsConfigSelector, CreateServiceConfig, - // and ReturnResult. - PropagateUpdate(); + // Send a new result to the channel. + GenerateResult(); } } diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index e95d1a779f3..6f14c1bcc65 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -173,6 +173,10 @@ ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() { void ResolvingLoadBalancingPolicy::ShutdownLocked() { if (resolver_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { + gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this, + resolver_.get()); + } resolver_.reset(); if (lb_policy_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index a39f51f85b7..7d9d2482122 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -228,8 +228,8 @@ std::string XdsApi::Route::Matchers::HeaderMatcher::ToString() const { std::string XdsApi::Route::Matchers::ToString() const { std::vector contents; contents.push_back(path_matcher.ToString()); - for (const auto& header_it : header_matchers) { - contents.push_back(header_it.ToString()); + for (const HeaderMatcher& header_matcher : header_matchers) { + contents.push_back(header_matcher.ToString()); } if (fraction_per_million.has_value()) { contents.push_back(absl::StrFormat("Fraction Per Million %d", @@ -248,8 +248,8 @@ std::string XdsApi::Route::ToString() const { if (!cluster_name.empty()) { contents.push_back(absl::StrFormat("Cluster name: %s", cluster_name)); } - for (const auto& weighted_it : weighted_clusters) { - contents.push_back(weighted_it.ToString()); + for (const ClusterWeight& cluster_weight : weighted_clusters) { + contents.push_back(cluster_weight.ToString()); } return absl::StrJoin(contents, "\n"); } @@ -333,8 +333,8 @@ MatchType DomainPatternMatchType(const std::string& domain_pattern) { } // namespace -const XdsApi::RdsUpdate::VirtualHost* -XdsApi::RdsUpdate::FindVirtualHostForDomain(const std::string& domain) const { +XdsApi::RdsUpdate::VirtualHost* XdsApi::RdsUpdate::FindVirtualHostForDomain( + const std::string& domain) { // Find the best matched virtual host. // The search order for 4 groups of domain patterns: // 1. Exact match. @@ -344,12 +344,12 @@ XdsApi::RdsUpdate::FindVirtualHostForDomain(const std::string& domain) const { // Within each group, longest match wins. // If the same best matched domain pattern appears in multiple virtual hosts, // the first matched virtual host wins. - const VirtualHost* target_vhost = nullptr; + VirtualHost* target_vhost = nullptr; MatchType best_match_type = INVALID_MATCH; size_t longest_match = 0; // Check each domain pattern in each virtual host to determine the best // matched virtual host. - for (const VirtualHost& vhost : virtual_hosts) { + for (VirtualHost& vhost : virtual_hosts) { for (const std::string& domain_pattern : vhost.domains) { // Check the match type first. Skip the pattern if it's not better than // current match. @@ -1568,7 +1568,9 @@ grpc_error* RouteConfigParse( std::string domain_pattern = UpbStringToStdString(domains[j]); const MatchType match_type = DomainPatternMatchType(domain_pattern); if (match_type == INVALID_MATCH) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid domain pattern."); + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("Invalid domain pattern \"", domain_pattern, "\".") + .c_str()); } vhost.domains.emplace_back(std::move(domain_pattern)); } @@ -1624,8 +1626,8 @@ grpc_error* RouteConfigParse( grpc_error* LdsResponseParse( XdsClient* client, TraceFlag* tracer, const envoy_service_discovery_v3_DiscoveryResponse* response, - const std::string& expected_server_name, - absl::optional* lds_update, upb_arena* arena) { + const std::set& expected_listener_names, + XdsApi::LdsUpdateMap* lds_update_map, upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = @@ -1647,9 +1649,19 @@ grpc_error* LdsResponseParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode listener."); } // Check listener name. Ignore unexpected listeners. - absl::string_view name = - UpbStringToAbsl(envoy_config_listener_v3_Listener_name(listener)); - if (name != expected_server_name) continue; + std::string listener_name = + UpbStringToStdString(envoy_config_listener_v3_Listener_name(listener)); + if (expected_listener_names.find(listener_name) == + expected_listener_names.end()) { + continue; + } + // Fail if listener name is duplicated. + if (lds_update_map->find(listener_name) != lds_update_map->end()) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("duplicate listener name \"", listener_name, "\"") + .c_str()); + } + XdsApi::LdsUpdate& lds_update = (*lds_update_map)[listener_name]; // Get api_listener and decode it to http_connection_manager. const envoy_config_listener_v3_ApiListener* api_listener = envoy_config_listener_v3_Listener_api_listener(listener); @@ -1677,9 +1689,8 @@ grpc_error* LdsResponseParse( grpc_error* error = RouteConfigParse(client, tracer, route_config, &rds_update); if (error != GRPC_ERROR_NONE) return error; - lds_update->emplace(); - (*lds_update)->rds_update = std::move(rds_update); - return GRPC_ERROR_NONE; + lds_update.rds_update = std::move(rds_update); + continue; } // Validate that RDS must be used to get the route_config dynamically. if (!envoy_extensions_filters_network_http_connection_manager_v3_HttpConnectionManager_has_rds( @@ -1703,11 +1714,9 @@ grpc_error* LdsResponseParse( "HttpConnectionManager ConfigSource for RDS does not specify ADS."); } // Get the route_config_name. - lds_update->emplace(); - (*lds_update)->route_config_name = UpbStringToStdString( + lds_update.route_config_name = UpbStringToStdString( envoy_extensions_filters_network_http_connection_manager_v3_Rds_route_config_name( rds)); - return GRPC_ERROR_NONE; } return GRPC_ERROR_NONE; } @@ -1716,7 +1725,7 @@ grpc_error* RdsResponseParse( XdsClient* client, TraceFlag* tracer, const envoy_service_discovery_v3_DiscoveryResponse* response, const std::set& expected_route_configuration_names, - absl::optional* rds_update, upb_arena* arena) { + XdsApi::RdsUpdateMap* rds_update_map, upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = @@ -1738,19 +1747,25 @@ grpc_error* RdsResponseParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode route_config."); } // Check route_config_name. Ignore unexpected route_config. - absl::string_view route_config_name = UpbStringToAbsl( + std::string route_config_name = UpbStringToStdString( envoy_config_route_v3_RouteConfiguration_name(route_config)); if (expected_route_configuration_names.find(route_config_name) == expected_route_configuration_names.end()) { continue; } + // Fail if route config name is duplicated. + if (rds_update_map->find(route_config_name) != rds_update_map->end()) { + return GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("duplicate route config name \"", route_config_name, + "\"") + .c_str()); + } // Parse the route_config. - XdsApi::RdsUpdate local_rds_update; + XdsApi::RdsUpdate& rds_update = + (*rds_update_map)[std::move(route_config_name)]; grpc_error* error = - RouteConfigParse(client, tracer, route_config, &local_rds_update); + RouteConfigParse(client, tracer, route_config, &rds_update); if (error != GRPC_ERROR_NONE) return error; - rds_update->emplace(std::move(local_rds_update)); - return GRPC_ERROR_NONE; } return GRPC_ERROR_NONE; } @@ -1766,7 +1781,6 @@ grpc_error* CdsResponseParse( envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); // Parse all the resources in the CDS response. for (size_t i = 0; i < size; ++i) { - XdsApi::CdsUpdate cds_update; // Check the type_url of the resource. absl::string_view type_url = UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); @@ -1795,6 +1809,7 @@ grpc_error* CdsResponseParse( absl::StrCat("duplicate resource name \"", cluster_name, "\"") .c_str()); } + XdsApi::CdsUpdate& cds_update = (*cds_update_map)[std::move(cluster_name)]; // Check the cluster_discovery_type. if (!envoy_config_cluster_v3_Cluster_has_type(cluster)) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found."); @@ -1836,7 +1851,6 @@ grpc_error* CdsResponseParse( } cds_update.lrs_load_reporting_server_name.emplace(""); } - cds_update_map->emplace(std::move(cluster_name), std::move(cds_update)); } return GRPC_ERROR_NONE; } @@ -1962,7 +1976,6 @@ grpc_error* EdsResponseParse( const google_protobuf_Any* const* resources = envoy_service_discovery_v3_DiscoveryResponse_resources(response, &size); for (size_t i = 0; i < size; ++i) { - XdsApi::EdsUpdate eds_update; // Check the type_url of the resource. absl::string_view type_url = UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])); @@ -1995,6 +2008,8 @@ grpc_error* EdsResponseParse( absl::StrCat("duplicate resource name \"", eds_service_name, "\"") .c_str()); } + XdsApi::EdsUpdate& eds_update = + (*eds_update_map)[std::move(eds_service_name)]; // Get the endpoints. size_t locality_size; const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints = @@ -2038,7 +2053,6 @@ grpc_error* EdsResponseParse( if (error != GRPC_ERROR_NONE) return error; } } - eds_update_map->emplace(std::move(eds_service_name), std::move(eds_update)); } return GRPC_ERROR_NONE; } @@ -2059,7 +2073,8 @@ std::string TypeUrlInternalToExternal(absl::string_view type_url) { } // namespace XdsApi::AdsParseResult XdsApi::ParseAdsResponse( - const grpc_slice& encoded_response, const std::string& expected_server_name, + const grpc_slice& encoded_response, + const std::set& expected_listener_names, const std::set& expected_route_configuration_names, const std::set& expected_cluster_names, const std::set& expected_eds_service_names) { @@ -2087,12 +2102,12 @@ XdsApi::AdsParseResult XdsApi::ParseAdsResponse( // Parse the response according to the resource type. if (IsLds(result.type_url)) { result.parse_error = - LdsResponseParse(client_, tracer_, response, expected_server_name, - &result.lds_update, arena.ptr()); + LdsResponseParse(client_, tracer_, response, expected_listener_names, + &result.lds_update_map, arena.ptr()); } else if (IsRds(result.type_url)) { result.parse_error = RdsResponseParse(client_, tracer_, response, expected_route_configuration_names, - &result.rds_update, arena.ptr()); + &result.rds_update_map, arena.ptr()); } else if (IsCds(result.type_url)) { result.parse_error = CdsResponseParse(client_, tracer_, response, expected_cluster_names, diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index f6e0c7db555..9cce2d37064 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -147,8 +147,7 @@ class XdsApi { return virtual_hosts == other.virtual_hosts; } std::string ToString() const; - const VirtualHost* FindVirtualHostForDomain( - const std::string& domain) const; + VirtualHost* FindVirtualHostForDomain(const std::string& domain); }; // TODO(roth): When we can use absl::variant<>, consider using that @@ -179,6 +178,12 @@ class XdsApi { // If set to the empty string, will use the same server we obtained the CDS // data from. absl::optional lrs_load_reporting_server_name; + + bool operator==(const CdsUpdate& other) const { + return eds_service_name == other.eds_service_name && + lrs_load_reporting_server_name == + other.lrs_load_reporting_server_name; + } }; using CdsUpdateMap = std::map; @@ -296,14 +301,14 @@ class XdsApi { std::string version; std::string nonce; std::string type_url; - absl::optional lds_update; - absl::optional rds_update; + LdsUpdateMap lds_update_map; + RdsUpdateMap rds_update_map; CdsUpdateMap cds_update_map; EdsUpdateMap eds_update_map; }; AdsParseResult ParseAdsResponse( const grpc_slice& encoded_response, - const std::string& expected_server_name, + const std::set& expected_listener_names, const std::set& expected_route_configuration_names, const std::set& expected_cluster_names, const std::set& expected_eds_service_names); diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index d17255fd342..216ca0e1aec 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -144,14 +144,14 @@ class XdsClient::ChannelState::AdsCallState void Orphan() override { Finish(); - Unref(); + Unref(DEBUG_LOCATION, "Orphan"); } void Start(RefCountedPtr ads_calld) { if (sent_) return; sent_ = true; ads_calld_ = std::move(ads_calld); - Ref().release(); + Ref(DEBUG_LOCATION, "timer").release(); timer_pending_ = true; grpc_timer_init( &timer_, @@ -186,27 +186,34 @@ class XdsClient::ChannelState::AdsCallState gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(), grpc_error_string(watcher_error)); } - if (type_url_ == XdsApi::kLdsTypeUrl || - type_url_ == XdsApi::kRdsTypeUrl) { - ads_calld_->xds_client()->listener_watcher_->OnError(watcher_error); + if (type_url_ == XdsApi::kLdsTypeUrl) { + ListenerState& state = ads_calld_->xds_client()->listener_map_[name_]; + for (const auto& p : state.watchers) { + p.first->OnError(GRPC_ERROR_REF(watcher_error)); + } + } else if (type_url_ == XdsApi::kRdsTypeUrl) { + RouteConfigState& state = + ads_calld_->xds_client()->route_config_map_[name_]; + for (const auto& p : state.watchers) { + p.first->OnError(GRPC_ERROR_REF(watcher_error)); + } } else if (type_url_ == XdsApi::kCdsTypeUrl) { ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_]; for (const auto& p : state.watchers) { p.first->OnError(GRPC_ERROR_REF(watcher_error)); } - GRPC_ERROR_UNREF(watcher_error); } else if (type_url_ == XdsApi::kEdsTypeUrl) { EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_]; for (const auto& p : state.watchers) { p.first->OnError(GRPC_ERROR_REF(watcher_error)); } - GRPC_ERROR_UNREF(watcher_error); } else { GPR_UNREACHABLE_CODE(return ); } + GRPC_ERROR_UNREF(watcher_error); } ads_calld_.reset(); - Unref(); + Unref(DEBUG_LOCATION, "timer"); GRPC_ERROR_UNREF(error); } @@ -235,8 +242,8 @@ class XdsClient::ChannelState::AdsCallState void SendMessageLocked(const std::string& type_url); - void AcceptLdsUpdate(absl::optional lds_update); - void AcceptRdsUpdate(absl::optional rds_update); + void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map); + void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map); void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map); void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map); @@ -489,6 +496,7 @@ XdsClient::ChannelState::~ChannelState() { this); } grpc_channel_destroy(channel_); + xds_client_.reset(DEBUG_LOCATION, "ChannelState"); } void XdsClient::ChannelState::Orphan() { @@ -525,7 +533,7 @@ void XdsClient::ChannelState::StartConnectivityWatchLocked() { grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - watcher_ = new StateWatcher(Ref()); + watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch")); grpc_client_channel_start_connectivity_watch( client_channel_elem, GRPC_CHANNEL_IDLE, OrphanablePtr(watcher_)); @@ -560,8 +568,11 @@ void XdsClient::ChannelState::Unsubscribe(const std::string& type_url, const std::string& name, bool delay_unsubscription) { if (ads_calld_ != nullptr) { - ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription); - if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset(); + auto* calld = ads_calld_->calld(); + if (calld != nullptr) { + calld->Unsubscribe(type_url, name, delay_unsubscription); + if (!calld->HasSubscribedResources()) ads_calld_.reset(); + } } } @@ -678,7 +689,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // activity in xds_client()->interested_parties_, which is comprised of // the polling entities from client_channel. GPR_ASSERT(xds_client() != nullptr); - GPR_ASSERT(!xds_client()->server_name_.empty()); // Create a call with the specified method name. const auto& method = xds_client()->bootstrap_->server().ShouldUseV3() @@ -717,13 +727,11 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // Op: send request message. GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, grpc_schedule_on_exec_ctx); - if (xds_client()->listener_watcher_ != nullptr) { - Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_); - if (xds_client()->lds_result_.has_value() && - !xds_client()->lds_result_->route_config_name.empty()) { - Subscribe(XdsApi::kRdsTypeUrl, - xds_client()->lds_result_->route_config_name); - } + for (const auto& p : xds_client()->listener_map_) { + Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first)); + } + for (const auto& p : xds_client()->route_config_map_) { + Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first)); } for (const auto& p : xds_client()->cluster_map_) { Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first)); @@ -867,113 +875,128 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { } void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( - absl::optional lds_update) { - if (!lds_update.has_value()) { - gpr_log(GPR_INFO, - "[xds_client %p] LDS update does not include requested resource", - xds_client()); - if (xds_client()->lds_result_.has_value() && - !xds_client()->lds_result_->route_config_name.empty()) { - Unsubscribe(XdsApi::kRdsTypeUrl, - xds_client()->lds_result_->route_config_name, - /*delay_unsubscription=*/false); - xds_client()->rds_result_.reset(); - } - xds_client()->lds_result_.reset(); - xds_client()->listener_watcher_->OnResourceDoesNotExist(); - return; - } + XdsApi::LdsUpdateMap lds_update_map) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, - "[xds_client %p] LDS update received: route_config_name=%s", - xds_client(), - (!lds_update->route_config_name.empty() - ? lds_update->route_config_name.c_str() - : "")); - if (lds_update->rds_update.has_value()) { - gpr_log(GPR_INFO, "RouteConfiguration: %s", - lds_update->rds_update->ToString().c_str()); - } + "[xds_client %p] LDS update received containing %" PRIuPTR + " resources", + xds_client(), lds_update_map.size()); } auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; - auto& state = lds_state.subscribed_resources[xds_client()->server_name_]; - if (state != nullptr) state->Finish(); - // Ignore identical update. - if (xds_client()->lds_result_ == lds_update) { + std::set rds_resource_names_seen; + for (auto& p : lds_update_map) { + const std::string& listener_name = p.first; + XdsApi::LdsUpdate& lds_update = p.second; + auto& state = lds_state.subscribed_resources[listener_name]; + if (state != nullptr) state->Finish(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] LDS update identical to current, ignoring.", - xds_client()); + gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s", + xds_client(), listener_name.c_str(), + (!lds_update.route_config_name.empty() + ? lds_update.route_config_name.c_str() + : "")); + if (lds_update.rds_update.has_value()) { + gpr_log(GPR_INFO, "RouteConfiguration: %s", + lds_update.rds_update->ToString().c_str()); + } + } + // Record the RDS resource names seen. + if (!lds_update.route_config_name.empty()) { + rds_resource_names_seen.insert(lds_update.route_config_name); + } + // Ignore identical update. + ListenerState& listener_state = xds_client()->listener_map_[listener_name]; + if (listener_state.update.has_value() && + *listener_state.update == lds_update) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] LDS update for %s identical to current, " + "ignoring.", + xds_client(), listener_name.c_str()); + } + continue; + } + // Update the listener state. + listener_state.update = std::move(lds_update); + // Notify watchers. + for (const auto& p : listener_state.watchers) { + p.first->OnListenerChanged(*listener_state.update); } - return; } - if (xds_client()->lds_result_.has_value() && - !xds_client()->lds_result_->route_config_name.empty()) { - Unsubscribe( - XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name, - /*delay_unsubscription=*/!lds_update->route_config_name.empty()); - xds_client()->rds_result_.reset(); - } - xds_client()->lds_result_ = std::move(lds_update); - if (xds_client()->lds_result_->rds_update.has_value()) { - // If the RouteConfiguration was found inlined in LDS response, notify - // the watcher immediately. - const XdsApi::RdsUpdate::VirtualHost* vhost = - xds_client()->lds_result_->rds_update->FindVirtualHostForDomain( - xds_client()->server_name_); - if (vhost == nullptr) { - xds_client()->listener_watcher_->OnError( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "no VirtualHost found for domain")); - } else { - xds_client()->listener_watcher_->OnListenerChanged(vhost->routes); + // For any subscribed resource that is not present in the update, + // remove it from the cache and notify watchers that it does not exist. + for (const auto& p : lds_state.subscribed_resources) { + const std::string& listener_name = p.first; + if (lds_update_map.find(listener_name) == lds_update_map.end()) { + ListenerState& listener_state = + xds_client()->listener_map_[listener_name]; + // If the resource was newly requested but has not yet been received, + // we don't want to generate an error for the watchers, because this LDS + // response may be in reaction to an earlier request that did not yet + // request the new resource, so its absence from the response does not + // necessarily indicate that the resource does not exist. + // For that case, we rely on the request timeout instead. + if (!listener_state.update.has_value()) continue; + listener_state.update.reset(); + for (const auto& p : listener_state.watchers) { + p.first->OnResourceDoesNotExist(); + } + } + } + // For any RDS resource that is no longer referred to by any LDS + // resources, remove it from the cache and notify watchers that it + // does not exist. + auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; + for (const auto& p : rds_state.subscribed_resources) { + const std::string& rds_resource_name = p.first; + if (rds_resource_names_seen.find(rds_resource_name) == + rds_resource_names_seen.end()) { + RouteConfigState& route_config_state = + xds_client()->route_config_map_[rds_resource_name]; + route_config_state.update.reset(); + for (const auto& p : route_config_state.watchers) { + p.first->OnResourceDoesNotExist(); + } } - } else { - // Send RDS request for dynamic resolution. - Subscribe(XdsApi::kRdsTypeUrl, - xds_client()->lds_result_->route_config_name); } } void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( - absl::optional rds_update) { - if (!rds_update.has_value()) { - gpr_log(GPR_INFO, - "[xds_client %p] RDS update does not include requested resource", - xds_client()); - xds_client()->rds_result_.reset(); - xds_client()->listener_watcher_->OnResourceDoesNotExist(); - return; - } + XdsApi::RdsUpdateMap rds_update_map) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, "[xds_client %p] RDS update received:\n%s", xds_client(), - rds_update->ToString().c_str()); + gpr_log(GPR_INFO, + "[xds_client %p] RDS update received containing %" PRIuPTR + " resources", + xds_client(), rds_update_map.size()); } - auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; - auto& state = - rds_state - .subscribed_resources[xds_client()->lds_result_->route_config_name]; - if (state != nullptr) state->Finish(); - // Ignore identical update. - if (xds_client()->rds_result_ == rds_update) { + auto& rds_state = state_map_[XdsApi::kLdsTypeUrl]; + for (auto& p : rds_update_map) { + const std::string& route_config_name = p.first; + XdsApi::RdsUpdate& rds_update = p.second; + auto& state = rds_state.subscribed_resources[route_config_name]; + if (state != nullptr) state->Finish(); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] RDS update identical to current, ignoring.", - xds_client()); + gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(), + rds_update.ToString().c_str()); + } + RouteConfigState& route_config_state = + xds_client()->route_config_map_[route_config_name]; + // Ignore identical update. + if (route_config_state.update.has_value() && + *route_config_state.update == rds_update) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] RDS resource identical to current, ignoring", + xds_client()); + } + continue; + } + // Update the cache. + route_config_state.update = std::move(rds_update); + // Notify all watchers. + for (const auto& p : route_config_state.watchers) { + p.first->OnRouteConfigChanged(*route_config_state.update); } - return; - } - xds_client()->rds_result_ = std::move(rds_update); - // Notify the watcher. - const XdsApi::RdsUpdate::VirtualHost* vhost = - xds_client()->rds_result_->FindVirtualHostForDomain( - xds_client()->server_name_); - if (vhost == nullptr) { - xds_client()->listener_watcher_->OnError( - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "no VirtualHost found for domain")); - } else { - xds_client()->listener_watcher_->OnListenerChanged(vhost->routes); } } @@ -1008,9 +1031,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( // Ignore identical update. ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name]; if (cluster_state.update.has_value() && - cds_update.eds_service_name == cluster_state.update->eds_service_name && - cds_update.lrs_load_reporting_server_name == - cluster_state.update->lrs_load_reporting_server_name) { + *cluster_state.update == cds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] CDS update identical to current, ignoring.", @@ -1157,7 +1178,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { recv_message_payload_ = nullptr; // Parse and validate the response. XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse( - response_slice, xds_client()->server_name_, + response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl), ResourceNamesForRequest(XdsApi::kRdsTypeUrl), ResourceNamesForRequest(XdsApi::kCdsTypeUrl), ResourceNamesForRequest(XdsApi::kEdsTypeUrl)); @@ -1187,9 +1208,9 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { seen_response_ = true; // Accept the ADS response according to the type_url. if (result.type_url == XdsApi::kLdsTypeUrl) { - AcceptLdsUpdate(std::move(result.lds_update)); + AcceptLdsUpdate(std::move(result.lds_update_map)); } else if (result.type_url == XdsApi::kRdsTypeUrl) { - AcceptRdsUpdate(std::move(result.rds_update)); + AcceptRdsUpdate(std::move(result.rds_update_map)); } else if (result.type_url == XdsApi::kCdsTypeUrl) { AcceptCdsUpdate(std::move(result.cds_update_map)); } else if (result.type_url == XdsApi::kEdsTypeUrl) { @@ -1272,7 +1293,7 @@ XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest( for (auto& p : it->second.subscribed_resources) { resource_names.insert(p.first); OrphanablePtr& state = p.second; - state->Start(Ref()); + state->Start(Ref(DEBUG_LOCATION, "ResourceState")); } } return resource_names; @@ -1746,19 +1767,16 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap, } // namespace XdsClient::XdsClient(std::shared_ptr work_serializer, - grpc_pollset_set* interested_parties, absl::string_view server_name, - std::unique_ptr watcher, const grpc_channel_args& channel_args, grpc_error** error) : InternallyRefCounted(&grpc_xds_client_trace), request_timeout_(GetRequestTimeout(channel_args)), work_serializer_(std::move(work_serializer)), - interested_parties_(interested_parties), + interested_parties_(grpc_pollset_set_create()), bootstrap_( XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), api_(this, &grpc_xds_client_trace, bootstrap_.get()), - server_name_(server_name), - listener_watcher_(std::move(watcher)) { + server_name_(server_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); } @@ -1781,15 +1799,13 @@ XdsClient::XdsClient(std::shared_ptr work_serializer, } chand_ = MakeOrphanable( Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel); - if (listener_watcher_ != nullptr) { - chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name)); - } } XdsClient::~XdsClient() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this); } + grpc_pollset_set_destroy(interested_parties_); } void XdsClient::Orphan() { @@ -1804,13 +1820,88 @@ void XdsClient::Orphan() { // possible for ADS calls to be in progress. Unreffing the loadbalancing // policies before those calls are done would lead to issues such as // https://github.com/grpc/grpc/issues/20928. - if (listener_watcher_ != nullptr) { + if (!listener_map_.empty()) { cluster_map_.clear(); endpoint_map_.clear(); } Unref(DEBUG_LOCATION, "XdsClient::Orphan()"); } +void XdsClient::WatchListenerData( + absl::string_view listener_name, + std::unique_ptr watcher) { + std::string listener_name_str = std::string(listener_name); + ListenerState& listener_state = listener_map_[listener_name_str]; + ListenerWatcherInterface* w = watcher.get(); + listener_state.watchers[w] = std::move(watcher); + // If we've already received an LDS update, notify the new watcher + // immediately. + if (listener_state.update.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s", + this, listener_name_str.c_str()); + } + w->OnListenerChanged(*listener_state.update); + } + chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str); +} + +void XdsClient::CancelListenerDataWatch(absl::string_view listener_name, + ListenerWatcherInterface* watcher, + bool delay_unsubscription) { + if (shutting_down_) return; + std::string listener_name_str = std::string(listener_name); + ListenerState& listener_state = listener_map_[listener_name_str]; + auto it = listener_state.watchers.find(watcher); + if (it != listener_state.watchers.end()) { + listener_state.watchers.erase(it); + if (listener_state.watchers.empty()) { + listener_map_.erase(listener_name_str); + chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str, + delay_unsubscription); + } + } +} + +void XdsClient::WatchRouteConfigData( + absl::string_view route_config_name, + std::unique_ptr watcher) { + std::string route_config_name_str = std::string(route_config_name); + RouteConfigState& route_config_state = + route_config_map_[route_config_name_str]; + RouteConfigWatcherInterface* w = watcher.get(); + route_config_state.watchers[w] = std::move(watcher); + // If we've already received an RDS update, notify the new watcher + // immediately. + if (route_config_state.update.has_value()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] returning cached route config data for %s", this, + route_config_name_str.c_str()); + } + w->OnRouteConfigChanged(*route_config_state.update); + } + chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str); +} + +void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name, + RouteConfigWatcherInterface* watcher, + bool delay_unsubscription) { + if (shutting_down_) return; + std::string route_config_name_str = std::string(route_config_name); + RouteConfigState& route_config_state = + route_config_map_[route_config_name_str]; + auto it = route_config_state.watchers.find(watcher); + if (it != route_config_state.watchers.end()) { + route_config_state.watchers.erase(it); + if (route_config_state.watchers.empty()) { + route_config_map_.erase(route_config_name_str); + chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str, + delay_unsubscription); + } + } +} + void XdsClient::WatchClusterData( absl::string_view cluster_name, std::unique_ptr watcher) { @@ -1818,7 +1909,7 @@ void XdsClient::WatchClusterData( ClusterState& cluster_state = cluster_map_[cluster_name_str]; ClusterWatcherInterface* w = watcher.get(); cluster_state.watchers[w] = std::move(watcher); - // If we've already received an CDS update, notify the new watcher + // If we've already received a CDS update, notify the new watcher // immediately. if (cluster_state.update.has_value()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { @@ -2048,8 +2139,17 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot( } void XdsClient::NotifyOnError(grpc_error* error) { - if (listener_watcher_ != nullptr) { - listener_watcher_->OnError(GRPC_ERROR_REF(error)); + for (const auto& p : listener_map_) { + const ListenerState& listener_state = p.second; + for (const auto& p : listener_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } + } + for (const auto& p : route_config_map_) { + const RouteConfigState& route_config_state = p.second; + for (const auto& p : route_config_state.watchers) { + p.first->OnError(GRPC_ERROR_REF(error)); + } } for (const auto& p : cluster_map_) { const ClusterState& cluster_state = p.second; @@ -2093,8 +2193,8 @@ RefCountedPtr XdsClient::GetFromChannelArgs( const grpc_channel_args& args) { XdsClient* xds_client = grpc_channel_args_find_pointer(&args, GRPC_ARG_XDS_CLIENT); - if (xds_client != nullptr) return xds_client->Ref(); - return nullptr; + if (xds_client == nullptr) return nullptr; + return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs"); } grpc_channel_args* XdsClient::RemoveFromChannelArgs( diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index c9184348a8c..25408b70641 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -20,6 +20,7 @@ #include #include +#include #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -45,7 +46,19 @@ class XdsClient : public InternallyRefCounted { public: virtual ~ListenerWatcherInterface() = default; - virtual void OnListenerChanged(std::vector routes) = 0; + virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0; + + virtual void OnError(grpc_error* error) = 0; + + virtual void OnResourceDoesNotExist() = 0; + }; + + // RouteConfiguration data watcher interface. Implemented by callers. + class RouteConfigWatcherInterface { + public: + virtual ~RouteConfigWatcherInterface() = default; + + virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0; virtual void OnError(grpc_error* error) = 0; @@ -78,14 +91,44 @@ class XdsClient : public InternallyRefCounted { // If *error is not GRPC_ERROR_NONE after construction, then there was // an error initializing the client. + // TODO(roth): Remove the server_name parameter as part of sharing the + // XdsClient instance between channels. XdsClient(std::shared_ptr work_serializer, - grpc_pollset_set* interested_parties, absl::string_view server_name, - std::unique_ptr watcher, + absl::string_view server_name, const grpc_channel_args& channel_args, grpc_error** error); ~XdsClient(); + grpc_pollset_set* interested_parties() const { return interested_parties_; } + void Orphan() override; + // Start and cancel listener data watch for a listener. + // The XdsClient takes ownership of the watcher, but the caller may + // keep a raw pointer to the watcher, which may be used only for + // cancellation. (Because the caller does not own the watcher, the + // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. + void WatchListenerData(absl::string_view listener_name, + std::unique_ptr watcher); + void CancelListenerDataWatch(absl::string_view listener_name, + ListenerWatcherInterface* watcher, + bool delay_unsubscription = false); + + // Start and cancel route config data watch for a listener. + // The XdsClient takes ownership of the watcher, but the caller may + // keep a raw pointer to the watcher, which may be used only for + // cancellation. (Because the caller does not own the watcher, the + // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. + void WatchRouteConfigData( + absl::string_view route_config_name, + std::unique_ptr watcher); + void CancelRouteConfigDataWatch(absl::string_view route_config_name, + RouteConfigWatcherInterface* watcher, + bool delay_unsubscription = false); + // Start and cancel cluster data watch for a cluster. // The XdsClient takes ownership of the watcher, but the caller may // keep a raw pointer to the watcher, which may be used only for @@ -200,6 +243,22 @@ class XdsClient : public InternallyRefCounted { OrphanablePtr> lrs_calld_; }; + struct ListenerState { + std::map> + watchers; + // The latest data seen from LDS. + absl::optional update; + }; + + struct RouteConfigState { + std::map> + watchers; + // The latest data seen from RDS. + absl::optional update; + }; + struct ClusterState { std::map> watchers; @@ -250,19 +309,26 @@ class XdsClient : public InternallyRefCounted { std::unique_ptr bootstrap_; XdsApi api_; + // TODO(roth): In order to share the XdsClient instance between + // channels and servers, we will need to remove this field. In order + // to do that, we'll need to figure out if we can stop sending the + // server name as part of the node metadata in the LRS request. const std::string server_name_; - std::unique_ptr listener_watcher_; // The channel for communicating with the xds server. OrphanablePtr chand_; - absl::optional lds_result_; - absl::optional rds_result_; - + // One entry for each watched LDS resource. + std::map listener_map_; + // One entry for each watched RDS resource. + std::map + route_config_map_; // One entry for each watched CDS resource. std::map cluster_map_; // One entry for each watched EDS resource. std::map endpoint_map_; + + // Load report data. std::map< std::pair, LoadReportState>