From d812d9d557ee51cffa4a251a1a0feb23354f5a44 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 17 Mar 2020 15:16:49 -0700 Subject: [PATCH] fix duplicate LDS update detection --- .../client_channel/lb_policy/xds/xds.cc | 10 +-- .../ext/filters/client_channel/xds/xds_api.h | 18 +++++- .../filters/client_channel/xds/xds_client.cc | 61 ++++++++++--------- .../filters/client_channel/xds/xds_client.h | 8 ++- 4 files changed, 59 insertions(+), 38 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index a9205b5f4e0..0c390c2a1fe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -25,6 +25,8 @@ #include #include +#include "absl/types/optional.h" + #include #include #include @@ -80,7 +82,7 @@ class XdsConfig : public LoadBalancingPolicy::Config { XdsConfig(RefCountedPtr child_policy, RefCountedPtr fallback_policy, std::string eds_service_name, - Optional lrs_load_reporting_server_name) + absl::optional lrs_load_reporting_server_name) : child_policy_(std::move(child_policy)), fallback_policy_(std::move(fallback_policy)), eds_service_name_(std::move(eds_service_name)), @@ -101,7 +103,7 @@ class XdsConfig : public LoadBalancingPolicy::Config { return eds_service_name_.empty() ? nullptr : eds_service_name_.c_str(); }; - const Optional& lrs_load_reporting_server_name() const { + const absl::optional& lrs_load_reporting_server_name() const { return lrs_load_reporting_server_name_; }; @@ -109,7 +111,7 @@ class XdsConfig : public LoadBalancingPolicy::Config { RefCountedPtr child_policy_; RefCountedPtr fallback_policy_; std::string eds_service_name_; - Optional lrs_load_reporting_server_name_; + absl::optional lrs_load_reporting_server_name_; }; class XdsLb : public LoadBalancingPolicy { @@ -1656,7 +1658,7 @@ class XdsFactory : public LoadBalancingPolicyFactory { } } if (error_list.empty()) { - Optional optional_lrs_load_reporting_server_name; + absl::optional optional_lrs_load_reporting_server_name; if (lrs_load_reporting_server_name != nullptr) { optional_lrs_load_reporting_server_name.emplace( std::string(lrs_load_reporting_server_name)); diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index b428aa4bbc7..28cae1ae351 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -25,12 +25,13 @@ #include +#include "absl/types/optional.h" + #include #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h" #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" -#include "src/core/lib/gprpp/optional.h" namespace grpc_core { @@ -46,14 +47,25 @@ class XdsApi { struct RdsUpdate { // The name to use in the CDS request. std::string cluster_name; + + bool operator==(const RdsUpdate& other) const { + return cluster_name == other.cluster_name; + } }; + // TODO(roth): When we can use absl::variant<>, consider using that + // here, to enforce the fact that only one of the two fields can be set. struct LdsUpdate { // The name to use in the RDS request. std::string route_config_name; // The name to use in the CDS request. Present if the LDS response has it // inlined. - Optional rds_update; + absl::optional rds_update; + + bool operator==(const LdsUpdate& other) const { + return route_config_name == other.route_config_name && + rds_update == other.rds_update; + } }; using LdsUpdateMap = std::map; @@ -68,7 +80,7 @@ class XdsApi { // If not set, load reporting will be disabled. // If set to the empty string, will use the same server we obtained the CDS // data from. - Optional lrs_load_reporting_server_name; + absl::optional lrs_load_reporting_server_name; }; using CdsUpdateMap = std::map; diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 65e1e5c4308..3357e308ad5 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -801,11 +801,12 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked( GRPC_ERROR_REF(state.error), !sent_initial_message_); state.subscribed_resources[xds_client()->server_name_]->Start(Ref()); } else if (type_url == XdsApi::kRdsTypeUrl) { - resource_names.insert(xds_client()->route_config_name_); + resource_names.insert(xds_client()->lds_result_->route_config_name); request_payload_slice = xds_client()->api_.CreateRdsRequest( - xds_client()->route_config_name_, state.version, state.nonce, - GRPC_ERROR_REF(state.error), !sent_initial_message_); - state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref()); + xds_client()->lds_result_->route_config_name, state.version, + state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_); + state.subscribed_resources[xds_client()->lds_result_->route_config_name] + ->Start(Ref()); } else if (type_url == XdsApi::kCdsTypeUrl) { resource_names = ClusterNamesForRequest(); request_payload_slice = xds_client()->api_.CreateCdsRequest( @@ -888,23 +889,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( "LDS update does not include requested resource")); return; } - const std::string& cluster_name = - lds_update->rds_update.has_value() - ? lds_update->rds_update.value().cluster_name - : ""; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] LDS update received: route_config_name=%s, " - "cluster_name=%s (empty if RDS is needed to obtain it)", - xds_client(), lds_update->route_config_name.c_str(), - cluster_name.c_str()); + "cluster_name=%s", + xds_client(), + (lds_update->route_config_name.empty() + ? lds_update->route_config_name.c_str() + : ""), + (lds_update->rds_update.has_value() + ? lds_update->rds_update->cluster_name.c_str() + : "")); } auto& lds_state = state_map_[XdsApi::kLdsTypeUrl]; auto& state = lds_state.subscribed_resources[xds_client()->server_name_]; if (state != nullptr) state->Finish(); // Ignore identical update. - if (xds_client()->route_config_name_ == lds_update->route_config_name && - xds_client()->cluster_name_ == cluster_name) { + if (xds_client()->lds_result_ == lds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] LDS update identical to current, ignoring.", @@ -912,20 +913,19 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } return; } - if (!xds_client()->route_config_name_.empty()) { + if (xds_client()->lds_result_.has_value() && + !xds_client()->lds_result_->route_config_name.empty()) { Unsubscribe( - XdsApi::kRdsTypeUrl, xds_client()->route_config_name_, + XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name, /*delay_unsubscription=*/!lds_update->route_config_name.empty()); } - xds_client()->route_config_name_ = std::move(lds_update->route_config_name); - if (lds_update->rds_update.has_value()) { - // If cluster_name was found inlined in LDS response, notify the watcher - // immediately. - xds_client()->cluster_name_ = - std::move(lds_update->rds_update.value().cluster_name); + xds_client()->lds_result_ = std::move(lds_update); + if (xds_client()->lds_result_->rds_update.has_value()) { + // If the RouteConfiguration was found inlined in LDS response, notify + // the watcher immediately. RefCountedPtr service_config; grpc_error* error = xds_client()->CreateServiceConfig( - xds_client()->cluster_name_, &service_config); + xds_client()->lds_result_->rds_update->cluster_name, &service_config); if (error == GRPC_ERROR_NONE) { xds_client()->service_config_watcher_->OnServiceConfigChanged( std::move(service_config)); @@ -934,7 +934,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( } } else { // Send RDS request for dynamic resolution. - Subscribe(XdsApi::kRdsTypeUrl, xds_client()->route_config_name_); + Subscribe(XdsApi::kRdsTypeUrl, + xds_client()->lds_result_->route_config_name); } } @@ -955,10 +956,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( } auto& rds_state = state_map_[XdsApi::kRdsTypeUrl]; auto& state = - rds_state.subscribed_resources[xds_client()->route_config_name_]; + rds_state + .subscribed_resources[xds_client()->lds_result_->route_config_name]; if (state != nullptr) state->Finish(); // Ignore identical update. - if (xds_client()->cluster_name_ == rds_update->cluster_name) { + if (xds_client()->rds_result_ == rds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] RDS update identical to current, ignoring.", @@ -966,11 +968,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate( } return; } - xds_client()->cluster_name_ = std::move(rds_update->cluster_name); + xds_client()->rds_result_ = std::move(rds_update); // Notify the watcher. RefCountedPtr service_config; grpc_error* error = xds_client()->CreateServiceConfig( - xds_client()->cluster_name_, &service_config); + xds_client()->rds_result_->cluster_name, &service_config); if (error == GRPC_ERROR_NONE) { xds_client()->service_config_watcher_->OnServiceConfigChanged( std::move(service_config)); @@ -1215,7 +1217,10 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( std::string type_url; // Note that ParseAdsResponse() also validates the response. grpc_error* parse_error = xds_client->api_.ParseAdsResponse( - response_slice, xds_client->server_name_, xds_client->route_config_name_, + response_slice, xds_client->server_name_, + (xds_client->lds_result_.has_value() + ? xds_client->lds_result_->route_config_name + : ""), ads_calld->ClusterNamesForRequest(), ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update, &cds_update_map, &eds_update_map, &version, &nonce, &type_url); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 228b9a21b47..609890dc02e 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -21,13 +21,14 @@ #include +#include "absl/types/optional.h" + #include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/xds/xds_api.h" #include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h" #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h" #include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/gprpp/optional.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -246,8 +247,9 @@ class XdsClient : public InternallyRefCounted { // The channel for communicating with the xds server. OrphanablePtr chand_; - std::string route_config_name_; - std::string cluster_name_; + absl::optional lds_result_; + absl::optional rds_result_; + // One entry for each watched CDS resource. std::map cluster_map_; // One entry for each watched EDS resource.