[SSA] support multiple addresses per endpoint (#34472)

pull/34610/head
Mark D. Roth 1 year ago committed by GitHub
parent 3ed72c0c60
commit bb6a6faa69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build_autogenerated.yaml
  2. 3
      src/core/BUILD
  3. 362
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  4. 52
      src/core/ext/filters/stateful_session/stateful_session_filter.cc
  5. 25
      src/core/ext/filters/stateful_session/stateful_session_filter.h
  6. 29
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  7. 369
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  8. 1
      test/cpp/end2end/xds/BUILD
  9. 64
      test/cpp/end2end/xds/xds_override_host_end2end_test.cc

@ -18199,6 +18199,7 @@ targets:
build: test
language: c++
headers:
- test/core/util/scoped_env_var.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
- test/cpp/end2end/xds/xds_end2end_test_lib.h

@ -3985,6 +3985,7 @@ grpc_cc_library(
"map",
"pipe",
"poll",
"ref_counted_string",
"service_config_parser",
"slice",
"time",
@ -5185,6 +5186,7 @@ grpc_cc_library(
"absl/status:statusor",
"absl/strings",
"absl/types:optional",
"absl/types:span",
"absl/types:variant",
],
language = "c++",
@ -5205,6 +5207,7 @@ grpc_cc_library(
"lb_policy_registry",
"match",
"pollset_set",
"ref_counted_string",
"resolved_address",
"subchannel_interface",
"validation_errors",

@ -37,8 +37,11 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "absl/types/variant.h"
#include <grpc/impl/connectivity_state.h>
@ -57,6 +60,7 @@
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
@ -124,8 +128,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy,
absl::string_view key);
RefCountedPtr<XdsOverrideHostLb> policy);
~SubchannelWrapper() override;
@ -141,6 +144,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
XdsOverrideHostLb* policy() { return policy_.get(); }
void set_key(absl::string_view key) { key_ = std::string(key); }
const absl::optional<std::string>& key() const { return key_; }
private:
class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface {
public:
@ -172,6 +178,60 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
GRPC_CHANNEL_IDLE};
};
class SubchannelEntry {
public:
explicit SubchannelEntry(XdsHealthStatus eds_health_status)
: eds_health_status_(eds_health_status) {}
void SetSubchannel(SubchannelWrapper* subchannel) {
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel->WeakRef();
}
}
void UnsetSubchannel() {
subchannel_ = WeakRefCountedPtr<SubchannelWrapper>(nullptr);
}
SubchannelWrapper* GetSubchannel() const {
return Match(
subchannel_,
[](WeakRefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
subchannel) { return subchannel.get(); },
[](RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper> subchannel) {
return subchannel.get();
});
}
void SetEdsHealthStatus(XdsHealthStatus eds_health_status) {
eds_health_status_ = eds_health_status;
auto subchannel = GetSubchannel();
if (subchannel == nullptr) return;
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel->WeakRef();
}
}
XdsHealthStatus eds_health_status() const { return eds_health_status_; }
void set_address_list(RefCountedStringValue address_list) {
address_list_ = std::move(address_list);
}
RefCountedStringValue address_list() const { return address_list_; }
private:
absl::variant<WeakRefCountedPtr<SubchannelWrapper>,
RefCountedPtr<SubchannelWrapper>>
subchannel_;
XdsHealthStatus eds_health_status_;
RefCountedStringValue address_list_;
};
// A picker that wraps the picker from the child for cases when cookie is
// present.
class Picker : public SubchannelPicker {
@ -210,7 +270,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
};
absl::optional<LoadBalancingPolicy::PickResult> PickOverridenHost(
absl::string_view override_host);
XdsOverrideHostAttribute* override_host_attr) const;
RefCountedPtr<XdsOverrideHostLb> policy_;
RefCountedPtr<SubchannelPicker> picker_;
@ -231,53 +291,6 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelPicker> picker) override;
};
class SubchannelEntry {
public:
explicit SubchannelEntry(XdsHealthStatus eds_health_status)
: eds_health_status_(eds_health_status) {}
void SetSubchannel(SubchannelWrapper* subchannel) {
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel;
}
}
void UnsetSubchannel() { subchannel_ = nullptr; }
SubchannelWrapper* GetSubchannel() const {
return Match(
subchannel_,
[](XdsOverrideHostLb::SubchannelWrapper* subchannel) {
return subchannel;
},
[](RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper> subchannel) {
return subchannel.get();
});
}
void SetEdsHealthStatus(XdsHealthStatus eds_health_status) {
eds_health_status_ = eds_health_status;
auto subchannel = GetSubchannel();
if (subchannel == nullptr) {
return;
}
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->Ref();
} else {
subchannel_ = subchannel;
}
}
XdsHealthStatus eds_health_status() const { return eds_health_status_; }
private:
absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>
subchannel_;
XdsHealthStatus eds_health_status_;
};
~XdsOverrideHostLb() override;
void ShutdownLocked() override;
@ -296,12 +309,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
RefCountedPtr<SubchannelWrapper> GetSubchannelByAddress(
absl::string_view address, XdsHealthStatusSet overriden_health_statuses);
void OnSubchannelConnectivityStateChange(absl::string_view subchannel_key)
ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the worker
// serializer and does not require
ABSL_NO_THREAD_SAFETY_ANALYSIS; // Called from within the
// WorkSerializer and does not require
// additional synchronization
// Current config from the resolver.
@ -339,39 +349,89 @@ XdsOverrideHostLb::Picker::Picker(
}
absl::optional<LoadBalancingPolicy::PickResult>
XdsOverrideHostLb::Picker::PickOverridenHost(absl::string_view override_host) {
if (override_host.length() == 0) {
return absl::nullopt;
}
auto subchannel = policy_->GetSubchannelByAddress(
override_host, override_host_health_status_set_);
if (subchannel == nullptr) {
return absl::nullopt;
}
auto connectivity_state = subchannel->connectivity_state();
if (connectivity_state == GRPC_CHANNEL_READY) {
return PickResult::Complete(subchannel->wrapped_subchannel());
} else if (connectivity_state == GRPC_CHANNEL_CONNECTING) {
XdsOverrideHostLb::Picker::PickOverridenHost(
XdsOverrideHostAttribute* override_host_attr) const {
GPR_ASSERT(override_host_attr != nullptr);
auto cookie_address_list = override_host_attr->cookie_address_list();
if (cookie_address_list.empty()) return absl::nullopt;
// The cookie has an address list, so look through the addresses in order.
RefCountedPtr<SubchannelWrapper> idle_subchannel;
bool found_connecting = false;
{
MutexLock lock(&policy_->subchannel_map_mu_);
for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) {
RefCountedPtr<SubchannelWrapper> subchannel;
auto it = policy_->subchannel_map_.find(address);
if (it != policy_->subchannel_map_.end()) {
subchannel = it->second.GetSubchannel()->RefIfNonZero();
}
if (subchannel == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Subchannel %s was not found",
std::string(address).c_str());
}
continue;
}
if (!override_host_health_status_set_.Contains(
it->second.eds_health_status())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"Subchannel %s health status is not overridden (%s)",
std::string(address).c_str(),
it->second.eds_health_status().ToString());
}
continue;
}
auto connectivity_state = subchannel->connectivity_state();
if (connectivity_state == GRPC_CHANNEL_READY) {
// Found a READY subchannel. Pass back the actual address list
// and return the subchannel.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Picker override found READY subchannel %s",
std::string(address).c_str());
}
override_host_attr->set_actual_address_list(it->second.address_list());
return PickResult::Complete(subchannel->wrapped_subchannel());
} else if (connectivity_state == GRPC_CHANNEL_IDLE) {
if (idle_subchannel == nullptr) idle_subchannel = std::move(subchannel);
} else if (connectivity_state == GRPC_CHANNEL_CONNECTING) {
found_connecting = true;
}
}
}
// No READY subchannel found. If we found an IDLE subchannel, trigger
// a connection attempt and queue the pick until that attempt completes.
if (idle_subchannel != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Picker override found IDLE subchannel");
}
// Deletes itself after the connection is requested.
new SubchannelConnectionRequester(std::move(idle_subchannel));
return PickResult::Queue();
} else if (connectivity_state == GRPC_CHANNEL_IDLE) {
// Deleted after the connection is requested
new SubchannelConnectionRequester(std::move(subchannel));
}
// No READY or IDLE subchannels. If we found a CONNECTING subchannel,
// queue the pick and wait for the connection attempt to complete.
if (found_connecting) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Picker override found CONNECTING subchannel");
}
return PickResult::Queue();
}
// No READY, IDLE, or CONNECTING subchannels found.
return absl::nullopt;
}
LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) {
auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
auto* override_host = static_cast<XdsOverrideHostAttribute*>(
auto* override_host_attr = static_cast<XdsOverrideHostAttribute*>(
call_state->GetCallAttribute(XdsOverrideHostAttribute::TypeName()));
auto overridden_host_pick =
PickOverridenHost(override_host != nullptr ? override_host->host_name()
: absl::string_view());
if (overridden_host_pick.has_value()) {
return std::move(*overridden_host_pick);
if (override_host_attr != nullptr) {
auto overridden_host_pick = PickOverridenHost(override_host_attr);
if (overridden_host_pick.has_value()) {
return std::move(*overridden_host_pick);
}
}
// No usable override. Delegate to child picker.
if (picker_ == nullptr) { // Should never happen.
return PickResult::Fail(absl::InternalError(
"xds_override_host picker not given any child picker"));
@ -379,9 +439,23 @@ LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(
auto result = picker_->Pick(args);
auto complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
complete_pick->subchannel =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get())
->wrapped_subchannel();
auto* wrapper =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
// Populate the address list in the override host attribute so that
// the StatefulSession filter can set the cookie.
if (override_host_attr != nullptr) {
auto& key = wrapper->key();
if (key.has_value()) {
MutexLock lock(&policy_->subchannel_map_mu_);
auto it = policy_->subchannel_map_.find(*key);
if (it != policy_->subchannel_map_.end()) { // Should always be true.
override_host_attr->set_actual_address_list(
it->second.address_list());
}
}
}
// Unwrap the subchannel.
complete_pick->subchannel = wrapper->wrapped_subchannel();
}
return result;
}
@ -513,10 +587,16 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
}
return endpoints;
}
// TODO(roth): As we clarify this part of the dualstack design, add
// support for multiple addresses per endpoint.
EndpointAddressesList return_value;
std::map<const std::string, XdsHealthStatus> addresses_for_map;
// Construct the list of addresses to pass to the child policy and a
// map of address info from which to update subchannel_map_.
EndpointAddressesList child_addresses;
struct AddressInfo {
XdsHealthStatus eds_health_status;
RefCountedStringValue address_list;
AddressInfo(XdsHealthStatus status, RefCountedStringValue addresses)
: eds_health_status(status), address_list(std::move(addresses)) {}
};
std::map<const std::string, AddressInfo> addresses_for_map;
for (const auto& endpoint : *endpoints) {
XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
@ -526,7 +606,7 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
"passing to child",
this, endpoint.ToString().c_str());
}
return_value.push_back(endpoint);
child_addresses.push_back(endpoint);
} else if (!config_->override_host_status_set().Contains(status)) {
// Skip draining hosts if not in the override status set.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
@ -537,16 +617,32 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
}
continue;
}
auto key = grpc_sockaddr_to_uri(&endpoint.address());
if (key.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: adding map key %s",
this, endpoint.ToString().c_str(), key->c_str());
std::vector<std::string> addresses;
addresses.reserve(endpoint.addresses().size());
for (const auto& address : endpoint.addresses()) {
auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
if (key.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: adding map key %s",
this, endpoint.ToString().c_str(), key->c_str());
}
addresses.push_back(*std::move(key));
}
addresses_for_map.emplace(std::move(*key), status);
}
absl::Span<const std::string> addresses_span = addresses;
for (size_t i = 0; i < addresses.size(); ++i) {
std::string start = absl::StrJoin(addresses_span.subspan(0, i), ",");
std::string end = absl::StrJoin(addresses_span.subspan(i + 1), ",");
RefCountedStringValue address_list(
absl::StrCat(addresses[i], (start.empty() ? "" : ","), start,
(end.empty() ? "" : ","), end));
addresses_for_map.emplace(
std::piecewise_construct, std::forward_as_tuple(addresses[i]),
std::forward_as_tuple(status, std::move(address_list)));
}
}
// Now grab the lock and update subchannel_map_ from addresses_for_map.
{
MutexLock lock(&subchannel_map_mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
@ -560,44 +656,55 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
++it;
}
}
for (const auto& key_status : addresses_for_map) {
auto it = subchannel_map_.find(key_status.first);
for (auto& p : addresses_for_map) {
const auto& address = p.first;
auto& address_info = p.second;
auto it = subchannel_map_.find(address);
if (it == subchannel_map_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] adding map key %s", this,
key_status.first.c_str());
address.c_str());
}
subchannel_map_.emplace(std::piecewise_construct,
std::forward_as_tuple(key_status.first),
std::forward_as_tuple(key_status.second));
it = subchannel_map_
.emplace(std::piecewise_construct,
std::forward_as_tuple(address),
std::forward_as_tuple(address_info.eds_health_status))
.first;
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] setting EDS health status for "
"%s to %s",
this, key_status.first.c_str(), key_status.second.ToString());
this, address.c_str(),
address_info.eds_health_status.ToString());
}
it->second.SetEdsHealthStatus(key_status.second);
it->second.SetEdsHealthStatus(address_info.eds_health_status);
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] setting address list for %s to %s",
this, address.c_str(), address_info.address_list.c_str());
}
it->second.set_address_list(std::move(address_info.address_list));
}
}
return return_value;
return child_addresses;
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::AdoptSubchannel(
const grpc_resolved_address& address,
RefCountedPtr<SubchannelInterface> subchannel) {
auto key = grpc_sockaddr_to_uri(&address);
if (!key.ok()) {
return subchannel;
}
auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
auto wrapper =
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref(), *key);
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
it->second.SetSubchannel(wrapper.get());
MakeRefCounted<SubchannelWrapper>(std::move(subchannel), Ref());
if (key.ok()) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
wrapper->set_key(*key);
it->second.SetSubchannel(wrapper.get());
}
}
return wrapper;
}
@ -613,29 +720,6 @@ void XdsOverrideHostLb::UnsetSubchannel(absl::string_view key,
}
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::GetSubchannelByAddress(
absl::string_view address, XdsHealthStatusSet overriden_health_statuses) {
MutexLock lock(&subchannel_map_mu_);
auto it = subchannel_map_.find(address);
if (it == subchannel_map_.end() || it->second.GetSubchannel() == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Subchannel %s was not found",
std::string(address).c_str());
}
return nullptr;
}
if (!overriden_health_statuses.Contains(it->second.eds_health_status())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Subchannel %s health status is not overridden (%s)",
std::string(address).c_str(),
it->second.eds_health_status().ToString());
}
return nullptr;
}
return it->second.GetSubchannel()->Ref();
}
void XdsOverrideHostLb::OnSubchannelConnectivityStateChange(
absl::string_view subchannel_key) {
auto it = subchannel_map_.find(subchannel_key);
@ -656,7 +740,7 @@ RefCountedPtr<SubchannelInterface> XdsOverrideHostLb::Helper::CreateSubchannel(
const ChannelArgs& args) {
auto subchannel = parent()->channel_control_helper()->CreateSubchannel(
address, per_address_args, args);
return parent()->AdoptSubchannel(address, subchannel);
return parent()->AdoptSubchannel(address, std::move(subchannel));
}
void XdsOverrideHostLb::Helper::UpdateState(
@ -677,10 +761,8 @@ void XdsOverrideHostLb::Helper::UpdateState(
XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper(
RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<XdsOverrideHostLb> policy, absl::string_view key)
: DelegatingSubchannel(std::move(subchannel)),
key_(key),
policy_(std::move(policy)) {
RefCountedPtr<XdsOverrideHostLb> policy)
: DelegatingSubchannel(std::move(subchannel)), policy_(std::move(policy)) {
auto watcher = std::make_unique<ConnectivityStateWatcher>(WeakRef());
watcher_ = watcher.get();
wrapped_subchannel()->WatchConnectivityState(std::move(watcher));

@ -102,21 +102,18 @@ absl::string_view AllocateStringOnArena(
// Adds the set-cookie header to the server initial metadata if needed.
void MaybeUpdateServerInitialMetadata(
const StatefulSessionMethodParsedConfig::CookieConfig* cookie_config,
bool cluster_changed, absl::string_view host_override,
absl::string_view actual_cluster, ServerMetadata* server_initial_metadata) {
// Get peer string.
Slice* peer_string = server_initial_metadata->get_pointer(PeerString());
if (peer_string == nullptr) {
// No changes, keep the same set-cookie header.
bool cluster_changed, absl::string_view actual_cluster,
absl::string_view cookie_address_list,
XdsOverrideHostAttribute* override_host_attribute,
ServerMetadata* server_initial_metadata) {
// If cookie doesn't need to change, do nothing.
if (cookie_address_list == override_host_attribute->actual_address_list() &&
!cluster_changed) {
return;
}
if (host_override == peer_string->as_string_view() && !cluster_changed) {
return;
}
std::string new_value(peer_string->as_string_view());
if (!actual_cluster.empty()) {
absl::StrAppend(&new_value, ";", actual_cluster);
}
// Construct new cookie value.
std::string new_value = absl::StrCat(
override_host_attribute->actual_address_list(), ";", actual_cluster);
std::vector<std::string> parts = {absl::StrCat(
*cookie_config->name, "=", absl::Base64Escape(new_value), "; HttpOnly")};
if (!cookie_config->path.empty()) {
@ -249,14 +246,16 @@ ArenaPromise<ServerMetadataHandle> StatefulSessionFilter::MakeCallPromise(
// Cookie format is "host;cluster"
std::pair<absl::string_view, absl::string_view> host_cluster =
absl::StrSplit(cookie_value, absl::MaxSplits(';', 1));
absl::string_view host_override;
// Set override host attribute. Allocate the string on the
// arena, so that it has the right lifetime.
absl::string_view cookie_address_list;
// Allocate the string on the arena, so that it has the right lifetime.
if (!host_cluster.first.empty()) {
host_override = AllocateStringOnArena(host_cluster.first);
service_config_call_data->SetCallAttribute(
GetContext<Arena>()->New<XdsOverrideHostAttribute>(host_override));
cookie_address_list = AllocateStringOnArena(host_cluster.first);
}
// Set override host attribute.
auto* override_host_attribute =
GetContext<Arena>()->ManagedNew<XdsOverrideHostAttribute>(
cookie_address_list);
service_config_call_data->SetCallAttribute(override_host_attribute);
// Check if the cluster override is valid, and apply it if necessary.
// Note that cluster_name will point to an arena-allocated string
// that will still be alive when we see the server initial metadata.
@ -267,23 +266,24 @@ ArenaPromise<ServerMetadataHandle> StatefulSessionFilter::MakeCallPromise(
bool cluster_changed = cluster_name != host_cluster.second;
// Intercept server initial metadata.
call_args.server_initial_metadata->InterceptAndMap(
[cookie_config, cluster_changed, host_override,
cluster_name](ServerMetadataHandle md) {
[cookie_config, cluster_changed, cluster_name, cookie_address_list,
override_host_attribute](ServerMetadataHandle md) {
// Add cookie to server initial metadata if needed.
MaybeUpdateServerInitialMetadata(cookie_config, cluster_changed,
host_override, cluster_name, md.get());
cluster_name, cookie_address_list,
override_host_attribute, md.get());
return md;
});
return Map(next_promise_factory(std::move(call_args)),
[cookie_config, cluster_changed, host_override,
cluster_name](ServerMetadataHandle md) {
[cookie_config, cluster_changed, cluster_name, cookie_address_list,
override_host_attribute](ServerMetadataHandle md) {
// If we got a Trailers-Only response, then add the
// cookie to the trailing metadata instead of the
// initial metadata.
if (md->get(GrpcTrailersOnly()).value_or(false)) {
MaybeUpdateServerInitialMetadata(
cookie_config, cluster_changed, host_override,
cluster_name, md.get());
cookie_config, cluster_changed, cluster_name,
cookie_address_list, override_host_attribute, md.get());
}
return md;
});

@ -21,12 +21,15 @@
#include <stddef.h>
#include <utility>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/service_config/service_config_call_data.h"
@ -34,20 +37,34 @@
namespace grpc_core {
// A call attribute to be passed to the xds_override_host LB policy.
// The StatefulSession filter will populate the cookie's address list,
// if set. The xds_override_host LB policy will use that info, and then
// set the actual address list based on the chosen endpoint. The
// StatefulSession filter will then use the actual address list to
// update the cookie.
class XdsOverrideHostAttribute
: public ServiceConfigCallData::CallAttributeInterface {
public:
static UniqueTypeName TypeName();
explicit XdsOverrideHostAttribute(absl::string_view host_name)
: host_name_(host_name) {}
explicit XdsOverrideHostAttribute(absl::string_view cookie_address_list)
: cookie_address_list_(cookie_address_list) {}
absl::string_view cookie_address_list() const { return cookie_address_list_; }
absl::string_view host_name() const { return host_name_; }
absl::string_view actual_address_list() const {
return actual_address_list_.as_string_view();
}
void set_actual_address_list(RefCountedStringValue actual_address_list) {
actual_address_list_ = std::move(actual_address_list);
}
private:
UniqueTypeName type() const override { return TypeName(); }
absl::string_view host_name_;
absl::string_view cookie_address_list_;
RefCountedStringValue actual_address_list_;
};
// A filter to provide cookie-based stateful session affinity.

@ -95,8 +95,8 @@ namespace testing {
class LoadBalancingPolicyTest : public ::testing::Test {
protected:
using CallAttributes = std::vector<
std::unique_ptr<ServiceConfigCallData::CallAttributeInterface>>;
using CallAttributes =
std::vector<ServiceConfigCallData::CallAttributeInterface*>;
// Channel-level subchannel state for a specific address and channel args.
// This is analogous to the real subchannel in the ClientChannel code.
@ -630,8 +630,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
class FakeCallState : public ClientChannelLbCallState {
public:
explicit FakeCallState(const CallAttributes& attributes) {
for (const auto& p : attributes) {
attributes_.emplace(p->type(), p.get());
for (const auto& attribute : attributes) {
attributes_.emplace(attribute->type(), attribute);
}
}
@ -1232,17 +1232,26 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// Expects zero or more picker updates, each of which returns
// round-robin picks for the specified set of addresses.
void DrainRoundRobinPickerUpdates(
absl::Span<const absl::string_view> addresses,
SourceLocation location = SourceLocation()) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
DrainRoundRobinPickerUpdates(absl::Span<const absl::string_view> addresses,
SourceLocation location = SourceLocation()) {
gpr_log(GPR_INFO, "Draining RR picker updates...");
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
while (!helper_->QueueEmpty()) {
auto update = helper_->GetNextStateUpdate(location);
ASSERT_TRUE(update.has_value());
ASSERT_EQ(update->state, GRPC_CHANNEL_READY);
ExpectRoundRobinPicks(update->picker.get(), addresses);
EXPECT_TRUE(update.has_value())
<< location.file() << ":" << location.line();
if (!update.has_value()) return nullptr;
EXPECT_EQ(update->state, GRPC_CHANNEL_READY)
<< location.file() << ":" << location.line();
if (update->state != GRPC_CHANNEL_READY) return nullptr;
ExpectRoundRobinPicks(update->picker.get(), addresses,
/*call_attributes=*/{}, /*num_iterations=*/3,
location);
picker = std::move(update->picker);
}
gpr_log(GPR_INFO, "Done draining RR picker updates");
return picker;
}
// Expects zero or more CONNECTING updates.

@ -14,6 +14,8 @@
// limitations under the License.
//
#include <stddef.h>
#include <algorithm>
#include <array>
#include <memory>
@ -23,8 +25,12 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
@ -34,6 +40,8 @@
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
@ -99,12 +107,86 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus());
}
CallAttributes MakeOverrideHostAttribute(absl::string_view host) {
CallAttributes override_host_attributes;
override_host_attributes.emplace_back(
std::make_unique<XdsOverrideHostAttribute>(host));
return override_host_attributes;
struct OverrideHostAttributeStorage {
// Need to store the string externally, since
// XdsOverrideHostAttribute only holds a string_view.
std::string address_list;
XdsOverrideHostAttribute attribute;
explicit OverrideHostAttributeStorage(std::string addresses)
: address_list(std::move(addresses)), attribute(address_list) {}
};
XdsOverrideHostAttribute* MakeOverrideHostAttribute(
absl::Span<const absl::string_view> addresses) {
std::vector<absl::string_view> address_list;
address_list.reserve(addresses.size());
for (absl::string_view address : addresses) {
address_list.emplace_back(absl::StripPrefix(address, "ipv4:"));
}
attribute_storage_.emplace_back(
std::make_unique<OverrideHostAttributeStorage>(
absl::StrJoin(address_list, ",")));
return &attribute_storage_.back()->attribute;
}
XdsOverrideHostAttribute* MakeOverrideHostAttribute(
absl::string_view address) {
const std::array<absl::string_view, 1> addresses = {address};
return MakeOverrideHostAttribute(addresses);
}
void ExpectOverridePicks(
LoadBalancingPolicy::SubchannelPicker* picker,
XdsOverrideHostAttribute* attribute, absl::string_view expected,
absl::Span<const absl::string_view> expected_address_list = {},
SourceLocation location = SourceLocation()) {
std::array<absl::string_view, 1> kArray = {expected};
if (expected_address_list.empty()) expected_address_list = kArray;
std::vector<absl::string_view> expected_addresses;
expected_addresses.reserve(expected_address_list.size());
for (absl::string_view address : expected_address_list) {
expected_addresses.push_back(absl::StripPrefix(address, "ipv4:"));
}
std::string expected_addresses_str = absl::StrJoin(expected_addresses, ",");
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(
ExpectPickComplete(picker, {attribute},
/*subchannel_call_tracker=*/nullptr, location),
expected)
<< location.file() << ":" << location.line();
EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str)
<< "Expected: " << attribute->actual_address_list() << "\n"
<< " Actual: " << expected_addresses_str << "\n"
<< location.file() << ":" << location.line();
}
}
void ExpectRoundRobinPicksWithAttribute(
LoadBalancingPolicy::SubchannelPicker* picker,
XdsOverrideHostAttribute* attribute,
absl::Span<const absl::string_view> expected,
SourceLocation location = SourceLocation()) {
std::vector<std::string> actual_picks;
for (size_t i = 0; i < expected.size(); ++i) {
auto address = ExpectPickComplete(
picker, {attribute}, /*subchannel_call_tracker=*/nullptr, location);
ASSERT_TRUE(address.has_value())
<< location.file() << ":" << location.line();
EXPECT_THAT(*address, ::testing::AnyOfArray(expected))
<< location.file() << ":" << location.line();
EXPECT_EQ(attribute->actual_address_list(),
absl::StripPrefix(*address, "ipv4:"))
<< "Expected: " << attribute->actual_address_list() << "\n"
<< " Actual: " << absl::StripPrefix(*address, "ipv4:") << "\n"
<< location.file() << ":" << location.line();
actual_picks.push_back(std::move(*address));
}
EXPECT_TRUE(PicksAreRoundRobin(expected, actual_picks))
<< location.file() << ":" << location.line();
}
std::vector<std::unique_ptr<OverrideHostAttributeStorage>> attribute_storage_;
};
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
@ -121,34 +203,23 @@ TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
}
TEST_F(XdsOverrideHostTest, OverrideHost) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[0])),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[0])),
kAddresses[0]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
}
TEST_F(XdsOverrideHostTest, SubchannelNotFound) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), kAddresses,
MakeOverrideHostAttribute("no such host"));
auto* attribute = MakeOverrideHostAttribute("no such host");
ExpectRoundRobinPicksWithAttribute(picker.get(), attribute, kAddresses);
}
TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
@ -156,118 +227,77 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
ExpectRoundRobinPicks(picker.get(), {kAddresses[1]},
MakeOverrideHostAttribute(kAddresses[1]));
// Some other address is gone
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[1]},
MakeXdsOverrideHostConfig()),
lb_policy()),
absl::OkStatus());
// Wait for LB policy to return a new picker that uses the updated
// addresses. We can't use the host override for this, because then
// we won't know when the new picker is actually using all of the new
// addresses.
picker =
WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[1]});
// Make sure host override still works.
ExpectRoundRobinPicks(picker.get(), {kAddresses[1]},
MakeOverrideHostAttribute(kAddresses[1]));
// "Our" address is gone so others get returned in round-robin order
// Check that the host override works.
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
// The override address is removed.
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[2]},
MakeXdsOverrideHostConfig()),
lb_policy()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
// In this case, we can pass call_attributes while we wait instead of
// checking again afterward, because the host override won't actually
// be used.
WaitForRoundRobinListChange({kAddresses[0], kAddresses[1]},
{kAddresses[0], kAddresses[2]},
MakeOverrideHostAttribute(kAddresses[1]));
// And now it is back
picker =
WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
// Picks are returned in round-robin order, because the address
// pointed to by the cookie is not present.
ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
{kAddresses[0], kAddresses[2]});
// The override address comes back.
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[1], kAddresses[2]},
MakeXdsOverrideHostConfig()),
lb_policy()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
{kAddresses[1], kAddresses[2]});
// Make sure host override works.
ExpectRoundRobinPicks(picker.get(), {kAddresses[1]},
MakeOverrideHostAttribute(kAddresses[1]));
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
TEST_F(XdsOverrideHostTest, FailedSubchannelIsNotPicked) {
// Send address list to LB policy.
TEST_F(XdsOverrideHostTest,
OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
// Subchannel for address 1 becomes disconnected.
gpr_log(GPR_INFO, "### subchannel 1 reporting IDLE");
auto subchannel = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel, nullptr);
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
EXPECT_TRUE(subchannel->ConnectionRequested());
gpr_log(GPR_INFO, "### expecting re-resolution request");
ExpectReresolutionRequest();
gpr_log(GPR_INFO,
"### expecting RR picks to exclude the disconnected subchannel");
ExpectRoundRobinPicks(ExpectState(GRPC_CHANNEL_READY).get(),
{kAddresses[0], kAddresses[2]});
// It starts trying to reconnect...
picker =
WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
// Picks with the override will be queued.
ExpectPickQueued(picker.get(), {address1_attribute});
// The subchannel starts trying to reconnect.
gpr_log(GPR_INFO, "### subchannel 1 reporting CONNECTING");
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
gpr_log(GPR_INFO, "### expecting RR picks again");
ExpectRoundRobinPicks(ExpectState(GRPC_CHANNEL_READY).get(),
{kAddresses[0], kAddresses[2]});
// ...but the connection attempt fails.
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// Picks with the override will still be queued.
ExpectPickQueued(picker.get(), {address1_attribute});
// The connection attempt fails.
gpr_log(GPR_INFO, "### subchannel 1 reporting TRANSIENT_FAILURE");
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::ResourceExhaustedError("Hmmmm"));
gpr_log(GPR_INFO, "### expecting re-resolution request");
ExpectReresolutionRequest();
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// The host override is not used.
gpr_log(GPR_INFO, "### checking that host override is not used");
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]},
MakeOverrideHostAttribute(kAddresses[1]));
}
TEST_F(XdsOverrideHostTest, ConnectingSubchannelIsQueued) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
EXPECT_EQ(ExpectPickComplete(picker.get(),
{
MakeOverrideHostAttribute(kAddresses[1]),
}),
kAddresses[1]);
auto subchannel = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel, nullptr);
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
ExpectReresolutionRequest();
EXPECT_TRUE(subchannel->ConnectionRequested());
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), {
MakeOverrideHostAttribute(kAddresses[1]),
});
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), {
MakeOverrideHostAttribute(kAddresses[1]),
});
ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
{kAddresses[0], kAddresses[2]});
}
TEST_F(XdsOverrideHostTest, DrainingState) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
@ -277,33 +307,31 @@ TEST_F(XdsOverrideHostTest, DrainingState) {
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Picks without an override will round-robin over the two endpoints
// that are not in draining state.
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectQueueEmpty();
// Draining subchannel is returned
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
// Picks with an override are able to select the draining endpoint.
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
// Send the LB policy an update that removes the draining endpoint.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Gone!
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]},
MakeOverrideHostAttribute(kAddresses[1]));
ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
{kAddresses[0], kAddresses[2]});
}
TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
// Send an update that marks the endpoints with different EDS health
// states, but those states are present in override_host_status.
// The picker should use the DRAINING host when a call's override
@ -318,9 +346,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
auto subchannel = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel, nullptr);
picker = ExpectState(GRPC_CHANNEL_READY);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// Now the connection to the draining host gets dropped.
// The picker should queue picks where the override host is IDLE.
@ -328,7 +354,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
gpr_log(GPR_INFO, "### closing connection to DRAINING host");
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), MakeOverrideHostAttribute(kAddresses[1]));
ExpectPickQueued(picker.get(), {address1_attribute});
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// The subchannel should have been asked to reconnect as a result of the
// queued pick above. It will therefore transition into state CONNECTING.
@ -341,7 +367,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
ExpectQueueEmpty();
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectPickQueued(picker.get(), MakeOverrideHostAttribute(kAddresses[1]));
ExpectPickQueued(picker.get(), {address1_attribute});
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// The subchannel now becomes connected again.
// Now picks with this override host can be completed again.
@ -349,14 +375,11 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
gpr_log(GPR_INFO, "### subchannel becomes reconnected");
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
}
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
@ -366,25 +389,17 @@ TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectQueueEmpty();
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), kAddresses);
}
TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
@ -399,16 +414,13 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
auto picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[0])),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[2])),
kAddresses[2]);
// UNKNOWN excluded - first chanel does not get overridden
auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
// UNKNOWN excluded: overrides for first endpoint are not honored.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
@ -417,32 +429,24 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
MakeOverrideHostAttribute(kAddresses[0]));
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[2])),
kAddresses[2]);
// HEALTHY excluded - second chanel does not get overridden
ExpectRoundRobinPicksWithAttribute(picker.get(), address0_attribute,
{kAddresses[0], kAddresses[1]});
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
// HEALTHY excluded: overrides for second endpoint are not honored.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
{"UNKNOWN", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[0])),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
MakeOverrideHostAttribute(kAddresses[2]));
// DRAINING excluded - third chanel does not get overridden
ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
{kAddresses[0], kAddresses[1]});
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
// DRAINING excluded: overrides for third endpoint are not honored.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
@ -451,14 +455,49 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[0])),
kAddresses[0]);
EXPECT_EQ(ExpectPickComplete(picker.get(),
MakeOverrideHostAttribute(kAddresses[1])),
kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]},
MakeOverrideHostAttribute(kAddresses[2]));
ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectRoundRobinPicksWithAttribute(picker.get(), address2_attribute,
{kAddresses[0], kAddresses[1]});
}
TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) {
if (!IsRoundRobinDelegateToPickFirstEnabled()) return;
constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
"ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
"ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
const std::array<EndpointAddresses, 3> kEndpoints = {
MakeEndpointAddresses(kEndpoint1Addresses),
MakeEndpointAddresses(kEndpoint2Addresses),
MakeEndpointAddresses(kEndpoint3Addresses)};
EXPECT_EQ(ApplyUpdate(BuildUpdate(kEndpoints, MakeXdsOverrideHostConfig()),
lb_policy()),
absl::OkStatus());
auto picker = ExpectRoundRobinStartup(kEndpoints);
ASSERT_NE(picker, nullptr);
// Check that the host is overridden.
auto* endpoint1_attribute = MakeOverrideHostAttribute(kEndpoint1Addresses);
ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[0],
kEndpoint1Addresses);
auto* endpoint2_attribute = MakeOverrideHostAttribute(kEndpoint2Addresses);
ExpectOverridePicks(picker.get(), endpoint2_attribute, kEndpoint2Addresses[0],
kEndpoint2Addresses);
// Change endpoint 1 to connect to its second address.
ExpectEndpointAddressChange(kEndpoint1Addresses, 0, 1, [&]() {
WaitForRoundRobinListChange(
{kEndpoint1Addresses[0], kEndpoint2Addresses[0],
kEndpoint3Addresses[0]},
{kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
});
WaitForRoundRobinListChange(
{kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
{kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
// Now the cookie for endpoint 1 should cause us to use the second address.
ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[1],
{kEndpoint1Addresses[1], kEndpoint1Addresses[0]});
}
} // namespace

@ -444,6 +444,7 @@ grpc_cc_test(
"//src/proto/grpc/testing/xds/v3:stateful_session_cookie_proto",
"//src/proto/grpc/testing/xds/v3:stateful_session_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
],
)

@ -23,9 +23,11 @@
#include "absl/strings/str_split.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/time.h"
#include "src/proto/grpc/testing/xds/v3/stateful_session.pb.h"
#include "src/proto/grpc/testing/xds/v3/stateful_session_cookie.pb.h"
#include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
namespace grpc {
@ -82,8 +84,14 @@ class OverrideHostTest : public XdsEnd2endTest {
std::vector<Cookie> values;
auto pair = server_initial_metadata.equal_range("set-cookie");
for (auto it = pair.first; it != pair.second; ++it) {
gpr_log(GPR_INFO, "set-cookie header: %s",
std::string(it->second).c_str());
std::pair<absl::string_view, absl::string_view> key_value =
absl::StrSplit(it->second, '=');
std::pair<absl::string_view, absl::string_view> key_value2 =
absl::StrSplit(key_value.second, ';');
std::string decoded;
EXPECT_TRUE(absl::Base64Unescape(key_value2.first, &decoded));
gpr_log(GPR_INFO, "set-cookie header: %s (decoded: %s)",
std::string(it->second).c_str(), decoded.c_str());
values.emplace_back(ParseCookie(it->second));
EXPECT_FALSE(values.back().value.empty());
EXPECT_THAT(values.back().attributes, ::testing::Contains("HttpOnly"));
@ -609,6 +617,58 @@ TEST_P(OverrideHostTest, TTLSetsMaxAge) {
::testing::UnorderedElementsAre("Max-Age=42", "HttpOnly"));
}
TEST_P(OverrideHostTest, MultipleAddressesPerEndpoint) {
if (!grpc_core::IsRoundRobinDelegateToPickFirstEnabled()) return;
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
// Create 3 backends, but leave backend 0 unstarted.
CreateBackends(3);
StartBackend(1);
StartBackend(2);
SetListenerAndRouteConfiguration(balancer_.get(),
BuildListenerWithStatefulSessionFilter(),
default_route_config_);
balancer_->ads_service()->SetEdsResource(BuildEdsResource(
EdsResourceArgs({{"locality0",
{CreateEndpoint(0, HealthStatus::HEALTHY, 1, {1}),
CreateEndpoint(2, HealthStatus::UNKNOWN)}}})));
WaitForAllBackends(DEBUG_LOCATION, 1); // Wait for backends 1 and 2.
// Requests without a cookie should get round-robin across backends 1 and 2.
CheckRpcSendOk(DEBUG_LOCATION, 10);
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0);
EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5);
EXPECT_EQ(backends_[2]->backend_service()->request_count(), 5);
ResetBackendCounters();
// Get cookie for backend 1.
auto cookies = GetCookiesForBackend(DEBUG_LOCATION, 1);
EXPECT_THAT(cookies,
::testing::ElementsAre(::testing::AllOf(
::testing::Field("name", &Cookie::name, kCookieName),
::testing::Field("attributes", &Cookie::attributes,
::testing::ElementsAre("HttpOnly")),
::testing::Field("value", &Cookie::value,
::testing::Not(::testing::IsEmpty())))));
// Requests with the cookie always go to the same backend.
CheckRpcSendOk(DEBUG_LOCATION, 5,
RpcOptions().set_metadata({cookies.front().Header()}));
EXPECT_EQ(backends_[1]->backend_service()->request_count(), 5);
// Now start backend 0 and stop backend 1.
StartBackend(0);
ShutdownBackend(1);
// Wait for traffic to go to backend 0.
WaitForBackend(DEBUG_LOCATION, 0);
// Requests with no cookie should get round-robin across backends 0 and 2.
CheckRpcSendOk(DEBUG_LOCATION, 10);
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 5);
EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
EXPECT_EQ(backends_[2]->backend_service()->request_count(), 5);
ResetBackendCounters();
// Requests with the same cookie should now go to backend 0.
CheckRpcSendOk(DEBUG_LOCATION, 5,
RpcOptions().set_metadata({cookies.front().Header()}));
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 5);
}
} // namespace
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save