Merge pull request #22388 from markdroth/xds_lds_dup_fix

xds: fix duplicate LDS update detection
pull/22371/head
Mark D. Roth 5 years ago committed by GitHub
commit dd165a3d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  2. 18
      src/core/ext/filters/client_channel/xds/xds_api.h
  3. 61
      src/core/ext/filters/client_channel/xds/xds_client.cc
  4. 8
      src/core/ext/filters/client_channel/xds/xds_client.h

@ -25,6 +25,8 @@
#include <limits.h>
#include <string.h>
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@ -80,7 +82,7 @@ class XdsConfig : public LoadBalancingPolicy::Config {
XdsConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy,
std::string eds_service_name,
Optional<std::string> lrs_load_reporting_server_name)
absl::optional<std::string> lrs_load_reporting_server_name)
: child_policy_(std::move(child_policy)),
fallback_policy_(std::move(fallback_policy)),
eds_service_name_(std::move(eds_service_name)),
@ -101,7 +103,7 @@ class XdsConfig : public LoadBalancingPolicy::Config {
return eds_service_name_.empty() ? nullptr : eds_service_name_.c_str();
};
const Optional<std::string>& lrs_load_reporting_server_name() const {
const absl::optional<std::string>& lrs_load_reporting_server_name() const {
return lrs_load_reporting_server_name_;
};
@ -109,7 +111,7 @@ class XdsConfig : public LoadBalancingPolicy::Config {
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
std::string eds_service_name_;
Optional<std::string> lrs_load_reporting_server_name_;
absl::optional<std::string> lrs_load_reporting_server_name_;
};
class XdsLb : public LoadBalancingPolicy {
@ -1656,7 +1658,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
}
}
if (error_list.empty()) {
Optional<std::string> optional_lrs_load_reporting_server_name;
absl::optional<std::string> optional_lrs_load_reporting_server_name;
if (lrs_load_reporting_server_name != nullptr) {
optional_lrs_load_reporting_server_name.emplace(
std::string(lrs_load_reporting_server_name));

@ -25,12 +25,13 @@
#include <set>
#include "absl/types/optional.h"
#include <grpc/slice_buffer.h>
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h"
#include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
#include "src/core/lib/gprpp/optional.h"
namespace grpc_core {
@ -46,14 +47,25 @@ class XdsApi {
struct RdsUpdate {
// The name to use in the CDS request.
std::string cluster_name;
bool operator==(const RdsUpdate& other) const {
return cluster_name == other.cluster_name;
}
};
// TODO(roth): When we can use absl::variant<>, consider using that
// here, to enforce the fact that only one of the two fields can be set.
struct LdsUpdate {
// The name to use in the RDS request.
std::string route_config_name;
// The name to use in the CDS request. Present if the LDS response has it
// inlined.
Optional<RdsUpdate> rds_update;
absl::optional<RdsUpdate> rds_update;
bool operator==(const LdsUpdate& other) const {
return route_config_name == other.route_config_name &&
rds_update == other.rds_update;
}
};
using LdsUpdateMap = std::map<std::string /*server_name*/, LdsUpdate>;
@ -68,7 +80,7 @@ class XdsApi {
// If not set, load reporting will be disabled.
// If set to the empty string, will use the same server we obtained the CDS
// data from.
Optional<std::string> lrs_load_reporting_server_name;
absl::optional<std::string> lrs_load_reporting_server_name;
};
using CdsUpdateMap = std::map<std::string /*cluster_name*/, CdsUpdate>;

@ -801,11 +801,12 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
} else if (type_url == XdsApi::kRdsTypeUrl) {
resource_names.insert(xds_client()->route_config_name_);
resource_names.insert(xds_client()->lds_result_->route_config_name);
request_payload_slice = xds_client()->api_.CreateRdsRequest(
xds_client()->route_config_name_, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref());
xds_client()->lds_result_->route_config_name, state.version,
state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->lds_result_->route_config_name]
->Start(Ref());
} else if (type_url == XdsApi::kCdsTypeUrl) {
resource_names = ClusterNamesForRequest();
request_payload_slice = xds_client()->api_.CreateCdsRequest(
@ -888,23 +889,23 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
"LDS update does not include requested resource"));
return;
}
const std::string& cluster_name =
lds_update->rds_update.has_value()
? lds_update->rds_update.value().cluster_name
: "";
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update received: route_config_name=%s, "
"cluster_name=%s (empty if RDS is needed to obtain it)",
xds_client(), lds_update->route_config_name.c_str(),
cluster_name.c_str());
"cluster_name=%s",
xds_client(),
(lds_update->route_config_name.empty()
? lds_update->route_config_name.c_str()
: "<inlined>"),
(lds_update->rds_update.has_value()
? lds_update->rds_update->cluster_name.c_str()
: "<to be obtained via RDS>"));
}
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
if (state != nullptr) state->Finish();
// Ignore identical update.
if (xds_client()->route_config_name_ == lds_update->route_config_name &&
xds_client()->cluster_name_ == cluster_name) {
if (xds_client()->lds_result_ == lds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update identical to current, ignoring.",
@ -912,20 +913,19 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
}
return;
}
if (!xds_client()->route_config_name_.empty()) {
if (xds_client()->lds_result_.has_value() &&
!xds_client()->lds_result_->route_config_name.empty()) {
Unsubscribe(
XdsApi::kRdsTypeUrl, xds_client()->route_config_name_,
XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name,
/*delay_unsubscription=*/!lds_update->route_config_name.empty());
}
xds_client()->route_config_name_ = std::move(lds_update->route_config_name);
if (lds_update->rds_update.has_value()) {
// If cluster_name was found inlined in LDS response, notify the watcher
// immediately.
xds_client()->cluster_name_ =
std::move(lds_update->rds_update.value().cluster_name);
xds_client()->lds_result_ = std::move(lds_update);
if (xds_client()->lds_result_->rds_update.has_value()) {
// If the RouteConfiguration was found inlined in LDS response, notify
// the watcher immediately.
RefCountedPtr<ServiceConfig> service_config;
grpc_error* error = xds_client()->CreateServiceConfig(
xds_client()->cluster_name_, &service_config);
xds_client()->lds_result_->rds_update->cluster_name, &service_config);
if (error == GRPC_ERROR_NONE) {
xds_client()->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
@ -934,7 +934,8 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
}
} else {
// Send RDS request for dynamic resolution.
Subscribe(XdsApi::kRdsTypeUrl, xds_client()->route_config_name_);
Subscribe(XdsApi::kRdsTypeUrl,
xds_client()->lds_result_->route_config_name);
}
}
@ -955,10 +956,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
}
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
auto& state =
rds_state.subscribed_resources[xds_client()->route_config_name_];
rds_state
.subscribed_resources[xds_client()->lds_result_->route_config_name];
if (state != nullptr) state->Finish();
// Ignore identical update.
if (xds_client()->cluster_name_ == rds_update->cluster_name) {
if (xds_client()->rds_result_ == rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] RDS update identical to current, ignoring.",
@ -966,11 +968,11 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
}
return;
}
xds_client()->cluster_name_ = std::move(rds_update->cluster_name);
xds_client()->rds_result_ = std::move(rds_update);
// Notify the watcher.
RefCountedPtr<ServiceConfig> service_config;
grpc_error* error = xds_client()->CreateServiceConfig(
xds_client()->cluster_name_, &service_config);
xds_client()->rds_result_->cluster_name, &service_config);
if (error == GRPC_ERROR_NONE) {
xds_client()->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
@ -1215,7 +1217,10 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
std::string type_url;
// Note that ParseAdsResponse() also validates the response.
grpc_error* parse_error = xds_client->api_.ParseAdsResponse(
response_slice, xds_client->server_name_, xds_client->route_config_name_,
response_slice, xds_client->server_name_,
(xds_client->lds_result_.has_value()
? xds_client->lds_result_->route_config_name
: ""),
ads_calld->ClusterNamesForRequest(),
ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
&cds_update_map, &eds_update_map, &version, &nonce, &type_url);

@ -21,13 +21,14 @@
#include <set>
#include "absl/types/optional.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/xds/xds_api.h"
#include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h"
#include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/optional.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -246,8 +247,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// The channel for communicating with the xds server.
OrphanablePtr<ChannelState> chand_;
std::string route_config_name_;
std::string cluster_name_;
absl::optional<XdsApi::LdsUpdate> lds_result_;
absl::optional<XdsApi::RdsUpdate> rds_result_;
// One entry for each watched CDS resource.
std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
// One entry for each watched EDS resource.

Loading…
Cancel
Save