diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 5e7d34823fb..51e250a3cad 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -295,7 +295,8 @@ void CdsLb::UpdateLocked(UpdateArgs args) { old_config->cluster().c_str()); } xds_client_->CancelClusterDataWatch( - StringView(old_config->cluster().c_str()), cluster_watcher_); + StringView(old_config->cluster().c_str()), cluster_watcher_, + /*delay_unsubscription=*/true); } if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 5277f22a32a..044f7e75122 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -792,7 +792,8 @@ void XdsLb::UpdateLocked(UpdateArgs args) { old_eds_service_name); } xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name), - endpoint_watcher_); + endpoint_watcher_, + /*delay_unsubscription=*/true); } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this, @@ -1098,7 +1099,9 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() { const auto& locality_name = p.first; Locality* locality = p.second.get(); // Skip the localities that are not in the latest locality map update. - if (!locality_map_update()->Contains(locality_name)) continue; + const auto* locality_update = locality_map_update(); + if (locality_update == nullptr) continue; + if (!locality_update->Contains(locality_name)) continue; if (locality->connectivity_state() != GRPC_CHANNEL_READY) continue; end += locality->weight(); picker_list.push_back( diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index 9dc6d39ce66..9f0b17f5865 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -1045,15 +1045,12 @@ grpc_error* RouteConfigParse( grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer, const envoy_api_v2_DiscoveryResponse* response, const std::string& expected_server_name, - XdsApi::LdsUpdate* lds_update, upb_arena* arena) { + absl::optional* lds_update, + upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = envoy_api_v2_DiscoveryResponse_resources(response, &size); - if (size < 1) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "LDS response contains 0 resource."); - } for (size_t i = 0; i < size; ++i) { // Check the type_url of the resource. const upb_strview type_url = google_protobuf_Any_type_url(resources[i]); @@ -1096,11 +1093,8 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer, grpc_error* error = RouteConfigParse(client, tracer, route_config, expected_server_name, &rds_update); if (error != GRPC_ERROR_NONE) return error; - lds_update->rds_update.emplace(std::move(rds_update)); - const upb_strview route_config_name = - envoy_api_v2_RouteConfiguration_name(route_config); - lds_update->route_config_name = - std::string(route_config_name.data, route_config_name.size); + lds_update->emplace(); + (*lds_update)->rds_update.emplace(std::move(rds_update)); return GRPC_ERROR_NONE; } // Validate that RDS must be used to get the route_config dynamically. @@ -1116,27 +1110,24 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer, const upb_strview route_config_name = envoy_config_filter_network_http_connection_manager_v2_Rds_route_config_name( rds); - lds_update->route_config_name = + lds_update->emplace(); + (*lds_update)->route_config_name = std::string(route_config_name.data, route_config_name.size); return GRPC_ERROR_NONE; } - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "No listener found for expected server name."); + return GRPC_ERROR_NONE; } grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer, const envoy_api_v2_DiscoveryResponse* response, const std::string& expected_server_name, const std::string& expected_route_config_name, - XdsApi::RdsUpdate* rds_update, upb_arena* arena) { + absl::optional* rds_update, + upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = envoy_api_v2_DiscoveryResponse_resources(response, &size); - if (size < 1) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "RDS response contains 0 resource."); - } for (size_t i = 0; i < size; ++i) { // Check the type_url of the resource. const upb_strview type_url = google_protobuf_Any_type_url(resources[i]); @@ -1162,25 +1153,21 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer, grpc_error* error = RouteConfigParse( client, tracer, route_config, expected_server_name, &local_rds_update); if (error != GRPC_ERROR_NONE) return error; - *rds_update = std::move(local_rds_update); + rds_update->emplace(std::move(local_rds_update)); return GRPC_ERROR_NONE; } - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "No route config found for expected name."); + return GRPC_ERROR_NONE; } grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer, const envoy_api_v2_DiscoveryResponse* response, + const std::set& expected_cluster_names, XdsApi::CdsUpdateMap* cds_update_map, upb_arena* arena) { // Get the resources from the response. size_t size; const google_protobuf_Any* const* resources = envoy_api_v2_DiscoveryResponse_resources(response, &size); - if (size < 1) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "CDS response contains 0 resource."); - } // Parse all the resources in the CDS response. for (size_t i = 0; i < size; ++i) { XdsApi::CdsUpdate cds_update; @@ -1197,6 +1184,13 @@ grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer, return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster."); } MaybeLogCluster(client, tracer, cluster); + // Ignore unexpected cluster names. + upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster); + StringView cluster_name_strview(cluster_name.data, cluster_name.size); + if (expected_cluster_names.find(cluster_name_strview) == + expected_cluster_names.end()) { + continue; + } // Check the cluster_discovery_type. if (!envoy_api_v2_Cluster_has_type(cluster)) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found."); @@ -1235,7 +1229,6 @@ grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer, } cds_update.lrs_load_reporting_server_name.emplace(""); } - upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster); cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size), std::move(cds_update)); } @@ -1363,10 +1356,6 @@ grpc_error* EdsResponsedParse( size_t size; const google_protobuf_Any* const* resources = envoy_api_v2_DiscoveryResponse_resources(response, &size); - if (size < 1) { - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "EDS response contains 0 resource."); - } for (size_t i = 0; i < size; ++i) { XdsApi::EdsUpdate eds_update; // Check the type_url of the resource. @@ -1442,8 +1431,10 @@ grpc_error* EdsResponsedParse( grpc_error* XdsApi::ParseAdsResponse( const grpc_slice& encoded_response, const std::string& expected_server_name, const std::string& expected_route_config_name, + const std::set& expected_cluster_names, const std::set& expected_eds_service_names, - LdsUpdate* lds_update, RdsUpdate* rds_update, CdsUpdateMap* cds_update_map, + absl::optional* lds_update, + absl::optional* rds_update, CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map, std::string* version, std::string* nonce, std::string* type_url) { upb::Arena arena; @@ -1477,8 +1468,8 @@ grpc_error* XdsApi::ParseAdsResponse( expected_route_config_name, rds_update, arena.ptr()); } else if (*type_url == kCdsTypeUrl) { - return CdsResponseParse(client_, tracer_, response, cds_update_map, - arena.ptr()); + return CdsResponseParse(client_, tracer_, response, expected_cluster_names, + cds_update_map, arena.ptr()); } else if (*type_url == kEdsTypeUrl) { return EdsResponsedParse(client_, tracer_, response, expected_eds_service_names, eds_update_map, diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 9e2e0358e3c..b428aa4bbc7 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -232,10 +232,12 @@ class XdsApi { const grpc_slice& encoded_response, const std::string& expected_server_name, const std::string& expected_route_config_name, + const std::set& expected_cluster_names, const std::set& expected_eds_service_names, - LdsUpdate* lds_update, RdsUpdate* rds_update, - CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map, - std::string* version, std::string* nonce, std::string* type_url); + absl::optional* lds_update, + absl::optional* rds_update, CdsUpdateMap* cds_update_map, + EdsUpdateMap* eds_update_map, std::string* version, std::string* nonce, + std::string* type_url); // Creates an LRS request querying \a server_name. grpc_slice CreateLrsInitialRequest(const std::string& server_name); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 58a80849236..65e1e5c4308 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -128,7 +128,8 @@ class XdsClient::ChannelState::AdsCallState bool seen_response() const { return seen_response_; } void Subscribe(const std::string& type_url, const std::string& name); - void Unsubscribe(const std::string& type_url, const std::string& name); + void Unsubscribe(const std::string& type_url, const std::string& name, + bool delay_unsubscription); bool HasSubscribedResources() const; @@ -240,8 +241,8 @@ class XdsClient::ChannelState::AdsCallState void SendMessageLocked(const std::string& type_url); - void AcceptLdsUpdate(XdsApi::LdsUpdate lds_update); - void AcceptRdsUpdate(XdsApi::RdsUpdate rds_update); + void AcceptLdsUpdate(absl::optional lds_update); + void AcceptRdsUpdate(absl::optional rds_update); void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map); void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map); @@ -557,9 +558,10 @@ void XdsClient::ChannelState::Subscribe(const std::string& type_url, } void XdsClient::ChannelState::Unsubscribe(const std::string& type_url, - const std::string& name) { + const std::string& name, + bool delay_unsubscription) { if (ads_calld_ != nullptr) { - ads_calld_->calld()->Unsubscribe(type_url, name); + ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription); if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset(); } } @@ -862,9 +864,10 @@ void XdsClient::ChannelState::AdsCallState::Subscribe( } void XdsClient::ChannelState::AdsCallState::Unsubscribe( - const std::string& type_url, const std::string& name) { + const std::string& type_url, const std::string& name, + bool delay_unsubscription) { state_map_[type_url].subscribed_resources.erase(name); - SendMessageLocked(type_url); + if (!delay_unsubscription) SendMessageLocked(type_url); } bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { @@ -875,24 +878,32 @@ bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const { } void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( - XdsApi::LdsUpdate lds_update) { + absl::optional lds_update) { + if (!lds_update.has_value()) { + gpr_log(GPR_INFO, + "[xds_client %p] LDS update does not include requested resource", + xds_client()); + xds_client()->service_config_watcher_->OnError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "LDS update does not include requested resource")); + return; + } const std::string& cluster_name = - lds_update.rds_update.has_value() - ? lds_update.rds_update.value().cluster_name + lds_update->rds_update.has_value() + ? lds_update->rds_update.value().cluster_name : ""; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, - "[xds_client %p] LDS update received: " - "route_config_name=%s, " + "[xds_client %p] LDS update received: route_config_name=%s, " "cluster_name=%s (empty if RDS is needed to obtain it)", - xds_client(), lds_update.route_config_name.c_str(), + xds_client(), lds_update->route_config_name.c_str(), cluster_name.c_str()); } auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; auto& state = lds_state.subscribed_resources[xds_client()->server_name_]; if (state != nullptr) state->Finish(); // Ignore identical update. - if (xds_client()->route_config_name_ == lds_update.route_config_name && + if (xds_client()->route_config_name_ == lds_update->route_config_name && xds_client()->cluster_name_ == cluster_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, @@ -901,12 +912,17 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } return; } - xds_client()->route_config_name_ = std::move(lds_update.route_config_name); - if (lds_update.rds_update.has_value()) { + if (!xds_client()->route_config_name_.empty()) { + Unsubscribe( + XdsApi::kRdsTypeUrl, xds_client()->route_config_name_, + /*delay_unsubscription=*/!lds_update->route_config_name.empty()); + } + xds_client()->route_config_name_ = std::move(lds_update->route_config_name); + if (lds_update->rds_update.has_value()) { // If cluster_name was found inlined in LDS response, notify the watcher // immediately. xds_client()->cluster_name_ = - std::move(lds_update.rds_update.value().cluster_name); + std::move(lds_update->rds_update.value().cluster_name); RefCountedPtr service_config; grpc_error* error = xds_client()->CreateServiceConfig( xds_client()->cluster_name_, &service_config); @@ -923,19 +939,26 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( - XdsApi::RdsUpdate rds_update) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + absl::optional rds_update) { + if (!rds_update.has_value()) { gpr_log(GPR_INFO, - "[xds_client %p] RDS update received: " - "cluster_name=%s", - xds_client(), rds_update.cluster_name.c_str()); + "[xds_client %p] RDS update does not include requested resource", + xds_client()); + xds_client()->service_config_watcher_->OnError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "RDS update does not include requested resource")); + return; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, "[xds_client %p] RDS update received: cluster_name=%s", + xds_client(), rds_update->cluster_name.c_str()); } auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; auto& state = rds_state.subscribed_resources[xds_client()->route_config_name_]; if (state != nullptr) state->Finish(); // Ignore identical update. - if (xds_client()->cluster_name_ == rds_update.cluster_name) { + if (xds_client()->cluster_name_ == rds_update->cluster_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] RDS update identical to current, ignoring.", @@ -943,7 +966,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( } return; } - xds_client()->cluster_name_ = std::move(rds_update.cluster_name); + xds_client()->cluster_name_ = std::move(rds_update->cluster_name); // Notify the watcher. RefCountedPtr service_config; grpc_error* error = xds_client()->CreateServiceConfig( @@ -959,6 +982,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( XdsApi::CdsUpdateMap cds_update_map) { auto& cds_state = state_map_[XdsApi::kCdsTypeUrl]; + std::set eds_resource_names_seen; for (auto& p : cds_update_map) { const char* cluster_name = p.first.c_str(); XdsApi::CdsUpdate& cds_update = p.second; @@ -967,21 +991,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] CDS update (cluster=%s) received: " - "eds_service_name=%s, " - "lrs_load_reporting_server_name=%s", + "eds_service_name=%s, lrs_load_reporting_server_name=%s", xds_client(), cluster_name, cds_update.eds_service_name.c_str(), cds_update.lrs_load_reporting_server_name.has_value() ? cds_update.lrs_load_reporting_server_name.value().c_str() : "(N/A)"); } - ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name]; + // Record the EDS resource names seen. + eds_resource_names_seen.insert(cds_update.eds_service_name.empty() + ? cluster_name + : cds_update.eds_service_name); // Ignore identical update. + ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name]; if (cluster_state.update.has_value() && - cds_update.eds_service_name == - cluster_state.update.value().eds_service_name && - cds_update.lrs_load_reporting_server_name.value() == - cluster_state.update.value() - .lrs_load_reporting_server_name.value()) { + cds_update.eds_service_name == cluster_state.update->eds_service_name && + cds_update.lrs_load_reporting_server_name == + cluster_state.update->lrs_load_reporting_server_name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] CDS update identical to current, ignoring.", @@ -990,12 +1015,41 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate( continue; } // Update the cluster state. - cluster_state.update.emplace(std::move(cds_update)); + cluster_state.update = std::move(cds_update); // Notify all watchers. for (const auto& p : cluster_state.watchers) { p.first->OnClusterChanged(cluster_state.update.value()); } } + // For any subscribed resource that is not present in the update, + // remove it from the cache and notify watchers of the error. + for (const auto& p : cds_state.subscribed_resources) { + const std::string& cluster_name = p.first; + if (cds_update_map.find(cluster_name) == cds_update_map.end()) { + ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name]; + cluster_state.update.reset(); + for (const auto& p : cluster_state.watchers) { + p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Cluster not present in CDS update")); + } + } + } + // Also remove any EDS resources that are no longer referred to by any CDS + // resources. + auto& eds_state = state_map_[XdsApi::kEdsTypeUrl]; + for (const auto& p : eds_state.subscribed_resources) { + const std::string& eds_resource_name = p.first; + if (eds_resource_names_seen.find(eds_resource_name) == + eds_resource_names_seen.end()) { + EndpointState& endpoint_state = + xds_client()->endpoint_map_[eds_resource_name]; + endpoint_state.update.reset(); + for (const auto& p : endpoint_state.watchers) { + p.first->OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "ClusterLoadAssignment resource removed due to CDS update")); + } + } + } } void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( @@ -1058,25 +1112,27 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( EndpointState& endpoint_state = xds_client()->endpoint_map_[eds_service_name]; // Ignore identical update. - const XdsApi::EdsUpdate& prev_update = endpoint_state.update; - const bool priority_list_changed = - prev_update.priority_list_update != eds_update.priority_list_update; - const bool drop_config_changed = - prev_update.drop_config == nullptr || - *prev_update.drop_config != *eds_update.drop_config; - if (!priority_list_changed && !drop_config_changed) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - gpr_log(GPR_INFO, - "[xds_client %p] EDS update identical to current, ignoring.", - xds_client()); + if (endpoint_state.update.has_value()) { + const XdsApi::EdsUpdate& prev_update = endpoint_state.update.value(); + const bool priority_list_changed = + prev_update.priority_list_update != eds_update.priority_list_update; + const bool drop_config_changed = + prev_update.drop_config == nullptr || + *prev_update.drop_config != *eds_update.drop_config; + if (!priority_list_changed && !drop_config_changed) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { + gpr_log(GPR_INFO, + "[xds_client %p] EDS update identical to current, ignoring.", + xds_client()); + } + continue; } - continue; } // Update the cluster state. endpoint_state.update = std::move(eds_update); // Notify all watchers. for (const auto& p : endpoint_state.watchers) { - p.first->OnEndpointChanged(endpoint_state.update); + p.first->OnEndpointChanged(endpoint_state.update.value()); } } } @@ -1150,8 +1206,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( // mode. We will also need to cancel the timer when we receive a serverlist // from the balancer. // Parse the response. - XdsApi::LdsUpdate lds_update; - XdsApi::RdsUpdate rds_update; + absl::optional lds_update; + absl::optional rds_update; XdsApi::CdsUpdateMap cds_update_map; XdsApi::EdsUpdateMap eds_update_map; std::string version; @@ -1160,6 +1216,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( // Note that ParseAdsResponse() also validates the response. grpc_error* parse_error = xds_client->api_.ParseAdsResponse( response_slice, xds_client->server_name_, xds_client->route_config_name_, + ads_calld->ClusterNamesForRequest(), ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update, &cds_update_map, &eds_update_map, &version, &nonce, &type_url); grpc_slice_unref_internal(response_slice); @@ -1822,7 +1879,8 @@ void XdsClient::WatchClusterData( } void XdsClient::CancelClusterDataWatch(StringView cluster_name, - ClusterWatcherInterface* watcher) { + ClusterWatcherInterface* watcher, + bool delay_unsubscription) { if (shutting_down_) return; std::string cluster_name_str = std::string(cluster_name); ClusterState& cluster_state = cluster_map_[cluster_name_str]; @@ -1831,7 +1889,8 @@ void XdsClient::CancelClusterDataWatch(StringView cluster_name, cluster_state.watchers.erase(it); if (cluster_state.watchers.empty()) { cluster_map_.erase(cluster_name_str); - chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str); + chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str, + delay_unsubscription); } } } @@ -1845,18 +1904,19 @@ void XdsClient::WatchEndpointData( endpoint_state.watchers[w] = std::move(watcher); // If we've already received an EDS update, notify the new watcher // immediately. - if (!endpoint_state.update.priority_list_update.empty()) { + if (endpoint_state.update.has_value()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s", this, StringViewToCString(eds_service_name).get()); } - w->OnEndpointChanged(endpoint_state.update); + w->OnEndpointChanged(endpoint_state.update.value()); } chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str); } void XdsClient::CancelEndpointDataWatch(StringView eds_service_name, - EndpointWatcherInterface* watcher) { + EndpointWatcherInterface* watcher, + bool delay_unsubscription) { if (shutting_down_) return; std::string eds_service_name_str = std::string(eds_service_name); EndpointState& endpoint_state = endpoint_map_[eds_service_name_str]; @@ -1865,7 +1925,8 @@ void XdsClient::CancelEndpointDataWatch(StringView eds_service_name, endpoint_state.watchers.erase(it); if (endpoint_state.watchers.empty()) { endpoint_map_.erase(eds_service_name_str); - chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str); + chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str, + delay_unsubscription); } } } diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 02e80680399..228b9a21b47 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -86,20 +86,26 @@ class XdsClient : public InternallyRefCounted { // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. void WatchClusterData(StringView cluster_name, std::unique_ptr watcher); void CancelClusterDataWatch(StringView cluster_name, - ClusterWatcherInterface* watcher); + ClusterWatcherInterface* watcher, + bool delay_unsubscription = false); // Start and cancel endpoint data watch for a cluster. // The XdsClient takes ownership of the watcher, but the caller may // keep a raw pointer to the watcher, which may be used only for // cancellation. (Because the caller does not own the watcher, the // pointer must not be used for any other purpose.) + // If the caller is going to start a new watch after cancelling the + // old one, it should set delay_unsubscription to true. void WatchEndpointData(StringView eds_service_name, std::unique_ptr watcher); void CancelEndpointDataWatch(StringView eds_service_name, - EndpointWatcherInterface* watcher); + EndpointWatcherInterface* watcher, + bool delay_unsubscription = false); // Adds and removes drop stats for cluster_name and eds_service_name. RefCountedPtr AddClusterDropStats( @@ -167,7 +173,8 @@ class XdsClient : public InternallyRefCounted { void CancelConnectivityWatchLocked(); void Subscribe(const std::string& type_url, const std::string& name); - void Unsubscribe(const std::string& type_url, const std::string& name); + void Unsubscribe(const std::string& type_url, const std::string& name, + bool delay_unsubscription); private: class StateWatcher; @@ -189,7 +196,7 @@ class XdsClient : public InternallyRefCounted { std::map> watchers; // The latest data seen from CDS. - Optional update; + absl::optional update; }; struct EndpointState { @@ -197,7 +204,7 @@ class XdsClient : public InternallyRefCounted { std::unique_ptr> watchers; // The latest data seen from EDS. - XdsApi::EdsUpdate update; + absl::optional update; }; struct LoadReportState { @@ -241,9 +248,9 @@ class XdsClient : public InternallyRefCounted { std::string route_config_name_; std::string cluster_name_; - // All the received clusters are cached, no matter they are watched or not. + // One entry for each watched CDS resource. std::map cluster_map_; - // Only the watched EDS service names are stored. + // One entry for each watched EDS resource. std::map endpoint_map_; std::map< std::pair, diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index e8430e3b01c..bf173df0af0 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -351,7 +351,6 @@ class ClientStats { std::map dropped_requests_; }; -// TODO(roth): Change this service to a real fake. class AdsServiceImpl : public AggregatedDiscoveryService::Service, public std::enable_shared_from_this { public: @@ -496,6 +495,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, SubscriptionState* subscription_state, ResourceState* resource_state, UpdateQueue* update_queue) { + // The update_queue will be null if we were not previously subscribed. if (subscription_state->update_queue != nullptr) return; subscription_state->update_queue = update_queue; resource_state->subscriptions.emplace(subscription_state); @@ -594,7 +594,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Main loop to look for requests and updates. while (true) { // Look for new requests and and decide what to handle. - DiscoveryResponse response; + absl::optional response; // Boolean to keep track if the loop received any work to do: a request // or an update; regardless whether a response was actually sent out. bool did_work = false; @@ -647,8 +647,9 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, this, request.type_url().c_str(), resource_name.c_str(), resource_state.version); resources_added_to_response.emplace(resource_name); + if (!response.has_value()) response.emplace(); if (resource_state.resource.has_value()) { - response.add_resources()->CopyFrom( + response->add_resources()->CopyFrom( resource_state.resource.value()); } } @@ -664,17 +665,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, request.type_url(), ++resource_type_version[request.type_url()], subscription_name_map, resources_added_to_response, - &response); + &response.value()); } } } } - if (!response.resources().empty()) { + if (response.has_value()) { gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this, - response.DebugString().c_str()); - stream->Write(response); + response->DebugString().c_str()); + stream->Write(response.value()); } - response.Clear(); + response.reset(); // Look for updates and decide what to handle. { grpc_core::MutexLock lock(&ads_mu_); @@ -700,21 +701,22 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, "ADS[%p]: Sending update for type=%s name=%s version=%d", this, resource_type.c_str(), resource_name.c_str(), resource_state.version); + response.emplace(); if (resource_state.resource.has_value()) { - response.add_resources()->CopyFrom( + response->add_resources()->CopyFrom( resource_state.resource.value()); - CompleteBuildingDiscoveryResponse( - resource_type, ++resource_type_version[resource_type], - subscription_name_map, {resource_name}, &response); } + CompleteBuildingDiscoveryResponse( + resource_type, ++resource_type_version[resource_type], + subscription_name_map, {resource_name}, &response.value()); } } } } - if (!response.resources().empty()) { + if (response.has_value()) { gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this, - response.DebugString().c_str()); - stream->Write(response); + response->DebugString().c_str()); + stream->Write(response.value()); } // If we didn't find anything to do, delay before the next loop // iteration; otherwise, check whether we should exit and then @@ -765,6 +767,18 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, resource_types_to_ignore_.emplace(type_url); } + void UnsetResource(const std::string& type_url, const std::string& name) { + grpc_core::MutexLock lock(&ads_mu_); + ResourceState& state = resource_map_[type_url][name]; + ++state.version; + state.resource.reset(); + gpr_log(GPR_INFO, "ADS[%p]: Unsetting %s resource %s to version %u", this, + type_url.c_str(), name.c_str(), state.version); + for (SubscriptionState* subscription : state.subscriptions) { + subscription->update_queue->emplace_back(type_url, name); + } + } + void SetResource(google::protobuf::Any resource, const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); @@ -1639,6 +1653,68 @@ TEST_P(BasicTest, BackendsRestart) { true /* wait_for_ready */); } +using XdsResolverOnlyTest = BasicTest; + +// Tests switching over from one cluster to another. +TEST_P(XdsResolverOnlyTest, ChangeClusters) { + const char* kNewClusterName = "new_cluster_name"; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 2)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName); + // We need to wait for all backends to come online. + WaitForAllBackends(0, 2); + // Populate new EDS resource. + AdsServiceImpl::EdsResourceArgs args2({ + {"locality0", GetBackendPorts(2, 4)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args2, kNewClusterName), + kNewClusterName); + // Populate new CDS resource. + Cluster new_cluster = balancers_[0]->ads_service()->default_cluster(); + new_cluster.set_name(kNewClusterName); + balancers_[0]->ads_service()->SetCdsResource(new_cluster, kNewClusterName); + // Change RDS resource to point to new cluster. + RouteConfiguration new_route_config = + balancers_[0]->ads_service()->default_route_config(); + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + Listener listener = + balancers_[0]->ads_service()->BuildListener(new_route_config); + balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName); + // Wait for all new backends to be used. + std::tuple counts = WaitForAllBackends(2, 4); + // Make sure no RPCs failed in the transition. + EXPECT_EQ(0, std::get<1>(counts)); +} + +// Tests that things keep workng if the cluster resource disappears. +TEST_P(XdsResolverOnlyTest, ClusterRemoved) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName); + // We need to wait for all backends to come online. + WaitForAllBackends(); + // Unset CDS resource. + balancers_[0]->ads_service()->UnsetResource(kCdsTypeUrl, + kDefaultResourceName); + // Make sure RPCs are still succeeding. + CheckRpcSendOk(100 * num_backends_); + // Make sure we ACK'ed the update. + EXPECT_EQ(balancers_[0]->ads_service()->cds_response_state(), + AdsServiceImpl::ACKED); +} + using SecureNamingTest = BasicTest; // Tests that secure naming check passes if target name is expected. @@ -3307,6 +3383,12 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest, TestType(true, true)), &TestTypeName); +// XdsResolverOnlyTest depends on XdsResolver. +INSTANTIATE_TEST_SUITE_P(XdsTest, XdsResolverOnlyTest, + ::testing::Values(TestType(true, false), + TestType(true, true)), + &TestTypeName); + INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest, ::testing::Values(TestType(false, true), TestType(false, false),