From bb6a6faa69ecc3291d341bc2257f4f6b15bcf4d5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 6 Oct 2023 15:23:33 -0700 Subject: [PATCH] [SSA] support multiple addresses per endpoint (#34472) --- build_autogenerated.yaml | 1 + src/core/BUILD | 3 + .../lb_policy/xds/xds_override_host.cc | 362 ++++++++++------- .../stateful_session_filter.cc | 52 +-- .../stateful_session_filter.h | 25 +- .../lb_policy/lb_policy_test_lib.h | 29 +- .../lb_policy/xds_override_host_test.cc | 369 ++++++++++-------- test/cpp/end2end/xds/BUILD | 1 + .../xds/xds_override_host_end2end_test.cc | 64 ++- 9 files changed, 559 insertions(+), 347 deletions(-) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f2370733b8b..fd30f5f7c0e 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -18199,6 +18199,7 @@ targets: build: test language: c++ headers: + - test/core/util/scoped_env_var.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/xds_end2end_test_lib.h diff --git a/src/core/BUILD b/src/core/BUILD index 9e91dedfbd3..981c6941a2a 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3985,6 +3985,7 @@ grpc_cc_library( "map", "pipe", "poll", + "ref_counted_string", "service_config_parser", "slice", "time", @@ -5185,6 +5186,7 @@ grpc_cc_library( "absl/status:statusor", "absl/strings", "absl/types:optional", + "absl/types:span", "absl/types:variant", ], language = "c++", @@ -5205,6 +5207,7 @@ grpc_cc_library( "lb_policy_registry", "match", "pollset_set", + "ref_counted_string", "resolved_address", "subchannel_interface", "validation_errors", diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index a3916c781e3..f6934818d40 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -37,8 +37,11 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_join.h" +#include "absl/strings/str_split.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "absl/types/span.h" #include "absl/types/variant.h" #include @@ -57,6 +60,7 @@ #include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/ref_counted_string.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" @@ -124,8 +128,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { class SubchannelWrapper : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel, - RefCountedPtr policy, - absl::string_view key); + RefCountedPtr policy); ~SubchannelWrapper() override; @@ -141,6 +144,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { XdsOverrideHostLb* policy() { return policy_.get(); } + void set_key(absl::string_view key) { key_ = std::string(key); } + const absl::optional& key() const { return key_; } + private: class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface { public: @@ -172,6 +178,60 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { GRPC_CHANNEL_IDLE}; }; + class SubchannelEntry { + public: + explicit SubchannelEntry(XdsHealthStatus eds_health_status) + : eds_health_status_(eds_health_status) {} + + void SetSubchannel(SubchannelWrapper* subchannel) { + if (eds_health_status_.status() == XdsHealthStatus::kDraining) { + subchannel_ = subchannel->Ref(); + } else { + subchannel_ = subchannel->WeakRef(); + } + } + + void UnsetSubchannel() { + subchannel_ = WeakRefCountedPtr(nullptr); + } + + SubchannelWrapper* GetSubchannel() const { + return Match( + subchannel_, + [](WeakRefCountedPtr + subchannel) { return subchannel.get(); }, + [](RefCountedPtr subchannel) { + return subchannel.get(); + }); + } + + void SetEdsHealthStatus(XdsHealthStatus eds_health_status) { + eds_health_status_ = eds_health_status; + auto subchannel = GetSubchannel(); + if (subchannel == nullptr) return; + if (eds_health_status_.status() == XdsHealthStatus::kDraining) { + subchannel_ = subchannel->Ref(); + } else { + subchannel_ = subchannel->WeakRef(); + } + } + + XdsHealthStatus eds_health_status() const { return eds_health_status_; } + + void set_address_list(RefCountedStringValue address_list) { + address_list_ = std::move(address_list); + } + + RefCountedStringValue address_list() const { return address_list_; } + + private: + absl::variant, + RefCountedPtr> + subchannel_; + XdsHealthStatus eds_health_status_; + RefCountedStringValue address_list_; + }; + // A picker that wraps the picker from the child for cases when cookie is // present. class Picker : public SubchannelPicker { @@ -210,7 +270,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { }; absl::optional PickOverridenHost( - absl::string_view override_host); + XdsOverrideHostAttribute* override_host_attr) const; RefCountedPtr policy_; RefCountedPtr picker_; @@ -231,53 +291,6 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { RefCountedPtr picker) override; }; - class SubchannelEntry { - public: - explicit SubchannelEntry(XdsHealthStatus eds_health_status) - : eds_health_status_(eds_health_status) {} - - void SetSubchannel(SubchannelWrapper* subchannel) { - if (eds_health_status_.status() == XdsHealthStatus::kDraining) { - subchannel_ = subchannel->Ref(); - } else { - subchannel_ = subchannel; - } - } - - void UnsetSubchannel() { subchannel_ = nullptr; } - - SubchannelWrapper* GetSubchannel() const { - return Match( - subchannel_, - [](XdsOverrideHostLb::SubchannelWrapper* subchannel) { - return subchannel; - }, - [](RefCountedPtr subchannel) { - return subchannel.get(); - }); - } - - void SetEdsHealthStatus(XdsHealthStatus eds_health_status) { - eds_health_status_ = eds_health_status; - auto subchannel = GetSubchannel(); - if (subchannel == nullptr) { - return; - } - if (eds_health_status_.status() == XdsHealthStatus::kDraining) { - subchannel_ = subchannel->Ref(); - } else { - subchannel_ = subchannel; - } - } - - XdsHealthStatus eds_health_status() const { return eds_health_status_; } - - private: - absl::variant> - subchannel_; - XdsHealthStatus eds_health_status_; - }; - ~XdsOverrideHostLb() override; void ShutdownLocked() override; @@ -296,12 +309,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel); - RefCountedPtr GetSubchannelByAddress( - absl::string_view address, XdsHealthStatusSet overriden_health_statuses); - void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key) - ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the worker - // serializer and does not require + ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the + // WorkSerializer and does not require // additional synchronization // Current config from the resolver. @@ -339,39 +349,89 @@ XdsOverrideHostLb::Picker::Picker( } absl::optional -XdsOverrideHostLb::Picker::PickOverridenHost(absl::string_view override_host) { - if (override_host.length() == 0) { - return absl::nullopt; - } - auto subchannel = policy_->GetSubchannelByAddress( - override_host, override_host_health_status_set_); - if (subchannel == nullptr) { - return absl::nullopt; - } - auto connectivity_state = subchannel->connectivity_state(); - if (connectivity_state == GRPC_CHANNEL_READY) { - return PickResult::Complete(subchannel->wrapped_subchannel()); - } else if (connectivity_state == GRPC_CHANNEL_CONNECTING) { +XdsOverrideHostLb::Picker::PickOverridenHost( + XdsOverrideHostAttribute* override_host_attr) const { + GPR_ASSERT(override_host_attr != nullptr); + auto cookie_address_list = override_host_attr->cookie_address_list(); + if (cookie_address_list.empty()) return absl::nullopt; + // The cookie has an address list, so look through the addresses in order. + RefCountedPtr idle_subchannel; + bool found_connecting = false; + { + MutexLock lock(&policy_->subchannel_map_mu_); + for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) { + RefCountedPtr subchannel; + auto it = policy_->subchannel_map_.find(address); + if (it != policy_->subchannel_map_.end()) { + subchannel = it->second.GetSubchannel()->RefIfNonZero(); + } + if (subchannel == nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "Subchannel %s was not found", + std::string(address).c_str()); + } + continue; + } + if (!override_host_health_status_set_.Contains( + it->second.eds_health_status())) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "Subchannel %s health status is not overridden (%s)", + std::string(address).c_str(), + it->second.eds_health_status().ToString()); + } + continue; + } + auto connectivity_state = subchannel->connectivity_state(); + if (connectivity_state == GRPC_CHANNEL_READY) { + // Found a READY subchannel. Pass back the actual address list + // and return the subchannel. + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "Picker override found READY subchannel %s", + std::string(address).c_str()); + } + override_host_attr->set_actual_address_list(it->second.address_list()); + return PickResult::Complete(subchannel->wrapped_subchannel()); + } else if (connectivity_state == GRPC_CHANNEL_IDLE) { + if (idle_subchannel == nullptr) idle_subchannel = std::move(subchannel); + } else if (connectivity_state == GRPC_CHANNEL_CONNECTING) { + found_connecting = true; + } + } + } + // No READY subchannel found. If we found an IDLE subchannel, trigger + // a connection attempt and queue the pick until that attempt completes. + if (idle_subchannel != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "Picker override found IDLE subchannel"); + } + // Deletes itself after the connection is requested. + new SubchannelConnectionRequester(std::move(idle_subchannel)); return PickResult::Queue(); - } else if (connectivity_state == GRPC_CHANNEL_IDLE) { - // Deleted after the connection is requested - new SubchannelConnectionRequester(std::move(subchannel)); + } + // No READY or IDLE subchannels. If we found a CONNECTING subchannel, + // queue the pick and wait for the connection attempt to complete. + if (found_connecting) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "Picker override found CONNECTING subchannel"); + } return PickResult::Queue(); } + // No READY, IDLE, or CONNECTING subchannels found. return absl::nullopt; } -LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick( - LoadBalancingPolicy::PickArgs args) { +LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) { auto* call_state = static_cast(args.call_state); - auto* override_host = static_cast( + auto* override_host_attr = static_cast( call_state->GetCallAttribute(XdsOverrideHostAttribute::TypeName())); - auto overridden_host_pick = - PickOverridenHost(override_host != nullptr ? override_host->host_name() - : absl::string_view()); - if (overridden_host_pick.has_value()) { - return std::move(*overridden_host_pick); + if (override_host_attr != nullptr) { + auto overridden_host_pick = PickOverridenHost(override_host_attr); + if (overridden_host_pick.has_value()) { + return std::move(*overridden_host_pick); + } } + // No usable override. Delegate to child picker. if (picker_ == nullptr) { // Should never happen. return PickResult::Fail(absl::InternalError( "xds_override_host picker not given any child picker")); @@ -379,9 +439,23 @@ LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick( auto result = picker_->Pick(args); auto complete_pick = absl::get_if(&result.result); if (complete_pick != nullptr) { - complete_pick->subchannel = - static_cast(complete_pick->subchannel.get()) - ->wrapped_subchannel(); + auto* wrapper = + static_cast(complete_pick->subchannel.get()); + // Populate the address list in the override host attribute so that + // the StatefulSession filter can set the cookie. + if (override_host_attr != nullptr) { + auto& key = wrapper->key(); + if (key.has_value()) { + MutexLock lock(&policy_->subchannel_map_mu_); + auto it = policy_->subchannel_map_.find(*key); + if (it != policy_->subchannel_map_.end()) { // Should always be true. + override_host_attr->set_actual_address_list( + it->second.address_list()); + } + } + } + // Unwrap the subchannel. + complete_pick->subchannel = wrapper->wrapped_subchannel(); } return result; } @@ -513,10 +587,16 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( } return endpoints; } - // TODO(roth): As we clarify this part of the dualstack design, add - // support for multiple addresses per endpoint. - EndpointAddressesList return_value; - std::map addresses_for_map; + // Construct the list of addresses to pass to the child policy and a + // map of address info from which to update subchannel_map_. + EndpointAddressesList child_addresses; + struct AddressInfo { + XdsHealthStatus eds_health_status; + RefCountedStringValue address_list; + AddressInfo(XdsHealthStatus status, RefCountedStringValue addresses) + : eds_health_status(status), address_list(std::move(addresses)) {} + }; + std::map addresses_for_map; for (const auto& endpoint : *endpoints) { XdsHealthStatus status = GetEndpointHealthStatus(endpoint); if (status.status() != XdsHealthStatus::kDraining) { @@ -526,7 +606,7 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( "passing to child", this, endpoint.ToString().c_str()); } - return_value.push_back(endpoint); + child_addresses.push_back(endpoint); } else if (!config_->override_host_status_set().Contains(status)) { // Skip draining hosts if not in the override status set. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { @@ -537,16 +617,32 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( } continue; } - auto key = grpc_sockaddr_to_uri(&endpoint.address()); - if (key.ok()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { - gpr_log(GPR_INFO, - "[xds_override_host_lb %p] endpoint %s: adding map key %s", - this, endpoint.ToString().c_str(), key->c_str()); + std::vector addresses; + addresses.reserve(endpoint.addresses().size()); + for (const auto& address : endpoint.addresses()) { + auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false); + if (key.ok()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] endpoint %s: adding map key %s", + this, endpoint.ToString().c_str(), key->c_str()); + } + addresses.push_back(*std::move(key)); } - addresses_for_map.emplace(std::move(*key), status); + } + absl::Span addresses_span = addresses; + for (size_t i = 0; i < addresses.size(); ++i) { + std::string start = absl::StrJoin(addresses_span.subspan(0, i), ","); + std::string end = absl::StrJoin(addresses_span.subspan(i + 1), ","); + RefCountedStringValue address_list( + absl::StrCat(addresses[i], (start.empty() ? "" : ","), start, + (end.empty() ? "" : ","), end)); + addresses_for_map.emplace( + std::piecewise_construct, std::forward_as_tuple(addresses[i]), + std::forward_as_tuple(status, std::move(address_list))); } } + // Now grab the lock and update subchannel_map_ from addresses_for_map. { MutexLock lock(&subchannel_map_mu_); for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) { @@ -560,44 +656,55 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( ++it; } } - for (const auto& key_status : addresses_for_map) { - auto it = subchannel_map_.find(key_status.first); + for (auto& p : addresses_for_map) { + const auto& address = p.first; + auto& address_info = p.second; + auto it = subchannel_map_.find(address); if (it == subchannel_map_.end()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] adding map key %s", this, - key_status.first.c_str()); + address.c_str()); } - subchannel_map_.emplace(std::piecewise_construct, - std::forward_as_tuple(key_status.first), - std::forward_as_tuple(key_status.second)); + it = subchannel_map_ + .emplace(std::piecewise_construct, + std::forward_as_tuple(address), + std::forward_as_tuple(address_info.eds_health_status)) + .first; } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] setting EDS health status for " "%s to %s", - this, key_status.first.c_str(), key_status.second.ToString()); + this, address.c_str(), + address_info.eds_health_status.ToString()); } - it->second.SetEdsHealthStatus(key_status.second); + it->second.SetEdsHealthStatus(address_info.eds_health_status); + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] setting address list for %s to %s", + this, address.c_str(), address_info.address_list.c_str()); } + it->second.set_address_list(std::move(address_info.address_list)); } } - return return_value; + return child_addresses; } RefCountedPtr XdsOverrideHostLb::AdoptSubchannel( const grpc_resolved_address& address, RefCountedPtr subchannel) { - auto key = grpc_sockaddr_to_uri(&address); - if (!key.ok()) { - return subchannel; - } + auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false); auto wrapper = - MakeRefCounted(std::move(subchannel), Ref(), *key); - MutexLock lock(&subchannel_map_mu_); - auto it = subchannel_map_.find(*key); - if (it != subchannel_map_.end()) { - it->second.SetSubchannel(wrapper.get()); + MakeRefCounted(std::move(subchannel), Ref()); + if (key.ok()) { + MutexLock lock(&subchannel_map_mu_); + auto it = subchannel_map_.find(*key); + if (it != subchannel_map_.end()) { + wrapper->set_key(*key); + it->second.SetSubchannel(wrapper.get()); + } } return wrapper; } @@ -613,29 +720,6 @@ void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key, } } -RefCountedPtr -XdsOverrideHostLb::GetSubchannelByAddress( - absl::string_view address, XdsHealthStatusSet overriden_health_statuses) { - MutexLock lock(&subchannel_map_mu_); - auto it = subchannel_map_.find(address); - if (it == subchannel_map_.end() || it->second.GetSubchannel() == nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { - gpr_log(GPR_INFO, "Subchannel %s was not found", - std::string(address).c_str()); - } - return nullptr; - } - if (!overriden_health_statuses.Contains(it->second.eds_health_status())) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { - gpr_log(GPR_INFO, "Subchannel %s health status is not overridden (%s)", - std::string(address).c_str(), - it->second.eds_health_status().ToString()); - } - return nullptr; - } - return it->second.GetSubchannel()->Ref(); -} - void XdsOverrideHostLb::OnSubchannelConnectivityStateChange( absl::string_view subchannel_key) { auto it = subchannel_map_.find(subchannel_key); @@ -656,7 +740,7 @@ RefCountedPtr XdsOverrideHostLb::Helper::CreateSubchannel( const ChannelArgs& args) { auto subchannel = parent()->channel_control_helper()->CreateSubchannel( address, per_address_args, args); - return parent()->AdoptSubchannel(address, subchannel); + return parent()->AdoptSubchannel(address, std::move(subchannel)); } void XdsOverrideHostLb::Helper::UpdateState( @@ -677,10 +761,8 @@ void XdsOverrideHostLb::Helper::UpdateState( XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper( RefCountedPtr subchannel, - RefCountedPtr policy, absl::string_view key) - : DelegatingSubchannel(std::move(subchannel)), - key_(key), - policy_(std::move(policy)) { + RefCountedPtr policy) + : DelegatingSubchannel(std::move(subchannel)), policy_(std::move(policy)) { auto watcher = std::make_unique(WeakRef()); watcher_ = watcher.get(); wrapped_subchannel()->WatchConnectivityState(std::move(watcher)); diff --git a/src/core/ext/filters/stateful_session/stateful_session_filter.cc b/src/core/ext/filters/stateful_session/stateful_session_filter.cc index 315e7e5ae62..ee0351bd633 100644 --- a/src/core/ext/filters/stateful_session/stateful_session_filter.cc +++ b/src/core/ext/filters/stateful_session/stateful_session_filter.cc @@ -102,21 +102,18 @@ absl::string_view AllocateStringOnArena( // Adds the set-cookie header to the server initial metadata if needed. void MaybeUpdateServerInitialMetadata( const StatefulSessionMethodParsedConfig::CookieConfig* cookie_config, - bool cluster_changed, absl::string_view host_override, - absl::string_view actual_cluster, ServerMetadata* server_initial_metadata) { - // Get peer string. - Slice* peer_string = server_initial_metadata->get_pointer(PeerString()); - if (peer_string == nullptr) { - // No changes, keep the same set-cookie header. + bool cluster_changed, absl::string_view actual_cluster, + absl::string_view cookie_address_list, + XdsOverrideHostAttribute* override_host_attribute, + ServerMetadata* server_initial_metadata) { + // If cookie doesn't need to change, do nothing. + if (cookie_address_list == override_host_attribute->actual_address_list() && + !cluster_changed) { return; } - if (host_override == peer_string->as_string_view() && !cluster_changed) { - return; - } - std::string new_value(peer_string->as_string_view()); - if (!actual_cluster.empty()) { - absl::StrAppend(&new_value, ";", actual_cluster); - } + // Construct new cookie value. + std::string new_value = absl::StrCat( + override_host_attribute->actual_address_list(), ";", actual_cluster); std::vector parts = {absl::StrCat( *cookie_config->name, "=", absl::Base64Escape(new_value), "; HttpOnly")}; if (!cookie_config->path.empty()) { @@ -249,14 +246,16 @@ ArenaPromise StatefulSessionFilter::MakeCallPromise( // Cookie format is "host;cluster" std::pair host_cluster = absl::StrSplit(cookie_value, absl::MaxSplits(';', 1)); - absl::string_view host_override; - // Set override host attribute. Allocate the string on the - // arena, so that it has the right lifetime. + absl::string_view cookie_address_list; + // Allocate the string on the arena, so that it has the right lifetime. if (!host_cluster.first.empty()) { - host_override = AllocateStringOnArena(host_cluster.first); - service_config_call_data->SetCallAttribute( - GetContext()->New(host_override)); + cookie_address_list = AllocateStringOnArena(host_cluster.first); } + // Set override host attribute. + auto* override_host_attribute = + GetContext()->ManagedNew( + cookie_address_list); + service_config_call_data->SetCallAttribute(override_host_attribute); // Check if the cluster override is valid, and apply it if necessary. // Note that cluster_name will point to an arena-allocated string // that will still be alive when we see the server initial metadata. @@ -267,23 +266,24 @@ ArenaPromise StatefulSessionFilter::MakeCallPromise( bool cluster_changed = cluster_name != host_cluster.second; // Intercept server initial metadata. call_args.server_initial_metadata->InterceptAndMap( - [cookie_config, cluster_changed, host_override, - cluster_name](ServerMetadataHandle md) { + [cookie_config, cluster_changed, cluster_name, cookie_address_list, + override_host_attribute](ServerMetadataHandle md) { // Add cookie to server initial metadata if needed. MaybeUpdateServerInitialMetadata(cookie_config, cluster_changed, - host_override, cluster_name, md.get()); + cluster_name, cookie_address_list, + override_host_attribute, md.get()); return md; }); return Map(next_promise_factory(std::move(call_args)), - [cookie_config, cluster_changed, host_override, - cluster_name](ServerMetadataHandle md) { + [cookie_config, cluster_changed, cluster_name, cookie_address_list, + override_host_attribute](ServerMetadataHandle md) { // If we got a Trailers-Only response, then add the // cookie to the trailing metadata instead of the // initial metadata. if (md->get(GrpcTrailersOnly()).value_or(false)) { MaybeUpdateServerInitialMetadata( - cookie_config, cluster_changed, host_override, - cluster_name, md.get()); + cookie_config, cluster_changed, cluster_name, + cookie_address_list, override_host_attribute, md.get()); } return md; }); diff --git a/src/core/ext/filters/stateful_session/stateful_session_filter.h b/src/core/ext/filters/stateful_session/stateful_session_filter.h index 2fea7f47391..9942900191c 100644 --- a/src/core/ext/filters/stateful_session/stateful_session_filter.h +++ b/src/core/ext/filters/stateful_session/stateful_session_filter.h @@ -21,12 +21,15 @@ #include +#include + #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/promise_based_filter.h" +#include "src/core/lib/gprpp/ref_counted_string.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/service_config/service_config_call_data.h" @@ -34,20 +37,34 @@ namespace grpc_core { +// A call attribute to be passed to the xds_override_host LB policy. +// The StatefulSession filter will populate the cookie's address list, +// if set. The xds_override_host LB policy will use that info, and then +// set the actual address list based on the chosen endpoint. The +// StatefulSession filter will then use the actual address list to +// update the cookie. class XdsOverrideHostAttribute : public ServiceConfigCallData::CallAttributeInterface { public: static UniqueTypeName TypeName(); - explicit XdsOverrideHostAttribute(absl::string_view host_name) - : host_name_(host_name) {} + explicit XdsOverrideHostAttribute(absl::string_view cookie_address_list) + : cookie_address_list_(cookie_address_list) {} + + absl::string_view cookie_address_list() const { return cookie_address_list_; } - absl::string_view host_name() const { return host_name_; } + absl::string_view actual_address_list() const { + return actual_address_list_.as_string_view(); + } + void set_actual_address_list(RefCountedStringValue actual_address_list) { + actual_address_list_ = std::move(actual_address_list); + } private: UniqueTypeName type() const override { return TypeName(); } - absl::string_view host_name_; + absl::string_view cookie_address_list_; + RefCountedStringValue actual_address_list_; }; // A filter to provide cookie-based stateful session affinity. diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index cc1c79d0231..b202cab2ff9 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -95,8 +95,8 @@ namespace testing { class LoadBalancingPolicyTest : public ::testing::Test { protected: - using CallAttributes = std::vector< - std::unique_ptr>; + using CallAttributes = + std::vector; // Channel-level subchannel state for a specific address and channel args. // This is analogous to the real subchannel in the ClientChannel code. @@ -630,8 +630,8 @@ class LoadBalancingPolicyTest : public ::testing::Test { class FakeCallState : public ClientChannelLbCallState { public: explicit FakeCallState(const CallAttributes& attributes) { - for (const auto& p : attributes) { - attributes_.emplace(p->type(), p.get()); + for (const auto& attribute : attributes) { + attributes_.emplace(attribute->type(), attribute); } } @@ -1232,17 +1232,26 @@ class LoadBalancingPolicyTest : public ::testing::Test { // Expects zero or more picker updates, each of which returns // round-robin picks for the specified set of addresses. - void DrainRoundRobinPickerUpdates( - absl::Span addresses, - SourceLocation location = SourceLocation()) { + RefCountedPtr + DrainRoundRobinPickerUpdates(absl::Span addresses, + SourceLocation location = SourceLocation()) { gpr_log(GPR_INFO, "Draining RR picker updates..."); + RefCountedPtr picker; while (!helper_->QueueEmpty()) { auto update = helper_->GetNextStateUpdate(location); - ASSERT_TRUE(update.has_value()); - ASSERT_EQ(update->state, GRPC_CHANNEL_READY); - ExpectRoundRobinPicks(update->picker.get(), addresses); + EXPECT_TRUE(update.has_value()) + << location.file() << ":" << location.line(); + if (!update.has_value()) return nullptr; + EXPECT_EQ(update->state, GRPC_CHANNEL_READY) + << location.file() << ":" << location.line(); + if (update->state != GRPC_CHANNEL_READY) return nullptr; + ExpectRoundRobinPicks(update->picker.get(), addresses, + /*call_attributes=*/{}, /*num_iterations=*/3, + location); + picker = std::move(update->picker); } gpr_log(GPR_INFO, "Done draining RR picker updates"); + return picker; } // Expects zero or more CONNECTING updates. diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc index 5deccfdb2c1..f3bf82fc6be 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -14,6 +14,8 @@ // limitations under the License. // +#include + #include #include #include @@ -23,8 +25,12 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "absl/types/optional.h" #include "absl/types/span.h" +#include "gmock/gmock.h" #include "gtest/gtest.h" #include @@ -34,6 +40,8 @@ #include "src/core/ext/filters/stateful_session/stateful_session_filter.h" #include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" @@ -99,12 +107,86 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus()); } - CallAttributes MakeOverrideHostAttribute(absl::string_view host) { - CallAttributes override_host_attributes; - override_host_attributes.emplace_back( - std::make_unique(host)); - return override_host_attributes; + struct OverrideHostAttributeStorage { + // Need to store the string externally, since + // XdsOverrideHostAttribute only holds a string_view. + std::string address_list; + XdsOverrideHostAttribute attribute; + + explicit OverrideHostAttributeStorage(std::string addresses) + : address_list(std::move(addresses)), attribute(address_list) {} + }; + + XdsOverrideHostAttribute* MakeOverrideHostAttribute( + absl::Span addresses) { + std::vector address_list; + address_list.reserve(addresses.size()); + for (absl::string_view address : addresses) { + address_list.emplace_back(absl::StripPrefix(address, "ipv4:")); + } + attribute_storage_.emplace_back( + std::make_unique( + absl::StrJoin(address_list, ","))); + return &attribute_storage_.back()->attribute; + } + + XdsOverrideHostAttribute* MakeOverrideHostAttribute( + absl::string_view address) { + const std::array addresses = {address}; + return MakeOverrideHostAttribute(addresses); + } + + void ExpectOverridePicks( + LoadBalancingPolicy::SubchannelPicker* picker, + XdsOverrideHostAttribute* attribute, absl::string_view expected, + absl::Span expected_address_list = {}, + SourceLocation location = SourceLocation()) { + std::array kArray = {expected}; + if (expected_address_list.empty()) expected_address_list = kArray; + std::vector expected_addresses; + expected_addresses.reserve(expected_address_list.size()); + for (absl::string_view address : expected_address_list) { + expected_addresses.push_back(absl::StripPrefix(address, "ipv4:")); + } + std::string expected_addresses_str = absl::StrJoin(expected_addresses, ","); + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ( + ExpectPickComplete(picker, {attribute}, + /*subchannel_call_tracker=*/nullptr, location), + expected) + << location.file() << ":" << location.line(); + EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str) + << "Expected: " << attribute->actual_address_list() << "\n" + << " Actual: " << expected_addresses_str << "\n" + << location.file() << ":" << location.line(); + } } + + void ExpectRoundRobinPicksWithAttribute( + LoadBalancingPolicy::SubchannelPicker* picker, + XdsOverrideHostAttribute* attribute, + absl::Span expected, + SourceLocation location = SourceLocation()) { + std::vector actual_picks; + for (size_t i = 0; i < expected.size(); ++i) { + auto address = ExpectPickComplete( + picker, {attribute}, /*subchannel_call_tracker=*/nullptr, location); + ASSERT_TRUE(address.has_value()) + << location.file() << ":" << location.line(); + EXPECT_THAT(*address, ::testing::AnyOfArray(expected)) + << location.file() << ":" << location.line(); + EXPECT_EQ(attribute->actual_address_list(), + absl::StripPrefix(*address, "ipv4:")) + << "Expected: " << attribute->actual_address_list() << "\n" + << " Actual: " << absl::StripPrefix(*address, "ipv4:") << "\n" + << location.file() << ":" << location.line(); + actual_picks.push_back(std::move(*address)); + } + EXPECT_TRUE(PicksAreRoundRobin(expected, actual_picks)) + << location.file() << ":" << location.line(); + } + + std::vector> attribute_storage_; }; TEST_F(XdsOverrideHostTest, DelegatesToChild) { @@ -121,34 +203,23 @@ TEST_F(XdsOverrideHostTest, NoConfigReportsError) { } TEST_F(XdsOverrideHostTest, OverrideHost) { - // Send address list to LB policy. const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; auto picker = ExpectStartupWithRoundRobin(kAddresses); ASSERT_NE(picker, nullptr); - // Check that the host is overridden - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[0])), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[0])), - kAddresses[0]); + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]); + ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]); } TEST_F(XdsOverrideHostTest, SubchannelNotFound) { - // Send address list to LB policy. const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; auto picker = ExpectStartupWithRoundRobin(kAddresses); ASSERT_NE(picker, nullptr); - ExpectRoundRobinPicks(picker.get(), kAddresses, - MakeOverrideHostAttribute("no such host")); + auto* attribute = MakeOverrideHostAttribute("no such host"); + ExpectRoundRobinPicksWithAttribute(picker.get(), attribute, kAddresses); } TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) { @@ -156,118 +227,77 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; auto picker = ExpectStartupWithRoundRobin(kAddresses); ASSERT_NE(picker, nullptr); - // Check that the host is overridden - ExpectRoundRobinPicks(picker.get(), {kAddresses[1]}, - MakeOverrideHostAttribute(kAddresses[1])); - // Some other address is gone - EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[1]}, - MakeXdsOverrideHostConfig()), - lb_policy()), - absl::OkStatus()); - // Wait for LB policy to return a new picker that uses the updated - // addresses. We can't use the host override for this, because then - // we won't know when the new picker is actually using all of the new - // addresses. - picker = - WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[1]}); - // Make sure host override still works. - ExpectRoundRobinPicks(picker.get(), {kAddresses[1]}, - MakeOverrideHostAttribute(kAddresses[1])); - // "Our" address is gone so others get returned in round-robin order + // Check that the host override works. + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + // The override address is removed. EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[2]}, MakeXdsOverrideHostConfig()), lb_policy()), absl::OkStatus()); - // Wait for LB policy to return the new picker. - // In this case, we can pass call_attributes while we wait instead of - // checking again afterward, because the host override won't actually - // be used. - WaitForRoundRobinListChange({kAddresses[0], kAddresses[1]}, - {kAddresses[0], kAddresses[2]}, - MakeOverrideHostAttribute(kAddresses[1])); - // And now it is back + picker = + WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]}); + // Picks are returned in round-robin order, because the address + // pointed to by the cookie is not present. + ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute, + {kAddresses[0], kAddresses[2]}); + // The override address comes back. EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[1], kAddresses[2]}, MakeXdsOverrideHostConfig()), lb_policy()), absl::OkStatus()); - // Wait for LB policy to return the new picker. picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]}, {kAddresses[1], kAddresses[2]}); // Make sure host override works. - ExpectRoundRobinPicks(picker.get(), {kAddresses[1]}, - MakeOverrideHostAttribute(kAddresses[1])); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); } -TEST_F(XdsOverrideHostTest, FailedSubchannelIsNotPicked) { - // Send address list to LB policy. +TEST_F(XdsOverrideHostTest, + OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure) { const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; auto picker = ExpectStartupWithRoundRobin(kAddresses); ASSERT_NE(picker, nullptr); // Check that the host is overridden - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); // Subchannel for address 1 becomes disconnected. gpr_log(GPR_INFO, "### subchannel 1 reporting IDLE"); auto subchannel = FindSubchannel(kAddresses[1]); ASSERT_NE(subchannel, nullptr); subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); + EXPECT_TRUE(subchannel->ConnectionRequested()); gpr_log(GPR_INFO, "### expecting re-resolution request"); ExpectReresolutionRequest(); gpr_log(GPR_INFO, "### expecting RR picks to exclude the disconnected subchannel"); - ExpectRoundRobinPicks(ExpectState(GRPC_CHANNEL_READY).get(), - {kAddresses[0], kAddresses[2]}); - // It starts trying to reconnect... + picker = + WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]}); + // Picks with the override will be queued. + ExpectPickQueued(picker.get(), {address1_attribute}); + // The subchannel starts trying to reconnect. gpr_log(GPR_INFO, "### subchannel 1 reporting CONNECTING"); subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - gpr_log(GPR_INFO, "### expecting RR picks again"); - ExpectRoundRobinPicks(ExpectState(GRPC_CHANNEL_READY).get(), - {kAddresses[0], kAddresses[2]}); - // ...but the connection attempt fails. + picker = ExpectState(GRPC_CHANNEL_READY); + ASSERT_NE(picker, nullptr); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); + // Picks with the override will still be queued. + ExpectPickQueued(picker.get(), {address1_attribute}); + // The connection attempt fails. gpr_log(GPR_INFO, "### subchannel 1 reporting TRANSIENT_FAILURE"); subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::ResourceExhaustedError("Hmmmm")); gpr_log(GPR_INFO, "### expecting re-resolution request"); ExpectReresolutionRequest(); + picker = ExpectState(GRPC_CHANNEL_READY); + ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); // The host override is not used. gpr_log(GPR_INFO, "### checking that host override is not used"); - picker = ExpectState(GRPC_CHANNEL_READY); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}, - MakeOverrideHostAttribute(kAddresses[1])); -} - -TEST_F(XdsOverrideHostTest, ConnectingSubchannelIsQueued) { - // Send address list to LB policy. - const std::array kAddresses = { - "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - auto picker = ExpectStartupWithRoundRobin(kAddresses); - ASSERT_NE(picker, nullptr); - // Check that the host is overridden - EXPECT_EQ(ExpectPickComplete(picker.get(), - { - MakeOverrideHostAttribute(kAddresses[1]), - }), - kAddresses[1]); - auto subchannel = FindSubchannel(kAddresses[1]); - ASSERT_NE(subchannel, nullptr); - subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); - ExpectReresolutionRequest(); - EXPECT_TRUE(subchannel->ConnectionRequested()); - picker = ExpectState(GRPC_CHANNEL_READY); - ExpectPickQueued(picker.get(), { - MakeOverrideHostAttribute(kAddresses[1]), - }); - subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - picker = ExpectState(GRPC_CHANNEL_READY); - ExpectPickQueued(picker.get(), { - MakeOverrideHostAttribute(kAddresses[1]), - }); + ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute, + {kAddresses[0], kAddresses[2]}); } TEST_F(XdsOverrideHostTest, DrainingState) { - // Send address list to LB policy. const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); @@ -277,33 +307,31 @@ TEST_F(XdsOverrideHostTest, DrainingState) { {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, {"UNKNOWN", "HEALTHY", "DRAINING"}); auto picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); + // Picks without an override will round-robin over the two endpoints + // that are not in draining state. ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - ExpectQueueEmpty(); - // Draining subchannel is returned - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + // Picks with an override are able to select the draining endpoint. + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + // Send the LB policy an update that removes the draining endpoint. ApplyUpdateWithHealthStatuses( {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}); picker = ExpectState(GRPC_CHANNEL_READY); ASSERT_NE(picker, nullptr); // Gone! - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}, - MakeOverrideHostAttribute(kAddresses[1])); + ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute, + {kAddresses[0], kAddresses[2]}); } TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { - // Send address list to LB policy. const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; auto picker = ExpectStartupWithRoundRobin(kAddresses); ASSERT_NE(picker, nullptr); // Check that the host is overridden - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); // Send an update that marks the endpoints with different EDS health // states, but those states are present in override_host_status. // The picker should use the DRAINING host when a call's override @@ -318,9 +346,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { auto subchannel = FindSubchannel(kAddresses[1]); ASSERT_NE(subchannel, nullptr); picker = ExpectState(GRPC_CHANNEL_READY); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); // Now the connection to the draining host gets dropped. // The picker should queue picks where the override host is IDLE. @@ -328,7 +354,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { gpr_log(GPR_INFO, "### closing connection to DRAINING host"); subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); picker = ExpectState(GRPC_CHANNEL_READY); - ExpectPickQueued(picker.get(), MakeOverrideHostAttribute(kAddresses[1])); + ExpectPickQueued(picker.get(), {address1_attribute}); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); // The subchannel should have been asked to reconnect as a result of the // queued pick above. It will therefore transition into state CONNECTING. @@ -341,7 +367,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { ExpectQueueEmpty(); subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); picker = ExpectState(GRPC_CHANNEL_READY); - ExpectPickQueued(picker.get(), MakeOverrideHostAttribute(kAddresses[1])); + ExpectPickQueued(picker.get(), {address1_attribute}); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); // The subchannel now becomes connected again. // Now picks with this override host can be completed again. @@ -349,14 +375,11 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { gpr_log(GPR_INFO, "### subchannel becomes reconnected"); subchannel->SetConnectivityState(GRPC_CHANNEL_READY); picker = ExpectState(GRPC_CHANNEL_READY); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); } TEST_F(XdsOverrideHostTest, DrainingToHealthy) { - // Send address list to LB policy. const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr); @@ -366,25 +389,17 @@ TEST_F(XdsOverrideHostTest, DrainingToHealthy) { {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, {"UNKNOWN", "HEALTHY", "DRAINING"}); auto picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]}); - ExpectQueueEmpty(); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); ApplyUpdateWithHealthStatuses( {{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy}, {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, {kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}}, {"UNKNOWN", "HEALTHY", "DRAINING"}); picker = ExpectState(GRPC_CHANNEL_READY); - ASSERT_NE(picker, nullptr); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + ExpectRoundRobinPicks(picker.get(), kAddresses); } TEST_F(XdsOverrideHostTest, OverrideHostStatus) { @@ -399,16 +414,13 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) { auto picker = ExpectState(GRPC_CHANNEL_READY); ASSERT_NE(picker, nullptr); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[0])), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[2])), - kAddresses[2]); - // UNKNOWN excluded - first chanel does not get overridden + auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]); + ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]); + auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]); + ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]); + // UNKNOWN excluded: overrides for first endpoint are not honored. ApplyUpdateWithHealthStatuses( {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, @@ -417,32 +429,24 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) { picker = ExpectState(GRPC_CHANNEL_READY); ASSERT_NE(picker, nullptr); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, - MakeOverrideHostAttribute(kAddresses[0])); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[2])), - kAddresses[2]); - // HEALTHY excluded - second chanel does not get overridden + ExpectRoundRobinPicksWithAttribute(picker.get(), address0_attribute, + {kAddresses[0], kAddresses[1]}); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]); + // HEALTHY excluded: overrides for second endpoint are not honored. ApplyUpdateWithHealthStatuses( {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, {kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}}, - {"UNKNOWN", "HEALTHY"}); + {"UNKNOWN", "DRAINING"}); picker = ExpectState(GRPC_CHANNEL_READY); ASSERT_NE(picker, nullptr); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[0])), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, - MakeOverrideHostAttribute(kAddresses[2])); - // DRAINING excluded - third chanel does not get overridden + ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]); + ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute, + {kAddresses[0], kAddresses[1]}); + ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]); + // DRAINING excluded: overrides for third endpoint are not honored. ApplyUpdateWithHealthStatuses( {{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown}, {kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy}, @@ -451,14 +455,49 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) { picker = ExpectState(GRPC_CHANNEL_READY); ASSERT_NE(picker, nullptr); ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[0])), - kAddresses[0]); - EXPECT_EQ(ExpectPickComplete(picker.get(), - MakeOverrideHostAttribute(kAddresses[1])), - kAddresses[1]); - ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]}, - MakeOverrideHostAttribute(kAddresses[2])); + ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]); + ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]); + ExpectRoundRobinPicksWithAttribute(picker.get(), address2_attribute, + {kAddresses[0], kAddresses[1]}); +} + +TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) { + if (!IsRoundRobinDelegateToPickFirstEnabled()) return; + constexpr std::array kEndpoint1Addresses = { + "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; + constexpr std::array kEndpoint2Addresses = { + "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"}; + constexpr std::array kEndpoint3Addresses = { + "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"}; + const std::array kEndpoints = { + MakeEndpointAddresses(kEndpoint1Addresses), + MakeEndpointAddresses(kEndpoint2Addresses), + MakeEndpointAddresses(kEndpoint3Addresses)}; + EXPECT_EQ(ApplyUpdate(BuildUpdate(kEndpoints, MakeXdsOverrideHostConfig()), + lb_policy()), + absl::OkStatus()); + auto picker = ExpectRoundRobinStartup(kEndpoints); + ASSERT_NE(picker, nullptr); + // Check that the host is overridden. + auto* endpoint1_attribute = MakeOverrideHostAttribute(kEndpoint1Addresses); + ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[0], + kEndpoint1Addresses); + auto* endpoint2_attribute = MakeOverrideHostAttribute(kEndpoint2Addresses); + ExpectOverridePicks(picker.get(), endpoint2_attribute, kEndpoint2Addresses[0], + kEndpoint2Addresses); + // Change endpoint 1 to connect to its second address. + ExpectEndpointAddressChange(kEndpoint1Addresses, 0, 1, [&]() { + WaitForRoundRobinListChange( + {kEndpoint1Addresses[0], kEndpoint2Addresses[0], + kEndpoint3Addresses[0]}, + {kEndpoint2Addresses[0], kEndpoint3Addresses[0]}); + }); + WaitForRoundRobinListChange( + {kEndpoint2Addresses[0], kEndpoint3Addresses[0]}, + {kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]}); + // Now the cookie for endpoint 1 should cause us to use the second address. + ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[1], + {kEndpoint1Addresses[1], kEndpoint1Addresses[0]}); } } // namespace diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index 863e1d2c903..a1feea3a974 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -444,6 +444,7 @@ grpc_cc_test( "//src/proto/grpc/testing/xds/v3:stateful_session_cookie_proto", "//src/proto/grpc/testing/xds/v3:stateful_session_proto", "//test/core/util:grpc_test_util", + "//test/core/util:scoped_env_var", ], ) diff --git a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc index 80bda5b8f10..6eebf54dec1 100644 --- a/test/cpp/end2end/xds/xds_override_host_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_override_host_end2end_test.cc @@ -23,9 +23,11 @@ #include "absl/strings/str_split.h" #include "src/core/lib/config/config_vars.h" +#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/time.h" #include "src/proto/grpc/testing/xds/v3/stateful_session.pb.h" #include "src/proto/grpc/testing/xds/v3/stateful_session_cookie.pb.h" +#include "test/core/util/scoped_env_var.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" namespace grpc { @@ -82,8 +84,14 @@ class OverrideHostTest : public XdsEnd2endTest { std::vector values; auto pair = server_initial_metadata.equal_range("set-cookie"); for (auto it = pair.first; it != pair.second; ++it) { - gpr_log(GPR_INFO, "set-cookie header: %s", - std::string(it->second).c_str()); + std::pair key_value = + absl::StrSplit(it->second, '='); + std::pair key_value2 = + absl::StrSplit(key_value.second, ';'); + std::string decoded; + EXPECT_TRUE(absl::Base64Unescape(key_value2.first, &decoded)); + gpr_log(GPR_INFO, "set-cookie header: %s (decoded: %s)", + std::string(it->second).c_str(), decoded.c_str()); values.emplace_back(ParseCookie(it->second)); EXPECT_FALSE(values.back().value.empty()); EXPECT_THAT(values.back().attributes, ::testing::Contains("HttpOnly")); @@ -609,6 +617,58 @@ TEST_P(OverrideHostTest, TTLSetsMaxAge) { ::testing::UnorderedElementsAre("Max-Age=42", "HttpOnly")); } +TEST_P(OverrideHostTest, MultipleAddressesPerEndpoint) { + if (!grpc_core::IsRoundRobinDelegateToPickFirstEnabled()) return; + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS"); + // Create 3 backends, but leave backend 0 unstarted. + CreateBackends(3); + StartBackend(1); + StartBackend(2); + SetListenerAndRouteConfiguration(balancer_.get(), + BuildListenerWithStatefulSessionFilter(), + default_route_config_); + balancer_->ads_service()->SetEdsResource(BuildEdsResource( + EdsResourceArgs({{"locality0", + {CreateEndpoint(0, HealthStatus::HEALTHY, 1, {1}), + CreateEndpoint(2, HealthStatus::UNKNOWN)}}}))); + WaitForAllBackends(DEBUG_LOCATION, 1); // Wait for backends 1 and 2. + // Requests without a cookie should get round-robin across backends 1 and 2. + CheckRpcSendOk(DEBUG_LOCATION, 10); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0); + EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5); + EXPECT_EQ(backends_[2]->backend_service()->request_count(), 5); + ResetBackendCounters(); + // Get cookie for backend 1. + auto cookies = GetCookiesForBackend(DEBUG_LOCATION, 1); + EXPECT_THAT(cookies, + ::testing::ElementsAre(::testing::AllOf( + ::testing::Field("name", &Cookie::name, kCookieName), + ::testing::Field("attributes", &Cookie::attributes, + ::testing::ElementsAre("HttpOnly")), + ::testing::Field("value", &Cookie::value, + ::testing::Not(::testing::IsEmpty()))))); + // Requests with the cookie always go to the same backend. + CheckRpcSendOk(DEBUG_LOCATION, 5, + RpcOptions().set_metadata({cookies.front().Header()})); + EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5); + // Now start backend 0 and stop backend 1. + StartBackend(0); + ShutdownBackend(1); + // Wait for traffic to go to backend 0. + WaitForBackend(DEBUG_LOCATION, 0); + // Requests with no cookie should get round-robin across backends 0 and 2. + CheckRpcSendOk(DEBUG_LOCATION, 10); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 5); + EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0); + EXPECT_EQ(backends_[2]->backend_service()->request_count(), 5); + ResetBackendCounters(); + // Requests with the same cookie should now go to backend 0. + CheckRpcSendOk(DEBUG_LOCATION, 5, + RpcOptions().set_metadata({cookies.front().Header()})); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 5); +} + } // namespace } // namespace testing } // namespace grpc