[xDS] ref-count xDS resources instead of copying them (#34111)

pull/34174/head
Mark D. Roth 2 years ago committed by GitHub
parent 88df0a1c71
commit de98c1c9ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 22
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  3. 79
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  4. 87
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  5. 14
      src/core/ext/xds/xds_client.cc
  6. 4
      src/core/ext/xds/xds_client.h
  7. 28
      src/core/ext/xds/xds_cluster.cc
  8. 1
      src/core/ext/xds/xds_cluster.h
  9. 24
      src/core/ext/xds/xds_endpoint.cc
  10. 1
      src/core/ext/xds/xds_endpoint.h
  11. 28
      src/core/ext/xds/xds_listener.cc
  12. 16
      src/core/ext/xds/xds_listener.h
  13. 8
      src/core/ext/xds/xds_resource_type.h
  14. 20
      src/core/ext/xds/xds_resource_type_impl.h
  15. 21
      src/core/ext/xds/xds_route_config.cc
  16. 2
      src/core/ext/xds/xds_route_config.h
  17. 81
      src/core/ext/xds/xds_server_config_fetcher.cc
  18. 5
      test/core/xds/xds_client_fuzzer.cc
  19. 123
      test/core/xds/xds_client_test.cc
  20. 51
      test/core/xds/xds_cluster_resource_type_test.cc
  21. 27
      test/core/xds/xds_endpoint_resource_type_test.cc
  22. 33
      test/core/xds/xds_listener_resource_type_test.cc
  23. 44
      test/core/xds/xds_route_config_resource_type_test.cc

@ -4516,6 +4516,7 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
"no_destruct",
"pollset_set",
"ref_counted_string",
"validation_errors",

@ -123,7 +123,8 @@ class CdsLb : public LoadBalancingPolicy {
ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
: parent_(std::move(parent)), name_(std::move(name)) {}
void OnResourceChanged(XdsClusterResource cluster_data) override {
void OnResourceChanged(
std::shared_ptr<const XdsClusterResource> cluster_data) override {
RefCountedPtr<ClusterWatcher> self = Ref();
parent_->work_serializer()->Run(
[self = std::move(self),
@ -160,7 +161,7 @@ class CdsLb : public LoadBalancingPolicy {
// Not owned, so do not dereference.
ClusterWatcher* watcher = nullptr;
// Most recent update obtained from this watcher.
absl::optional<XdsClusterResource> update;
std::shared_ptr<const XdsClusterResource> update;
};
// Delegating helper to be passed to child policy.
@ -174,7 +175,7 @@ class CdsLb : public LoadBalancingPolicy {
const std::string& name, int depth, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_added);
void OnClusterChanged(const std::string& name,
XdsClusterResource cluster_data);
std::shared_ptr<const XdsClusterResource> cluster_data);
void OnError(const std::string& name, absl::Status status);
void OnResourceDoesNotExist(const std::string& name);
@ -328,7 +329,7 @@ absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
return false;
}
// Don't have the update we need yet.
if (!state.update.has_value()) return false;
if (state.update == nullptr) return false;
// For AGGREGATE clusters, recursively expand to child clusters.
auto* aggregate =
absl::get_if<XdsClusterResource::Aggregate>(&state.update->type);
@ -422,13 +423,15 @@ absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
return true;
}
void CdsLb::OnClusterChanged(const std::string& name,
XdsClusterResource cluster_data) {
void CdsLb::OnClusterChanged(
const std::string& name,
std::shared_ptr<const XdsClusterResource> cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(
GPR_INFO,
"[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
this, name.c_str(), xds_client_.get(),
cluster_data->ToString().c_str());
}
// Store the update in the map if we are still interested in watching this
// cluster (i.e., it is not cancelled already).
@ -436,10 +439,9 @@ void CdsLb::OnClusterChanged(const std::string& name,
// that was scheduled before the deletion, so we can just ignore it.
auto it = watchers_.find(name);
if (it == watchers_.end()) return;
it->second.update = cluster_data;
it->second.update = std::move(cluster_data);
// Take care of integration with new certificate code.
absl::Status status =
UpdateXdsCertificateProvider(name, it->second.update.value());
absl::Status status = UpdateXdsCertificateProvider(name, *it->second.update);
if (!status.ok()) {
return OnError(name, status);
}

@ -54,6 +54,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
@ -212,7 +213,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
~EndpointWatcher() override {
discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
}
void OnResourceChanged(XdsEndpointResource update) override {
void OnResourceChanged(
std::shared_ptr<const XdsEndpointResource> update) override {
RefCountedPtr<EndpointWatcher> self = Ref();
discovery_mechanism_->parent()->work_serializer()->Run(
[self = std::move(self), update = std::move(update)]() mutable {
@ -241,15 +243,16 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
// Code accessing protected methods of `DiscoveryMechanism` need to be
// in methods of this class rather than in lambdas to work around an MSVC
// bug.
void OnResourceChangedHelper(XdsEndpointResource update) {
void OnResourceChangedHelper(
std::shared_ptr<const XdsEndpointResource> update) {
std::string resolution_note;
if (update.priorities.empty()) {
if (update->priorities.empty()) {
resolution_note = absl::StrCat(
"EDS resource ", discovery_mechanism_->GetEdsResourceName(),
" contains no localities");
} else {
std::set<std::string> empty_localities;
for (const auto& priority : update.priorities) {
for (const auto& priority : update->priorities) {
for (const auto& p : priority.localities) {
if (p.second.endpoints.empty()) {
empty_localities.insert(p.first->AsHumanReadableString());
@ -345,7 +348,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
struct DiscoveryMechanismEntry {
OrphanablePtr<DiscoveryMechanism> discovery_mechanism;
// Most recent update reported by the discovery mechanism.
absl::optional<XdsEndpointResource> latest_update;
std::shared_ptr<const XdsEndpointResource> latest_update;
// Last resolution note reported by the discovery mechanism, if any.
std::string resolution_note;
// State used to retain child policy names for priority policy.
@ -376,7 +379,8 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
void OnEndpointChanged(size_t index, XdsEndpointResource update,
void OnEndpointChanged(size_t index,
std::shared_ptr<const XdsEndpointResource> update,
std::string resolution_note);
void OnError(size_t index, std::string resolution_note);
void OnResourceDoesNotExist(size_t index, std::string resolution_note);
@ -503,14 +507,14 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
return;
}
// Convert resolver result to EDS update.
XdsEndpointResource update;
auto update = std::make_shared<XdsEndpointResource>();
XdsEndpointResource::Priority::Locality locality;
locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
locality.lb_weight = 1;
locality.endpoints = std::move(*result.addresses);
XdsEndpointResource::Priority priority;
priority.localities.emplace(locality.name.get(), std::move(locality));
update.priorities.emplace_back(std::move(priority));
update->priorities.emplace_back(std::move(priority));
lb_policy->OnEndpointChanged(index, std::move(update),
std::move(result.resolution_note));
}
@ -623,9 +627,21 @@ void XdsClusterResolverLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void XdsClusterResolverLb::OnEndpointChanged(size_t index,
XdsEndpointResource update,
std::string resolution_note) {
// We need at least one priority for each discovery mechanism, just so that we
// have a child in which to create the xds_cluster_impl policy. This ensures
// that we properly handle the case of a discovery mechanism dropping 100% of
// calls, the OnError() case, and the OnResourceDoesNotExist() case.
const XdsEndpointResource::PriorityList& GetUpdatePriorityList(
const XdsEndpointResource& update) {
static const NoDestruct<XdsEndpointResource::PriorityList>
kPriorityListWithEmptyPriority(1);
if (update.priorities.empty()) return *kPriorityListWithEmptyPriority;
return update.priorities;
}
void XdsClusterResolverLb::OnEndpointChanged(
size_t index, std::shared_ptr<const XdsEndpointResource> update,
std::string resolution_note) {
if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
gpr_log(GPR_INFO,
@ -634,11 +650,8 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
this, index, resolution_note.c_str());
}
DiscoveryMechanismEntry& discovery_entry = discovery_mechanisms_[index];
// We need at least one priority for each discovery mechanism, just so that we
// have a child in which to create the xds_cluster_impl policy. This ensures
// that we properly handle the case of a discovery mechanism dropping 100% of
// calls, the OnError() case, and the OnResourceDoesNotExist() case.
if (update.priorities.empty()) update.priorities.emplace_back();
const XdsEndpointResource::PriorityList& priority_list =
GetUpdatePriorityList(*update);
// Update priority_child_numbers, reusing old child numbers in an
// intelligent way to avoid unnecessary churn.
// First, build some maps from locality to child number and the reverse
@ -647,8 +660,9 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
locality_child_map;
std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
child_locality_map;
if (discovery_entry.latest_update.has_value()) {
const auto& prev_priority_list = discovery_entry.latest_update->priorities;
if (discovery_entry.latest_update != nullptr) {
const auto& prev_priority_list =
GetUpdatePriorityList(*discovery_entry.latest_update);
for (size_t priority = 0; priority < prev_priority_list.size();
++priority) {
size_t child_number = discovery_entry.priority_child_numbers[priority];
@ -662,8 +676,8 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
}
// Construct new list of children.
std::vector<size_t> priority_child_numbers;
for (size_t priority = 0; priority < update.priorities.size(); ++priority) {
const auto& localities = update.priorities[priority].localities;
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& localities = priority_list[priority].localities;
absl::optional<size_t> child_number;
// If one of the localities in this priority already existed, reuse its
// child number.
@ -716,7 +730,7 @@ void XdsClusterResolverLb::OnEndpointChanged(size_t index,
// will put the channel into TRANSIENT_FAILURE instead of CONNECTING
// while we're still waiting for the other discovery mechanism(s).
for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
if (!mechanism.latest_update.has_value()) return;
if (mechanism.latest_update == nullptr) return;
}
// Update child policy.
// TODO(roth): If the child policy reports an error with the update,
@ -730,10 +744,11 @@ void XdsClusterResolverLb::OnError(size_t index, std::string resolution_note) {
" reported error: %s",
this, index, resolution_note.c_str());
if (shutting_down_) return;
if (!discovery_mechanisms_[index].latest_update.has_value()) {
if (discovery_mechanisms_[index].latest_update == nullptr) {
// Call OnEndpointChanged() with an empty update just like
// OnResourceDoesNotExist().
OnEndpointChanged(index, XdsEndpointResource(), std::move(resolution_note));
OnEndpointChanged(index, std::make_shared<XdsEndpointResource>(),
std::move(resolution_note));
}
}
@ -745,7 +760,8 @@ void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index,
this, index, resolution_note.c_str());
if (shutting_down_) return;
// Call OnEndpointChanged() with an empty update.
OnEndpointChanged(index, XdsEndpointResource(), std::move(resolution_note));
OnEndpointChanged(index, std::make_shared<XdsEndpointResource>(),
std::move(resolution_note));
}
//
@ -755,11 +771,10 @@ void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index,
ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
ServerAddressList addresses;
for (const auto& discovery_entry : discovery_mechanisms_) {
for (size_t priority = 0;
priority < discovery_entry.latest_update->priorities.size();
++priority) {
const auto& priority_entry =
discovery_entry.latest_update->priorities[priority];
const auto& priority_list =
GetUpdatePriorityList(*discovery_entry.latest_update);
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& priority_entry = priority_list[priority];
std::string priority_child_name =
discovery_entry.GetChildPolicyName(priority);
for (const auto& p : priority_entry.localities) {
@ -803,10 +818,10 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
Json::Object priority_children;
Json::Array priority_priorities;
for (const auto& discovery_entry : discovery_mechanisms_) {
const auto& priority_list =
GetUpdatePriorityList(*discovery_entry.latest_update);
const auto& discovery_config = discovery_entry.config();
for (size_t priority = 0;
priority < discovery_entry.latest_update->priorities.size();
++priority) {
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
// Determine what xDS LB policy to use.
Json child_policy;
if (!discovery_entry.discovery_mechanism->override_child_policy()

@ -143,7 +143,8 @@ class XdsResolver : public Resolver {
public:
explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(XdsListenerResource listener) override {
void OnResourceChanged(
std::shared_ptr<const XdsListenerResource> listener) override {
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self), listener = std::move(listener)]() mutable {
@ -180,7 +181,8 @@ class XdsResolver : public Resolver {
public:
explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(XdsRouteConfigResource route_config) override {
void OnResourceChanged(
std::shared_ptr<const XdsRouteConfigResource> route_config) override {
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self),
@ -397,8 +399,9 @@ class XdsResolver : public Resolver {
return it->second->Ref();
}
void OnListenerUpdate(XdsListenerResource listener);
void OnRouteConfigUpdate(XdsRouteConfigResource rds_update);
void OnListenerUpdate(std::shared_ptr<const XdsListenerResource> listener);
void OnRouteConfigUpdate(
std::shared_ptr<const XdsRouteConfigResource> rds_update);
void OnError(absl::string_view context, absl::Status status);
void OnResourceDoesNotExist(std::string context);
@ -418,17 +421,13 @@ class XdsResolver : public Resolver {
uint64_t channel_id_;
ListenerWatcher* listener_watcher_ = nullptr;
// This will not contain the RouteConfiguration, even if it comes with the
// LDS response; instead, the relevant VirtualHost from the
// RouteConfiguration will be saved in current_virtual_host_.
XdsListenerResource::HttpConnectionManager current_listener_;
std::shared_ptr<const XdsListenerResource> current_listener_;
std::string route_config_name_;
RouteConfigWatcher* route_config_watcher_ = nullptr;
absl::optional<XdsRouteConfigResource::VirtualHost> current_virtual_host_;
std::map<std::string /*cluster_specifier_plugin_name*/,
std::string /*LB policy config*/>
cluster_specifier_plugin_map_;
std::shared_ptr<const XdsRouteConfigResource> current_route_config_;
const XdsRouteConfigResource::VirtualHost* current_virtual_host_ = nullptr;
std::map<absl::string_view, WeakRefCountedPtr<ClusterRef>> cluster_ref_map_;
};
@ -546,11 +545,12 @@ XdsResolver::RouteConfigData::CreateMethodConfig(
route_action.max_stream_duration->ToJsonString()));
}
// Handle xDS HTTP filters.
const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
resolver->current_listener_->listener);
auto result = XdsRouting::GeneratePerHTTPFilterConfigs(
static_cast<const GrpcXdsBootstrap&>(resolver->xds_client_->bootstrap())
.http_filter_registry(),
resolver->current_listener_.http_filters,
resolver->current_virtual_host_.value(), route, cluster_weight,
hcm.http_filters, *resolver->current_virtual_host_, route, cluster_weight,
resolver->args_);
if (!result.ok()) return result.status();
for (const auto& p : result->per_filter_configs) {
@ -673,7 +673,9 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector(
const auto& http_filter_registry =
static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
.http_filter_registry();
for (const auto& http_filter : resolver_->current_listener_.http_filters) {
const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
resolver_->current_listener_->listener);
for (const auto& http_filter : hcm.http_filters) {
// Find filter. This is guaranteed to succeed, because it's checked
// at config validation time in the XdsApi code.
const XdsHttpFilterImpl* filter_impl =
@ -1006,26 +1008,27 @@ void XdsResolver::ShutdownLocked() {
}
}
void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
void XdsResolver::OnListenerUpdate(
std::shared_ptr<const XdsListenerResource> listener) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
}
if (xds_client_ == nullptr) return;
auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
&listener.listener);
const auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
&listener->listener);
if (hcm == nullptr) {
return OnError(lds_resource_name_,
absl::UnavailableError("not an API listener"));
}
current_listener_ = std::move(*hcm);
MatchMutable(
&current_listener_.route_config,
current_listener_ = std::move(listener);
Match(
hcm->route_config,
// RDS resource name
[&](std::string* rds_name) {
[&](const std::string& rds_name) {
// If the RDS name changed, update the RDS watcher.
// Note that this will be true on the initial update, because
// route_config_name_ will be empty.
if (route_config_name_ != *rds_name) {
if (route_config_name_ != rds_name) {
// If we already had a watch (i.e., if the previous config had
// a different RDS name), stop the previous watch.
// There will be no previous watch if either (a) this is the
@ -1038,7 +1041,7 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
route_config_watcher_ = nullptr;
}
// Start watch for the new RDS resource name.
route_config_name_ = std::move(*rds_name);
route_config_name_ = rds_name;
auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = watcher.get();
XdsRouteConfigResourceType::StartWatch(
@ -1051,7 +1054,7 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
}
},
// inlined RouteConfig
[&](XdsRouteConfigResource* route_config) {
[&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
// If the previous update specified an RDS resource instead of
// having an inlined RouteConfig, we need to cancel the RDS watch.
if (route_config_watcher_ != nullptr) {
@ -1060,7 +1063,7 @@ void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
route_config_watcher_ = nullptr;
route_config_name_.clear();
}
OnRouteConfigUpdate(std::move(*route_config));
OnRouteConfigUpdate(route_config);
});
}
@ -1081,16 +1084,15 @@ class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
};
void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource rds_update) {
void XdsResolver::OnRouteConfigUpdate(
std::shared_ptr<const XdsRouteConfigResource> rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
}
if (xds_client_ == nullptr) {
return;
}
if (xds_client_ == nullptr) return;
// Find the relevant VirtualHost from the RouteConfiguration.
auto vhost_index = XdsRouting::FindVirtualHostForDomain(
VirtualHostListIterator(&rds_update.virtual_hosts),
VirtualHostListIterator(&rds_update->virtual_hosts),
data_plane_authority_);
if (!vhost_index.has_value()) {
OnError(
@ -1101,9 +1103,8 @@ void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource rds_update) {
return;
}
// Save the virtual host in the resolver.
current_virtual_host_ = std::move(rds_update.virtual_hosts[*vhost_index]);
cluster_specifier_plugin_map_ =
std::move(rds_update.cluster_specifier_plugin_map);
current_route_config_ = std::move(rds_update);
current_virtual_host_ = &current_route_config_->virtual_hosts[*vhost_index];
// Send a new result to the channel.
GenerateResult();
}
@ -1130,10 +1131,8 @@ void XdsResolver::OnResourceDoesNotExist(std::string context) {
"[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
"update and returning empty service config",
this);
if (xds_client_ == nullptr) {
return;
}
current_virtual_host_.reset();
if (xds_client_ == nullptr) return;
current_virtual_host_ = nullptr;
Result result;
result.addresses.emplace();
result.service_config = ServiceConfigImpl::Create(args_, "{}");
@ -1154,7 +1153,8 @@ XdsResolver::CreateServiceConfig() {
" \"childPolicy\": %s\n"
" }",
cluster.first,
cluster_specifier_plugin_map_[std::string(child_name)]));
current_route_config_->cluster_specifier_plugin_map.at(
std::string(child_name))));
} else {
absl::ConsumePrefix(&child_name, "cluster:");
clusters.push_back(
@ -1185,12 +1185,13 @@ XdsResolver::CreateServiceConfig() {
}
void XdsResolver::GenerateResult() {
if (!current_virtual_host_.has_value()) return;
if (current_virtual_host_ == nullptr) return;
// First create XdsConfigSelector, which may add new entries to the cluster
// state map, and then CreateServiceConfig for LB policies.
auto route_config_data =
RouteConfigData::Create(this, current_virtual_host_->routes,
current_listener_.http_max_stream_duration);
const auto& hcm = absl::get<XdsListenerResource::HttpConnectionManager>(
current_listener_->listener);
auto route_config_data = RouteConfigData::Create(
this, current_virtual_host_->routes, hcm.http_max_stream_duration);
if (!route_config_data.ok()) {
OnError("could not create ConfigSelector",
absl::UnavailableError(route_config_data.status().message()));

@ -865,15 +865,12 @@ void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
std::string(serialized_resource), result_.version, update_time_);
// Notify watchers.
auto& watchers_list = resource_state.watchers;
auto* value =
result_.type->CopyResource(resource_state.resource.get()).release();
xds_client()->work_serializer_.Schedule(
[watchers_list, value]()
[watchers_list, value = resource_state.resource]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnGenericResourceChanged(value);
}
delete value;
},
DEBUG_LOCATION);
}
@ -1604,12 +1601,11 @@ void XdsClient::WatchResource(const XdsResourceType* type,
"[xds_client %p] returning cached listener data for %s", this,
std::string(name).c_str());
}
auto* value = type->CopyResource(resource_state.resource.get()).release();
work_serializer_.Schedule(
[watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnGenericResourceChanged(value);
delete value;
},
[watcher, value = resource_state.resource]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnGenericResourceChanged(value);
},
DEBUG_LOCATION);
} else if (resource_state.meta.client_status ==
XdsApi::ResourceMetadata::DOES_NOT_EXIST) {

@ -63,7 +63,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
public:
virtual void OnGenericResourceChanged(
const XdsResourceType::ResourceData* resource)
std::shared_ptr<const XdsResourceType::ResourceData> resource)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
@ -238,7 +238,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
watchers;
// The latest data seen for the resource.
std::unique_ptr<XdsResourceType::ResourceData> resource;
std::shared_ptr<const XdsResourceType::ResourceData> resource;
XdsApi::ResourceMetadata meta;
bool ignored_deletion = false;
};

@ -20,6 +20,7 @@
#include <stddef.h>
#include <memory>
#include <utility>
#include "absl/status/status.h"
@ -416,18 +417,18 @@ void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
}
}
absl::StatusOr<XdsClusterResource> CdsResourceParse(
absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
const XdsResourceType::DecodeContext& context,
const envoy_config_cluster_v3_Cluster* cluster) {
XdsClusterResource cds_update;
auto cds_update = std::make_shared<XdsClusterResource>();
ValidationErrors errors;
// Check the cluster discovery type.
if (envoy_config_cluster_v3_Cluster_type(cluster) ==
envoy_config_cluster_v3_Cluster_EDS) {
cds_update.type = EdsConfigParse(cluster, &errors);
cds_update->type = EdsConfigParse(cluster, &errors);
} else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
cds_update.type = LogicalDnsParse(cluster, &errors);
cds_update->type = LogicalDnsParse(cluster, &errors);
} else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
ValidationErrors::ScopedField field(&errors, ".cluster_type");
const auto* custom_cluster_type =
@ -454,7 +455,7 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]");
absl::string_view serialized_config =
UpbStringToAbsl(google_protobuf_Any_value(typed_config));
cds_update.type =
cds_update->type =
AggregateClusterParse(context, serialized_config, &errors);
}
}
@ -463,13 +464,13 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
errors.AddError("unknown discovery type");
}
// Check the LB policy.
ParseLbPolicyConfig(context, cluster, &cds_update, &errors);
ParseLbPolicyConfig(context, cluster, cds_update.get(), &errors);
// transport_socket
auto* transport_socket =
envoy_config_cluster_v3_Cluster_transport_socket(cluster);
if (transport_socket != nullptr) {
ValidationErrors::ScopedField field(&errors, ".transport_socket");
cds_update.common_tls_context =
cds_update->common_tls_context =
UpstreamTlsContextParse(context, transport_socket, &errors);
}
// Record LRS server name (if any).
@ -480,7 +481,7 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
ValidationErrors::ScopedField field(&errors, ".lrs_server");
errors.AddError("ConfigSource is not self");
}
cds_update.lrs_load_reporting_server.emplace(
cds_update->lrs_load_reporting_server.emplace(
static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(context.server));
}
// The Cluster resource encodes the circuit breaking parameters in a list of
@ -502,7 +503,7 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests(
threshold);
if (max_requests != nullptr) {
cds_update.max_concurrent_requests =
cds_update->max_concurrent_requests =
google_protobuf_UInt32Value_value(max_requests);
}
break;
@ -629,7 +630,7 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
failure_percentage_ejection;
}
}
cds_update.outlier_detection = outlier_detection_update;
cds_update->outlier_detection = outlier_detection_update;
}
// Validate override host status.
if (XdsOverrideHostEnabled()) {
@ -648,7 +649,7 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
for (size_t i = 0; i < size; ++i) {
auto status = XdsHealthStatus::FromUpb(statuses[i]);
if (status.has_value()) {
cds_update.override_host_statuses.insert(*status);
cds_update->override_host_statuses.insert(*status);
}
}
}
@ -703,10 +704,9 @@ XdsResourceType::DecodeResult XdsClusterResourceType::Decode(
} else {
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed Cluster %s: %s", context.client,
result.name->c_str(), cds_resource->ToString().c_str());
result.name->c_str(), (*cds_resource)->ToString().c_str());
}
result.resource =
std::make_unique<XdsClusterResource>(std::move(*cds_resource));
result.resource = std::move(*cds_resource);
}
return result;
}

@ -22,7 +22,6 @@
#include <stdint.h>
#include <algorithm>
#include <memory>
#include <set>
#include <string>
#include <vector>

@ -23,6 +23,7 @@
#include <algorithm>
#include <limits>
#include <memory>
#include <set>
#include <vector>
@ -353,12 +354,12 @@ void DropParseAndAppend(
drop_config->AddCategory(std::move(category), numerator);
}
absl::StatusOr<XdsEndpointResource> EdsResourceParse(
absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
const XdsResourceType::DecodeContext& /*context*/,
const envoy_config_endpoint_v3_ClusterLoadAssignment*
cluster_load_assignment) {
ValidationErrors errors;
XdsEndpointResource eds_resource;
auto eds_resource = std::make_shared<XdsEndpointResource>();
// endpoints
{
ValidationErrors::ScopedField field(&errors, "endpoints");
@ -374,11 +375,11 @@ absl::StatusOr<XdsEndpointResource> EdsResourceParse(
GPR_ASSERT(parsed_locality->locality.lb_weight != 0);
// Make sure prorities is big enough. Note that they might not
// arrive in priority order.
if (eds_resource.priorities.size() < parsed_locality->priority + 1) {
eds_resource.priorities.resize(parsed_locality->priority + 1);
if (eds_resource->priorities.size() < parsed_locality->priority + 1) {
eds_resource->priorities.resize(parsed_locality->priority + 1);
}
auto& locality_map =
eds_resource.priorities[parsed_locality->priority].localities;
eds_resource->priorities[parsed_locality->priority].localities;
auto it = locality_map.find(parsed_locality->locality.name.get());
if (it != locality_map.end()) {
errors.AddError(absl::StrCat(
@ -391,8 +392,8 @@ absl::StatusOr<XdsEndpointResource> EdsResourceParse(
}
}
}
for (size_t i = 0; i < eds_resource.priorities.size(); ++i) {
const auto& priority = eds_resource.priorities[i];
for (size_t i = 0; i < eds_resource->priorities.size(); ++i) {
const auto& priority = eds_resource->priorities[i];
if (priority.localities.empty()) {
errors.AddError(absl::StrCat("priority ", i, " empty"));
} else {
@ -412,7 +413,7 @@ absl::StatusOr<XdsEndpointResource> EdsResourceParse(
}
}
// policy
eds_resource.drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
eds_resource->drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
cluster_load_assignment);
if (policy != nullptr) {
@ -424,7 +425,7 @@ absl::StatusOr<XdsEndpointResource> EdsResourceParse(
for (size_t i = 0; i < drop_size; ++i) {
ValidationErrors::ScopedField field(
&errors, absl::StrCat(".drop_overloads[", i, "]"));
DropParseAndAppend(drop_overload[i], eds_resource.drop_config.get(),
DropParseAndAppend(drop_overload[i], eds_resource->drop_config.get(),
&errors);
}
}
@ -466,10 +467,9 @@ XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
context.client, result.name->c_str(),
eds_resource->ToString().c_str());
(*eds_resource)->ToString().c_str());
}
result.resource =
std::make_unique<XdsEndpointResource>(std::move(*eds_resource));
result.resource = std::move(*eds_resource);
}
return result;
}

@ -23,7 +23,6 @@
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

@ -74,8 +74,8 @@ std::string XdsListenerResource::HttpConnectionManager::ToString() const {
[](const std::string& rds_name) {
return absl::StrCat("rds_name=", rds_name);
},
[](const XdsRouteConfigResource& route_config) {
return absl::StrCat("route_config=", route_config.ToString());
[](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
return absl::StrCat("route_config=", route_config->ToString());
}));
contents.push_back(absl::StrCat("http_max_stream_duration=",
http_max_stream_duration.ToString()));
@ -490,17 +490,18 @@ XdsListenerResource::HttpConnectionManager HttpConnectionManagerParse(
return http_connection_manager;
}
absl::StatusOr<XdsListenerResource> LdsResourceParseClient(
absl::StatusOr<std::shared_ptr<const XdsListenerResource>>
LdsResourceParseClient(
const XdsResourceType::DecodeContext& context,
const envoy_config_listener_v3_ApiListener* api_listener) {
XdsListenerResource lds_update;
auto lds_update = std::make_shared<XdsListenerResource>();
ValidationErrors errors;
ValidationErrors::ScopedField field(&errors, "api_listener.api_listener");
auto* api_listener_field =
envoy_config_listener_v3_ApiListener_api_listener(api_listener);
auto extension = ExtractXdsExtension(context, api_listener_field, &errors);
if (extension.has_value()) {
lds_update.listener = HttpConnectionManagerParse(
lds_update->listener = HttpConnectionManagerParse(
/*is_client=*/true, context, std::move(*extension), &errors);
}
if (!errors.ok()) {
@ -991,9 +992,9 @@ XdsListenerResource::FilterChainMap BuildFilterChainMap(
return BuildFromInternalFilterChainMap(&internal_filter_chain_map);
}
absl::StatusOr<XdsListenerResource> LdsResourceParseServer(
const XdsResourceType::DecodeContext& context,
const envoy_config_listener_v3_Listener* listener) {
absl::StatusOr<std::shared_ptr<const XdsListenerResource>>
LdsResourceParseServer(const XdsResourceType::DecodeContext& context,
const envoy_config_listener_v3_Listener* listener) {
ValidationErrors errors;
XdsListenerResource::TcpListener tcp_listener;
// address
@ -1054,12 +1055,12 @@ absl::StatusOr<XdsListenerResource> LdsResourceParseServer(
return errors.status(absl::StatusCode::kInvalidArgument,
"errors validating server Listener");
}
XdsListenerResource lds_update;
lds_update.listener = std::move(tcp_listener);
auto lds_update = std::make_shared<XdsListenerResource>();
lds_update->listener = std::move(tcp_listener);
return lds_update;
}
absl::StatusOr<XdsListenerResource> LdsResourceParse(
absl::StatusOr<std::shared_ptr<const XdsListenerResource>> LdsResourceParse(
const XdsResourceType::DecodeContext& context,
const envoy_config_listener_v3_Listener* listener) {
// Check whether it's a client or server listener.
@ -1127,10 +1128,9 @@ XdsResourceType::DecodeResult XdsListenerResourceType::Decode(
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed Listener %s: %s",
context.client, result.name->c_str(),
listener->ToString().c_str());
(*listener)->ToString().c_str());
}
result.resource =
std::make_unique<XdsListenerResource>(std::move(*listener));
result.resource = std::move(*listener);
}
return result;
}

@ -51,7 +51,8 @@ namespace grpc_core {
struct XdsListenerResource : public XdsResourceType::ResourceData {
struct HttpConnectionManager {
// The RDS resource name or inline RouteConfiguration.
absl::variant<std::string, XdsRouteConfigResource> route_config;
absl::variant<std::string, std::shared_ptr<const XdsRouteConfigResource>>
route_config;
// Storing the Http Connection Manager Common Http Protocol Option
// max_stream_duration
@ -70,8 +71,17 @@ struct XdsListenerResource : public XdsResourceType::ResourceData {
std::vector<HttpFilter> http_filters;
bool operator==(const HttpConnectionManager& other) const {
return route_config == other.route_config &&
http_max_stream_duration == other.http_max_stream_duration &&
if (absl::holds_alternative<std::string>(route_config)) {
if (route_config != other.route_config) return false;
} else {
auto& rc1 = absl::get<std::shared_ptr<const XdsRouteConfigResource>>(
route_config);
auto* rc2 = absl::get_if<std::shared_ptr<const XdsRouteConfigResource>>(
&other.route_config);
if (rc2 == nullptr) return false;
if (!(*rc1 == **rc2)) return false;
}
return http_max_stream_duration == other.http_max_stream_duration &&
http_filters == other.http_filters;
}

@ -61,7 +61,7 @@ class XdsResourceType {
// non-OK status.
absl::optional<std::string> name;
// The parsed and validated resource, or an error status.
absl::StatusOr<std::unique_ptr<ResourceData>> resource;
absl::StatusOr<std::shared_ptr<const ResourceData>> resource;
};
virtual ~XdsResourceType() = default;
@ -79,12 +79,6 @@ class XdsResourceType {
virtual bool ResourcesEqual(const ResourceData* r1,
const ResourceData* r2) const = 0;
// Returns a copy of resource.
// Must be invoked only on resources returned by this object's Decode()
// method.
virtual std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const = 0;
// Indicates whether the resource type requires that all resources must
// be present in every SotW response from the server. If true, a
// response that does not include a previously seen resource will be

@ -19,6 +19,7 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <utility>
#include "absl/strings/string_view.h"
@ -30,8 +31,8 @@ namespace grpc_core {
// Base class for XdsResourceType implementations.
// Handles all down-casting logic for a particular resource type struct.
// ResourceTypeStruct must inherit from XdsResourceType::ResourceData,
// must be copy-constructible, and must implement operator==().
// ResourceTypeStruct must inherit from XdsResourceType::ResourceData
// and must implement operator==().
template <typename Subclass, typename ResourceTypeStruct>
class XdsResourceTypeImpl : public XdsResourceType {
public:
@ -40,14 +41,17 @@ class XdsResourceTypeImpl : public XdsResourceType {
// XdsClient watcher that handles down-casting.
class WatcherInterface : public XdsClient::ResourceWatcherInterface {
public:
virtual void OnResourceChanged(ResourceType listener) = 0;
virtual void OnResourceChanged(
std::shared_ptr<const ResourceType> resource) = 0;
private:
// Get result from XdsClient generic watcher interface, perform
// down-casting, and invoke the caller's OnResourceChanged() method.
void OnGenericResourceChanged(
const XdsResourceType::ResourceData* resource) override {
OnResourceChanged(*static_cast<const ResourceType*>(resource));
std::shared_ptr<const XdsResourceType::ResourceData> resource)
override {
OnResourceChanged(
std::static_pointer_cast<const ResourceType>(std::move(resource)));
}
};
@ -75,12 +79,6 @@ class XdsResourceTypeImpl : public XdsResourceType {
return *static_cast<const ResourceType*>(r1) ==
*static_cast<const ResourceType*>(r2);
}
std::unique_ptr<ResourceData> CopyResource(
const ResourceData* resource) const override {
return std::make_unique<ResourceType>(
*static_cast<const ResourceType*>(resource));
}
};
} // namespace grpc_core

@ -1036,20 +1036,20 @@ absl::optional<XdsRouteConfigResource::Route> ParseRoute(
} // namespace
XdsRouteConfigResource XdsRouteConfigResource::Parse(
std::shared_ptr<const XdsRouteConfigResource> XdsRouteConfigResource::Parse(
const XdsResourceType::DecodeContext& context,
const envoy_config_route_v3_RouteConfiguration* route_config,
ValidationErrors* errors) {
XdsRouteConfigResource rds_update;
auto rds_update = std::make_shared<XdsRouteConfigResource>();
// Get the cluster spcifier plugin map.
if (XdsRlsEnabled()) {
rds_update.cluster_specifier_plugin_map =
rds_update->cluster_specifier_plugin_map =
ClusterSpecifierPluginParse(context, route_config, errors);
}
// Build a set of configured cluster_specifier_plugin names to make sure
// each is actually referenced by a route action.
std::set<absl::string_view> cluster_specifier_plugins_not_seen;
for (auto& plugin : rds_update.cluster_specifier_plugin_map) {
for (auto& plugin : rds_update->cluster_specifier_plugin_map) {
cluster_specifier_plugins_not_seen.emplace(plugin.first);
}
// Get the virtual hosts.
@ -1060,9 +1060,9 @@ XdsRouteConfigResource XdsRouteConfigResource::Parse(
for (size_t i = 0; i < num_virtual_hosts; ++i) {
ValidationErrors::ScopedField field(
errors, absl::StrCat(".virtual_hosts[", i, "]"));
rds_update.virtual_hosts.emplace_back();
rds_update->virtual_hosts.emplace_back();
XdsRouteConfigResource::VirtualHost& vhost =
rds_update.virtual_hosts.back();
rds_update->virtual_hosts.back();
// Parse domains.
size_t domain_size;
upb_StringView const* domains = envoy_config_route_v3_VirtualHost_domains(
@ -1111,7 +1111,7 @@ XdsRouteConfigResource XdsRouteConfigResource::Parse(
for (size_t j = 0; j < num_routes; ++j) {
ValidationErrors::ScopedField field(errors, absl::StrCat("[", j, "]"));
auto route = ParseRoute(context, routes[j], virtual_host_retry_policy,
rds_update.cluster_specifier_plugin_map,
rds_update->cluster_specifier_plugin_map,
&cluster_specifier_plugins_not_seen, errors);
if (route.has_value()) vhost.routes.emplace_back(std::move(*route));
}
@ -1119,7 +1119,7 @@ XdsRouteConfigResource XdsRouteConfigResource::Parse(
// For cluster specifier plugins that were not used in any route action,
// delete them from the update, since they will never be used.
for (auto& unused_plugin : cluster_specifier_plugins_not_seen) {
rds_update.cluster_specifier_plugin_map.erase(std::string(unused_plugin));
rds_update->cluster_specifier_plugin_map.erase(std::string(unused_plugin));
}
return rds_update;
}
@ -1177,10 +1177,9 @@ XdsResourceType::DecodeResult XdsRouteConfigResourceType::Decode(
if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
gpr_log(GPR_INFO, "[xds_client %p] parsed RouteConfiguration %s: %s",
context.client, result.name->c_str(),
rds_update.ToString().c_str());
rds_update->ToString().c_str());
}
result.resource =
std::make_unique<XdsRouteConfigResource>(std::move(rds_update));
result.resource = std::move(rds_update);
}
return result;
}

@ -220,7 +220,7 @@ struct XdsRouteConfigResource : public XdsResourceType::ResourceData {
}
std::string ToString() const;
static XdsRouteConfigResource Parse(
static std::shared_ptr<const XdsRouteConfigResource> Parse(
const XdsResourceType::DecodeContext& context,
const envoy_config_route_v3_RouteConfiguration* route_config,
ValidationErrors* errors);

@ -150,7 +150,8 @@ class XdsServerConfigFetcher::ListenerWatcher
xds_client_.reset(DEBUG_LOCATION, "ListenerWatcher");
}
void OnResourceChanged(XdsListenerResource listener) override;
void OnResourceChanged(
std::shared_ptr<const XdsListenerResource> listener) override;
void OnError(absl::Status status) override;
@ -236,7 +237,9 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
class RouteConfigWatcher;
struct RdsUpdateState {
RouteConfigWatcher* watcher;
absl::optional<absl::StatusOr<XdsRouteConfigResource>> rds_update;
absl::optional<
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>>
rds_update;
};
class XdsServerConfigSelector;
@ -249,8 +252,9 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
// Helper functions invoked by RouteConfigWatcher when there are updates to
// RDS resources.
void OnRouteConfigChanged(const std::string& resource_name,
XdsRouteConfigResource route_config);
void OnRouteConfigChanged(
const std::string& resource_name,
std::shared_ptr<const XdsRouteConfigResource> route_config);
void OnError(const std::string& resource_name, absl::Status status);
void OnResourceDoesNotExist(const std::string& resource_name);
@ -258,6 +262,9 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager
// This ref is only kept around till the FilterChainMatchManager becomes
// ready.
RefCountedPtr<ListenerWatcher> listener_watcher_;
// TODO(roth): Consider holding a ref to the LDS resource and storing
// a pointer to the filter chain data within that LDS resource, rather
// than copying the filter chain data here.
XdsListenerResource::FilterChainMap filter_chain_map_;
absl::optional<XdsListenerResource::FilterChainData> default_filter_chain_;
Mutex mu_;
@ -284,7 +291,8 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
: resource_name_(std::move(resource_name)),
filter_chain_match_manager_(std::move(filter_chain_match_manager)) {}
void OnResourceChanged(XdsRouteConfigResource route_config) override {
void OnResourceChanged(
std::shared_ptr<const XdsRouteConfigResource> route_config) override {
filter_chain_match_manager_->OnRouteConfigChanged(resource_name_,
std::move(route_config));
}
@ -311,7 +319,7 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
public:
static absl::StatusOr<RefCountedPtr<XdsServerConfigSelector>> Create(
const XdsHttpFilterRegistry& http_filter_registry,
XdsRouteConfigResource rds_update,
std::shared_ptr<const XdsRouteConfigResource> rds_update,
const std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>&
http_filters);
~XdsServerConfigSelector() override = default;
@ -324,6 +332,9 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
struct Route {
// true if an action other than kNonForwardingAction is configured.
bool unsupported_action;
// TODO(roth): Consider holding a ref to the RDS resource and storing
// a pointer to the matchers within that RDS resource, rather than
// copying the matchers here.
XdsRouteConfigResource::Route::Matchers matchers;
RefCountedPtr<ServiceConfig> method_config;
};
@ -376,7 +387,8 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
public:
StaticXdsServerConfigSelectorProvider(
RefCountedPtr<GrpcXdsClient> xds_client,
absl::StatusOr<XdsRouteConfigResource> static_resource,
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
static_resource,
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters)
: xds_client_(std::move(xds_client)),
@ -407,7 +419,11 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
private:
RefCountedPtr<GrpcXdsClient> xds_client_;
absl::StatusOr<XdsRouteConfigResource> static_resource_;
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
static_resource_;
// TODO(roth): Consider holding a ref to the LDS resource and storing
// a pointer to the HTTP filters within that LDS resource, rather than
// copying the HTTP filters here.
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters_;
std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
@ -422,7 +438,8 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
public:
DynamicXdsServerConfigSelectorProvider(
RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
absl::StatusOr<XdsRouteConfigResource> initial_resource,
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
initial_resource,
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters);
@ -440,19 +457,24 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
private:
class RouteConfigWatcher;
void OnRouteConfigChanged(XdsRouteConfigResource rds_update);
void OnRouteConfigChanged(
std::shared_ptr<const XdsRouteConfigResource> rds_update);
void OnError(absl::Status status);
void OnResourceDoesNotExist();
RefCountedPtr<GrpcXdsClient> xds_client_;
std::string resource_name_;
// TODO(roth): Consider holding a ref to the LDS resource and storing
// a pointer to the HTTP filters within that LDS resource, rather than
// copying the HTTP filters here.
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters_;
RouteConfigWatcher* route_config_watcher_ = nullptr;
Mutex mu_;
std::unique_ptr<ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
watcher_ ABSL_GUARDED_BY(mu_);
absl::StatusOr<XdsRouteConfigResource> resource_ ABSL_GUARDED_BY(mu_);
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> resource_
ABSL_GUARDED_BY(mu_);
};
// A watcher implementation for updating the RDS resource used by
@ -465,7 +487,8 @@ class XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
WeakRefCountedPtr<DynamicXdsServerConfigSelectorProvider> parent)
: parent_(std::move(parent)) {}
void OnResourceChanged(XdsRouteConfigResource route_config) override {
void OnResourceChanged(
std::shared_ptr<const XdsRouteConfigResource> route_config) override {
parent_->OnRouteConfigChanged(std::move(route_config));
}
@ -551,14 +574,14 @@ XdsServerConfigFetcher::ListenerWatcher::ListenerWatcher(
listening_address_(std::move(listening_address)) {}
void XdsServerConfigFetcher::ListenerWatcher::OnResourceChanged(
XdsListenerResource listener) {
std::shared_ptr<const XdsListenerResource> listener) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_server_config_fetcher_trace)) {
gpr_log(GPR_INFO,
"[ListenerWatcher %p] Received LDS update from xds client %p: %s",
this, xds_client_.get(), listener.ToString().c_str());
this, xds_client_.get(), listener->ToString().c_str());
}
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&listener.listener);
absl::get_if<XdsListenerResource::TcpListener>(&listener->listener);
if (tcp_listener == nullptr) {
MutexLock lock(&mu_);
OnFatalError(
@ -573,8 +596,7 @@ void XdsServerConfigFetcher::ListenerWatcher::OnResourceChanged(
}
auto new_filter_chain_match_manager = MakeRefCounted<FilterChainMatchManager>(
xds_client_->Ref(DEBUG_LOCATION, "FilterChainMatchManager"),
std::move(tcp_listener->filter_chain_map),
std::move(tcp_listener->default_filter_chain));
tcp_listener->filter_chain_map, tcp_listener->default_filter_chain);
MutexLock lock(&mu_);
if (filter_chain_match_manager_ == nullptr ||
!(new_filter_chain_match_manager->filter_chain_map() ==
@ -824,8 +846,9 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
}
void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
OnRouteConfigChanged(const std::string& resource_name,
XdsRouteConfigResource route_config) {
OnRouteConfigChanged(
const std::string& resource_name,
std::shared_ptr<const XdsRouteConfigResource> route_config) {
RefCountedPtr<ListenerWatcher> listener_watcher;
{
MutexLock lock(&mu_);
@ -1091,7 +1114,8 @@ absl::StatusOr<ChannelArgs> XdsServerConfigFetcher::ListenerWatcher::
filter_chain->http_connection_manager.route_config,
// RDS resource name
[&](const std::string& rds_name) {
absl::StatusOr<XdsRouteConfigResource> initial_resource;
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
initial_resource;
{
MutexLock lock(&mu_);
initial_resource = rds_map_[rds_name].rds_update.value();
@ -1104,7 +1128,7 @@ absl::StatusOr<ChannelArgs> XdsServerConfigFetcher::ListenerWatcher::
filter_chain->http_connection_manager.http_filters);
},
// inline RouteConfig
[&](const XdsRouteConfigResource& route_config) {
[&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
server_config_selector_provider =
MakeRefCounted<StaticXdsServerConfigSelectorProvider>(
xds_client_->Ref(DEBUG_LOCATION,
@ -1140,19 +1164,19 @@ absl::StatusOr<
XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
XdsServerConfigSelector::Create(
const XdsHttpFilterRegistry& http_filter_registry,
XdsRouteConfigResource rds_update,
std::shared_ptr<const XdsRouteConfigResource> rds_update,
const std::vector<
XdsListenerResource::HttpConnectionManager::HttpFilter>&
http_filters) {
auto config_selector = MakeRefCounted<XdsServerConfigSelector>();
for (auto& vhost : rds_update.virtual_hosts) {
for (auto& vhost : rds_update->virtual_hosts) {
config_selector->virtual_hosts_.emplace_back();
auto& virtual_host = config_selector->virtual_hosts_.back();
virtual_host.domains = std::move(vhost.domains);
virtual_host.domains = vhost.domains;
for (auto& route : vhost.routes) {
virtual_host.routes.emplace_back();
auto& config_selector_route = virtual_host.routes.back();
config_selector_route.matchers = std::move(route.matchers);
config_selector_route.matchers = route.matchers;
config_selector_route.unsupported_action =
absl::get_if<XdsRouteConfigResource::Route::NonForwardingAction>(
&route.action) == nullptr;
@ -1234,7 +1258,8 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
DynamicXdsServerConfigSelectorProvider::
DynamicXdsServerConfigSelectorProvider(
RefCountedPtr<GrpcXdsClient> xds_client, std::string resource_name,
absl::StatusOr<XdsRouteConfigResource> initial_resource,
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
initial_resource,
std::vector<XdsListenerResource::HttpConnectionManager::HttpFilter>
http_filters)
: xds_client_(std::move(xds_client)),
@ -1264,7 +1289,7 @@ XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
std::unique_ptr<
ServerConfigSelectorProvider::ServerConfigSelectorWatcher>
watcher) {
absl::StatusOr<XdsRouteConfigResource> resource;
absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> resource;
{
MutexLock lock(&mu_);
GPR_ASSERT(watcher_ == nullptr);
@ -1288,7 +1313,7 @@ void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
void XdsServerConfigFetcher::ListenerWatcher::FilterChainMatchManager::
DynamicXdsServerConfigSelectorProvider::OnRouteConfigChanged(
XdsRouteConfigResource rds_update) {
std::shared_ptr<const XdsRouteConfigResource> rds_update) {
MutexLock lock(&mu_);
resource_ = std::move(rds_update);
if (watcher_ == nullptr) {

@ -147,10 +147,11 @@ class Fuzzer {
: resource_name_(std::move(resource_name)) {}
void OnResourceChanged(
typename ResourceType::ResourceType resource) override {
std::shared_ptr<const typename ResourceType::ResourceType> resource)
override {
gpr_log(GPR_INFO, "==> OnResourceChanged(%s %s): %s",
std::string(ResourceType::Get()->type_url()).c_str(),
resource_name_.c_str(), resource.ToString().c_str());
resource_name_.c_str(), resource->ToString().c_str());
}
void OnError(absl::Status status) override {

@ -252,24 +252,26 @@ class XdsClientTest : public ::testing::Test {
return !queue_.empty();
}
absl::optional<ResourceStruct> WaitForNextResource(
std::shared_ptr<const ResourceStruct> WaitForNextResource(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) {
MutexLock lock(&mu_);
if (!WaitForEventLocked(timeout)) return absl::nullopt;
if (!WaitForEventLocked(timeout)) return nullptr;
Event& event = queue_.front();
if (!absl::holds_alternative<ResourceStruct>(event)) {
if (!absl::holds_alternative<std::shared_ptr<const ResourceStruct>>(
event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event)
? "error"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
return nullptr;
}
ResourceStruct foo = std::move(absl::get<ResourceStruct>(event));
auto foo =
std::move(absl::get<std::shared_ptr<const ResourceStruct>>(event));
queue_.pop_front();
return std::move(foo);
return foo;
}
absl::optional<absl::Status> WaitForNextError(
@ -281,7 +283,8 @@ class XdsClientTest : public ::testing::Test {
if (!absl::holds_alternative<absl::Status>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<ResourceStruct>(event)
<< (absl::holds_alternative<
std::shared_ptr<const ResourceStruct>>(event)
? "resource"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
@ -300,8 +303,8 @@ class XdsClientTest : public ::testing::Test {
if (!absl::holds_alternative<DoesNotExist>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<ResourceStruct>(event) ? "resource"
: "error")
<< (absl::holds_alternative<absl::Status>(event) ? "error"
: "resource")
<< " at " << location.file() << ":" << location.line();
return false;
}
@ -311,9 +314,11 @@ class XdsClientTest : public ::testing::Test {
private:
struct DoesNotExist {};
using Event = absl::variant<ResourceStruct, absl::Status, DoesNotExist>;
using Event = absl::variant<std::shared_ptr<const ResourceStruct>,
absl::Status, DoesNotExist>;
void OnResourceChanged(ResourceStruct foo) override {
void OnResourceChanged(
std::shared_ptr<const ResourceStruct> foo) override {
MutexLock lock(&mu_);
queue_.push_back(std::move(foo));
cv_.Signal();
@ -758,7 +763,7 @@ TEST_F(XdsClientTest, BasicWatch) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -799,7 +804,7 @@ TEST_F(XdsClientTest, UpdateFromServer) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -818,7 +823,7 @@ TEST_F(XdsClientTest, UpdateFromServer) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 9);
// XdsClient should have sent an ACK message to the xDS server.
@ -859,7 +864,7 @@ TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -874,7 +879,7 @@ TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
// This watcher should get an immediate notification, because the
// resource is already cached.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// Server should not have seen another request from the client.
@ -888,11 +893,11 @@ TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
.Serialize());
// XdsClient should deliver the response to both watchers.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 9);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 9);
// XdsClient should have sent an ACK message to the xDS server.
@ -937,7 +942,7 @@ TEST_F(XdsClientTest, SubscribeToMultipleResources) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -965,7 +970,7 @@ TEST_F(XdsClientTest, SubscribeToMultipleResources) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
// XdsClient should have sent an ACK message to the xDS server.
@ -1014,7 +1019,7 @@ TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1042,7 +1047,7 @@ TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
// XdsClient should have sent an ACK message to the xDS server.
@ -1061,7 +1066,7 @@ TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 9);
// XdsClient should have sent an ACK message to the xDS server.
@ -1149,11 +1154,11 @@ TEST_F(XdsClientTest, ResourceValidationFailure) {
.Serialize());
// XdsClient should deliver the response to both watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 9);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 9);
// XdsClient should have sent an ACK message to the xDS server.
@ -1258,7 +1263,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
EXPECT_FALSE(watcher2->HasEvent());
// It will delivery a valid resource update for foo4.
auto resource = watcher4->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo4");
EXPECT_EQ(resource->value, 5);
// XdsClient should NACK the update.
@ -1321,7 +1326,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1371,7 +1376,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
// another option is to send the errors even for newly started watchers.
auto watcher2 = StartFooWatch("foo1");
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// Cancel watches.
@ -1408,7 +1413,7 @@ TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
.Serialize());
// XdsClient will delivery a valid resource update for wc1.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 6);
// XdsClient should NACK the update.
@ -1458,7 +1463,7 @@ TEST_F(XdsClientTest, ResourceDeletion) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1497,11 +1502,11 @@ TEST_F(XdsClientTest, ResourceDeletion) {
.Serialize());
// XdsClient should have delivered the response to the watchers.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 7);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 7);
// XdsClient should have sent an ACK message to the xDS server.
@ -1545,7 +1550,7 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1569,7 +1574,7 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
// receive the cached resource.
auto watcher2 = StartWildcardCapableWatch("wc1");
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1588,11 +1593,11 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
.Serialize());
// XdsClient should have delivered the response to the watchers.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 7);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "wc1");
EXPECT_EQ(resource->value, 7);
// XdsClient should have sent an ACK message to the xDS server.
@ -1634,7 +1639,7 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1668,7 +1673,7 @@ TEST_F(XdsClientTest, StreamClosedByServer) {
// resource.
auto watcher2 = StartFooWatch("foo1");
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// Server now sends the requested resource.
@ -1741,7 +1746,7 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
.Serialize());
// Watcher gets the resource.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient sends an ACK.
@ -1816,11 +1821,11 @@ TEST_F(XdsClientTest, ConnectionFails) {
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1870,11 +1875,11 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -1942,7 +1947,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
.Serialize());
// The resource is delivered to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient sends an ACK.
@ -1998,7 +2003,7 @@ TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2051,7 +2056,7 @@ TEST_F(XdsClientTest,
.Serialize());
// XdsClient should have delivered the response to the watchers.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2090,12 +2095,12 @@ TEST_F(XdsClientTest,
// has not changed, since the previous value was removed from the
// cache when we unsubscribed.
resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// For foo2, the watcher should receive notification for the new resource.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo2");
EXPECT_EQ(resource->value, 7);
// Now we finally tell XdsClient that its previous send_message op is
@ -2147,7 +2152,7 @@ TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2225,7 +2230,7 @@ TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2266,7 +2271,7 @@ TEST_F(XdsClientTest, MultipleResourceTypes) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2296,7 +2301,7 @@ TEST_F(XdsClientTest, MultipleResourceTypes) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource2 = watcher2->WaitForNextResource();
ASSERT_TRUE(resource2.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource2->name, "bar1");
EXPECT_EQ(resource2->value, "whee");
// XdsClient should have sent an ACK message to the xDS server.
@ -2353,7 +2358,7 @@ TEST_F(XdsClientTest, Federation) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2389,7 +2394,7 @@ TEST_F(XdsClientTest, Federation) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, kXdstpResourceName);
EXPECT_EQ(resource->value, 3);
// XdsClient should have sent an ACK message to the xDS server.
@ -2439,7 +2444,7 @@ TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2471,7 +2476,7 @@ TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, kXdstpResourceName);
EXPECT_EQ(resource->value, 3);
// XdsClient should have sent an ACK message to the xDS server.
@ -2560,7 +2565,7 @@ TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, kXdstpResourceName);
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2609,7 +2614,7 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
@ -2645,7 +2650,7 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
.Serialize());
// XdsClient should have delivered the response to the watcher.
resource = watcher2->WaitForNextResource();
ASSERT_TRUE(resource.has_value());
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, kXdstpResourceName);
EXPECT_EQ(resource->value, 3);
// XdsClient should have sent an ACK message to the xDS server.

@ -158,7 +158,8 @@ TEST_F(XdsClusterTest, MinimumValidConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
auto* eds = absl::get_if<XdsClusterResource::Eds>(&resource.type);
ASSERT_NE(eds, nullptr);
EXPECT_EQ(eds->eds_service_name, "");
@ -190,7 +191,8 @@ TEST_F(ClusterTypeTest, EdsConfigSourceAds) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
auto* eds = absl::get_if<XdsClusterResource::Eds>(&resource.type);
ASSERT_NE(eds, nullptr);
EXPECT_EQ(eds->eds_service_name, "");
@ -211,7 +213,8 @@ TEST_F(ClusterTypeTest, EdsServiceName) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
auto* eds = absl::get_if<XdsClusterResource::Eds>(&resource.type);
ASSERT_NE(eds, nullptr);
EXPECT_EQ(eds->eds_service_name, "bar");
@ -337,7 +340,8 @@ TEST_F(ClusterTypeTest, LogicalDnsValid) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
auto* logical_dns =
absl::get_if<XdsClusterResource::LogicalDns>(&resource.type);
ASSERT_NE(logical_dns, nullptr);
@ -571,7 +575,8 @@ TEST_F(ClusterTypeTest, AggregateClusterValid) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
auto* aggregate = absl::get_if<XdsClusterResource::Aggregate>(&resource.type);
ASSERT_NE(aggregate, nullptr);
EXPECT_THAT(aggregate->prioritized_cluster_names,
@ -647,7 +652,8 @@ TEST_F(LbPolicyTest, EnumLbPolicyRingHash) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(JsonDump(Json::FromArray(resource.lb_policy_config)),
"[{\"ring_hash_experimental\":{"
"\"maxRingSize\":8388608,\"minRingSize\":1024}}]");
@ -670,7 +676,8 @@ TEST_F(LbPolicyTest, EnumLbPolicyRingHashSetMinAndMaxRingSize) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(JsonDump(Json::FromArray(resource.lb_policy_config)),
"[{\"ring_hash_experimental\":{"
"\"maxRingSize\":4096,\"minRingSize\":2048}}]");
@ -825,7 +832,8 @@ TEST_F(LbPolicyTest, LoadBalancingPolicyField) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(JsonDump(Json::FromArray(resource.lb_policy_config)),
"[{\"xds_wrr_locality_experimental\":{"
"\"childPolicy\":[{\"round_robin\":{}}]}}]");
@ -921,7 +929,8 @@ TEST_F(TlsConfigTest, MinimumValidConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.common_tls_context.certificate_validation_context
.ca_certificate_provider_instance.instance_name,
"provider1");
@ -1092,7 +1101,8 @@ TEST_F(LrsTest, Valid) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
ASSERT_TRUE(resource.lrs_load_reporting_server.has_value());
EXPECT_EQ(*resource.lrs_load_reporting_server,
xds_client_->bootstrap().server());
@ -1147,7 +1157,8 @@ TEST_F(CircuitBreakingTest, Valid) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.max_concurrent_requests, 1701);
}
@ -1167,7 +1178,8 @@ TEST_F(CircuitBreakingTest, NoDefaultThreshold) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.max_concurrent_requests, 1024); // Default.
}
@ -1186,7 +1198,8 @@ TEST_F(CircuitBreakingTest, DefaultThresholdWithMaxRequestsUnset) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.max_concurrent_requests, 1024); // Default.
}
@ -1210,7 +1223,8 @@ TEST_F(OutlierDetectionTest, DefaultValues) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
ASSERT_TRUE(resource.outlier_detection.has_value());
EXPECT_EQ(*resource.outlier_detection, OutlierDetectionConfig());
}
@ -1241,7 +1255,8 @@ TEST_F(OutlierDetectionTest, AllFieldsSet) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
ASSERT_TRUE(resource.outlier_detection.has_value());
EXPECT_EQ(resource.outlier_detection->interval, Duration::Seconds(1));
EXPECT_EQ(resource.outlier_detection->base_ejection_time,
@ -1332,7 +1347,8 @@ TEST_F(HostOverrideStatusTest, IgnoredWhenNotEnabled) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.override_host_statuses, ::testing::ElementsAre());
}
@ -1357,7 +1373,8 @@ TEST_F(HostOverrideStatusTest, PassesOnRelevantHealthStatuses) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsClusterResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.override_host_statuses,
::testing::UnorderedElementsAre(
XdsHealthStatus(XdsHealthStatus::kUnknown),

@ -146,7 +146,8 @@ TEST_F(XdsEndpointTest, MinimumValidConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
@ -192,7 +193,8 @@ TEST_F(XdsEndpointTest, EndpointWeight) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
@ -240,7 +242,8 @@ TEST_F(XdsEndpointTest, IgnoresLocalityWithNoWeight) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
@ -289,7 +292,8 @@ TEST_F(XdsEndpointTest, IgnoresLocalityWithZeroWeight) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
@ -329,7 +333,8 @@ TEST_F(XdsEndpointTest, LocalityWithNoEndpoints) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
@ -758,7 +763,8 @@ TEST_F(XdsEndpointTest, DropConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_NE(resource.drop_config, nullptr);
const auto& drop_list = resource.drop_config->drop_category_list();
ASSERT_EQ(drop_list.size(), 3);
@ -796,7 +802,8 @@ TEST_F(XdsEndpointTest, CapsDropPercentageAt100) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_NE(resource.drop_config, nullptr);
const auto& drop_list = resource.drop_config->drop_category_list();
ASSERT_EQ(drop_list.size(), 1);
@ -935,7 +942,8 @@ TEST_F(XdsEndpointTest, IgnoresEndpointsInUnsupportedStates) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);
@ -988,7 +996,8 @@ TEST_F(XdsEndpointTest, EndpointHealthStatus) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsEndpointResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsEndpointResource&>(**decode_result.resource);
ASSERT_EQ(resource.priorities.size(), 1);
const auto& priority = resource.priorities[0];
ASSERT_EQ(priority.localities.size(), 1);

@ -278,7 +278,8 @@ TEST_P(HttpConnectionManagerTest, MinimumValidConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto http_connection_manager = GetHCMConfig(resource);
ASSERT_TRUE(http_connection_manager.has_value());
auto* rds_name =
@ -312,7 +313,8 @@ TEST_P(HttpConnectionManagerTest, RdsConfigSourceUsesAds) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto http_connection_manager = GetHCMConfig(resource);
ASSERT_TRUE(http_connection_manager.has_value());
auto* rds_name =
@ -425,7 +427,8 @@ TEST_P(HttpConnectionManagerTest, SetsMaxStreamDuration) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto http_connection_manager = GetHCMConfig(resource);
ASSERT_TRUE(http_connection_manager.has_value());
auto* rds_name =
@ -630,7 +633,8 @@ TEST_P(HttpConnectionManagerTest, HttpFilterTypeNotSupportedButOptional) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto http_connection_manager = GetHCMConfig(resource);
ASSERT_TRUE(http_connection_manager.has_value());
ASSERT_EQ(http_connection_manager->http_filters.size(), 1UL);
@ -767,7 +771,8 @@ TEST_F(HttpConnectionManagerClientOrServerOnlyTest,
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* api_listener = absl::get_if<XdsListenerResource::HttpConnectionManager>(
&resource.listener);
ASSERT_NE(api_listener, nullptr);
@ -849,7 +854,8 @@ TEST_F(HttpConnectionManagerClientOrServerOnlyTest,
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&resource.listener);
ASSERT_NE(tcp_listener, nullptr);
@ -959,7 +965,8 @@ TEST_F(TcpListenerTest, MinimumValidConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&resource.listener);
ASSERT_NE(tcp_listener, nullptr);
@ -1018,7 +1025,8 @@ TEST_F(TcpListenerTest, FilterChainMatchCriteria) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&resource.listener);
ASSERT_NE(tcp_listener, nullptr);
@ -1479,7 +1487,8 @@ TEST_F(TcpListenerTest, DownstreamTlsContext) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&resource.listener);
ASSERT_NE(tcp_listener, nullptr);
@ -1535,7 +1544,8 @@ TEST_F(TcpListenerTest, DownstreamTlsContextWithCaCertProviderInstance) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&resource.listener);
ASSERT_NE(tcp_listener, nullptr);
@ -1595,7 +1605,8 @@ TEST_F(TcpListenerTest, ClientCertificateRequired) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource = static_cast<XdsListenerResource&>(**decode_result.resource);
auto& resource =
static_cast<const XdsListenerResource&>(**decode_result.resource);
auto* tcp_listener =
absl::get_if<XdsListenerResource::TcpListener>(&resource.listener);
ASSERT_NE(tcp_listener, nullptr);

@ -154,7 +154,7 @@ TEST_F(XdsRouteConfigTest, MinimumValidConfig) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(resource.cluster_specifier_plugin_map, ::testing::ElementsAre());
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
EXPECT_THAT(resource.virtual_hosts[0].domains, ::testing::ElementsAre("*"));
@ -208,7 +208,7 @@ TEST_F(VirtualHostTest, MultipleVirtualHosts) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(resource.cluster_specifier_plugin_map, ::testing::ElementsAre());
ASSERT_EQ(resource.virtual_hosts.size(), 2UL);
EXPECT_THAT(resource.virtual_hosts[0].domains, ::testing::ElementsAre("*"));
@ -413,7 +413,7 @@ TEST_P(TypedPerFilterConfigTest, Basic) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
auto& typed_per_filter_config = GetTypedPerFilterConfig(resource);
ASSERT_EQ(typed_per_filter_config.size(), 1UL);
auto it = typed_per_filter_config.begin();
@ -524,7 +524,7 @@ TEST_P(TypedPerFilterConfigTest, FilterConfigWrapper) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
auto& typed_per_filter_config = GetTypedPerFilterConfig(resource);
ASSERT_EQ(typed_per_filter_config.size(), 1UL);
auto it = typed_per_filter_config.begin();
@ -644,7 +644,7 @@ TEST_P(TypedPerFilterConfigTest,
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
auto& typed_per_filter_config = GetTypedPerFilterConfig(resource);
EXPECT_THAT(typed_per_filter_config, ::testing::ElementsAre());
}
@ -741,7 +741,7 @@ TEST_P(RetryPolicyTest, Empty) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -780,7 +780,7 @@ TEST_P(RetryPolicyTest, AllFields) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -817,7 +817,7 @@ TEST_P(RetryPolicyTest, MaxIntervalDefaultsTo10xBaseInterval) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -897,7 +897,7 @@ TEST_F(RetryPolicyOverrideTest, RoutePolicyOverridesVhostPolicy) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -990,7 +990,7 @@ TEST_F(RouteMatchTest, PathMatchers) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
auto& virtual_host = resource.virtual_hosts.front();
ASSERT_EQ(virtual_host.routes.size(), 3UL);
@ -1130,7 +1130,7 @@ TEST_F(RouteMatchTest, HeaderMatchers) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
auto& virtual_host = resource.virtual_hosts.front();
ASSERT_EQ(virtual_host.routes.size(), 1UL);
@ -1260,7 +1260,7 @@ TEST_F(RouteMatchTest, RuntimeFractionMatcher) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
auto& virtual_host = resource.virtual_hosts.front();
ASSERT_EQ(virtual_host.routes.size(), 4UL);
@ -1327,7 +1327,7 @@ TEST_F(MaxStreamDurationTest, GrpcTimeoutHeaderMax) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -1358,7 +1358,7 @@ TEST_F(MaxStreamDurationTest, MaxStreamDuration) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -1392,7 +1392,7 @@ TEST_F(MaxStreamDurationTest, PrefersGrpcTimeoutHeaderMaxToMaxStreamDuration) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -1503,7 +1503,7 @@ TEST_F(HashPolicyTest, ValidAndUnsupportedPolicies) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -1619,7 +1619,7 @@ TEST_F(WeightedClusterTest, Basic) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
auto& route = resource.virtual_hosts[0].routes[0];
@ -1744,7 +1744,7 @@ TEST_F(RlsTest, Basic) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(
resource.cluster_specifier_plugin_map,
::testing::ElementsAre(::testing::Pair(
@ -1798,7 +1798,7 @@ TEST_F(RlsTest, PluginDefinedButNotUsed) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(resource.cluster_specifier_plugin_map, ::testing::ElementsAre());
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
@ -1847,7 +1847,7 @@ TEST_F(RlsTest, NotUsedInAllVirtualHosts) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(
resource.cluster_specifier_plugin_map,
::testing::ElementsAre(::testing::Pair(
@ -1917,7 +1917,7 @@ TEST_F(RlsTest, ClusterSpecifierPluginsIgnoredWhenNotEnabled) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(resource.cluster_specifier_plugin_map, ::testing::ElementsAre());
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);
@ -2051,7 +2051,7 @@ TEST_F(RlsTest, UnsupportedButOptionalClusterSpecifierPlugin) {
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<XdsRouteConfigResource&>(**decode_result.resource);
static_cast<const XdsRouteConfigResource&>(**decode_result.resource);
EXPECT_THAT(resource.cluster_specifier_plugin_map, ::testing::ElementsAre());
ASSERT_EQ(resource.virtual_hosts.size(), 1UL);
ASSERT_EQ(resource.virtual_hosts[0].routes.size(), 1UL);

Loading…
Cancel
Save