[SSA] change xds_override_host policy to manage subchannels based on last-used time rather than EDS health state (#35397)

Part of the work needed for in-progress gRFC A75 (https://github.com/grpc/proposal/pull/405).

Closes #35397

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35397 from markdroth:xds_ssa_subchannel_management_revamp 8902deafad
PiperOrigin-RevId: 597288930
pull/35313/head^2
Mark D. Roth 1 year ago committed by Copybara-Service
parent 55c13844d9
commit 6a4b5ccea3
  1. 1
      src/core/BUILD
  2. 16
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  3. 568
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  4. 4
      src/core/lib/transport/connectivity_state.h
  5. 28
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  6. 315
      test/core/client_channel/lb_policy/xds_override_host_test.cc

@ -5417,6 +5417,7 @@ grpc_cc_library(
"//:grpc_client_channel",
"//:grpc_trace",
"//:orphanable",
"//:parse_address",
"//:ref_counted_ptr",
"//:sockaddr_utils",
"//:work_serializer",

@ -747,15 +747,13 @@ void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
auto* rr_endpoint_list = endpoint_list<RoundRobinEndpointList>();
auto* round_robin = policy<RoundRobin>();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
"[RR %p] connectivity changed for child %p, endpoint_list %p "
"(index %" PRIuPTR " of %" PRIuPTR
"): prev_state=%s new_state=%s "
"(%s)",
round_robin, this, rr_endpoint_list, Index(),
rr_endpoint_list->size(),
(old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str());
gpr_log(
GPR_INFO,
"[RR %p] connectivity changed for child %p, endpoint_list %p "
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s (%s)",
round_robin, this, rr_endpoint_list, Index(), rr_endpoint_list->size(),
(old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
ConnectivityStateName(new_state), status.ToString().c_str());
}
if (new_state == GRPC_CHANNEL_IDLE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {

@ -43,6 +43,7 @@
#include "absl/types/span.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
@ -51,6 +52,7 @@
#include "src/core/ext/filters/client_channel/resolver/xds/xds_dependency_manager.h"
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
@ -82,6 +84,9 @@
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb");
namespace {
@ -102,13 +107,6 @@ struct PtrLessThan {
}
};
XdsHealthStatus GetEndpointHealthStatus(const EndpointAddresses& endpoint) {
return XdsHealthStatus(static_cast<XdsHealthStatus::HealthStatus>(
endpoint.args()
.GetInt(GRPC_ARG_XDS_HEALTH_STATUS)
.value_or(XdsHealthStatus::HealthStatus::kUnknown)));
}
//
// xds_override_host LB policy
//
@ -150,8 +148,20 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
return subchannel_entry_->address_list();
}
void set_last_used_time()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
subchannel_entry_->set_last_used_time();
}
XdsOverrideHostLb* policy() const { return policy_.get(); }
RefCountedPtr<SubchannelWrapper> Clone() const {
auto subchannel =
MakeRefCounted<SubchannelWrapper>(wrapped_subchannel(), policy_);
subchannel->set_subchannel_entry(subchannel_entry_);
return subchannel;
}
private:
class ConnectivityStateWatcher : public ConnectivityStateWatcherInterface {
public:
@ -185,26 +195,74 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
watchers_;
};
// An entry in the subchannel map.
//
// The entry may hold either an owned (RefCountedPtr<>) or unowned
// (raw pointer) SubchannelWrapper, but not both. It will be unowned
// in the case where the SubchannelWrapper is owned by the child policy.
// It will be owned in the case where the child policy has not created a
// subchannel but we have RPCs whose cookies point to that address.
//
// Note that when a SubchannelWrapper is orphaned, it will try to
// acquire the lock to remove itself from the entry. This means that
// whenever we need to remove an owned subchannel from an entry, if we
// released our ref to the SubchannelWrapper immediately, we would
// cause a deadlock, since our caller is already holding the lock. To
// avoid that, any method that may result in releasing a ref to the
// SubchannelWrapper will instead return that ref to the caller, who is
// responsible for releasing the ref after releasing the lock.
class SubchannelEntry : public RefCounted<SubchannelEntry> {
public:
using SubchannelPtr =
absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>;
bool HasOwnedSubchannel() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
auto* sc = absl::get_if<RefCountedPtr<SubchannelWrapper>>(&subchannel_);
return sc != nullptr && *sc != nullptr;
}
explicit SubchannelEntry(XdsHealthStatus eds_health_status)
: eds_health_status_(eds_health_status) {}
// Sets the unowned subchannel. If the entry previously had an
// owned subchannel, returns the ref to it.
RefCountedPtr<SubchannelWrapper> SetUnownedSubchannel(
SubchannelWrapper* subchannel)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
RefCountedPtr<SubchannelWrapper> TakeSubchannelRef()
// Sets the owned subchannel. Must not be called if the entry
// already has an owned subchannel.
void SetOwnedSubchannel(RefCountedPtr<SubchannelWrapper> subchannel)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
return MatchMutable(
&subchannel_,
[](SubchannelWrapper**) -> RefCountedPtr<SubchannelWrapper> {
return nullptr;
},
[](RefCountedPtr<SubchannelWrapper>* subchannel) {
return std::move(*subchannel);
});
GPR_DEBUG_ASSERT(!HasOwnedSubchannel());
subchannel_ = std::move(subchannel);
}
// Returns a pointer to the subchannel, regardless of whether it's
// owned or not.
SubchannelWrapper* GetSubchannel() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
// Returns a ref to the subchannel, regardless of whether it's owned
// or not. Returns null if there is no subchannel or if the
// subchannel's ref count is 0.
RefCountedPtr<SubchannelWrapper> GetSubchannelRef() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
// If the entry has an owned subchannel, moves it out of the entry
// and returns it.
RefCountedPtr<SubchannelWrapper> TakeOwnedSubchannel()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
// Unsets the entry's subchannel.
// If the entry had an owned subchannel, moves the ref into
// owned_subchannels.
void UnsetSubchannel(
std::vector<RefCountedPtr<SubchannelWrapper>>* owned_subchannels)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
// Called when a SubchannelWrapper is orphaned. May replace the
// unowned SubchannelWrapper with an owned one based on
// last_used_time_ and connection_idle_timeout.
void OnSubchannelWrapperOrphan(SubchannelWrapper* wrapper,
Duration connection_idle_timeout)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
grpc_connectivity_state connectivity_state() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
return connectivity_state_;
@ -214,77 +272,44 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
connectivity_state_ = state;
}
void SetSubchannel(SubchannelWrapper* subchannel)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
if (eds_health_status_.status() == XdsHealthStatus::kDraining) {
subchannel_ = subchannel->RefAsSubclass<SubchannelWrapper>();
} else {
subchannel_ = subchannel;
}
}
void UnsetSubchannel(SubchannelWrapper* wrapper)
XdsHealthStatus eds_health_status() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
if (GetSubchannel() == wrapper) subchannel_ = nullptr;
return eds_health_status_;
}
SubchannelWrapper* GetSubchannel() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
return Match(
subchannel_, [](SubchannelWrapper* subchannel) { return subchannel; },
[](const RefCountedPtr<SubchannelWrapper>& subchannel) {
return subchannel.get();
});
}
// Returns the previously held strong ref, if any, which the caller
// will need to release after releasing the lock, because if this is
// the last strong ref, we need to avoid deadlock caused by
// SubchannelWrapper::Orphan() re-acquiring the lock.
RefCountedPtr<SubchannelWrapper> SetEdsHealthStatus(
XdsHealthStatus eds_health_status)
void set_eds_health_status(XdsHealthStatus eds_health_status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
if (eds_health_status_ == eds_health_status) return nullptr;
eds_health_status_ = eds_health_status;
// TODO(roth): Change this to use the gprpp MatchMutable() function
// once we can do that without breaking lock annotations.
auto* raw_ptr = absl::get_if<SubchannelWrapper*>(&subchannel_);
if (raw_ptr != nullptr) {
if (eds_health_status_.status() == XdsHealthStatus::kDraining &&
*raw_ptr != nullptr) {
subchannel_ =
(*raw_ptr)->RefIfNonZero().TakeAsSubclass<SubchannelWrapper>();
}
return nullptr;
}
auto strong_ref =
std::move(absl::get<RefCountedPtr<SubchannelWrapper>>(subchannel_));
subchannel_ = strong_ref.get();
return strong_ref;
}
XdsHealthStatus eds_health_status() const
RefCountedStringValue address_list() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
return eds_health_status_;
return address_list_;
}
void set_address_list(RefCountedStringValue address_list)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
address_list_ = std::move(address_list);
}
RefCountedStringValue address_list() const
Timestamp last_used_time() const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
return address_list_;
return last_used_time_;
}
void set_last_used_time()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
last_used_time_ = Timestamp::Now();
}
private:
grpc_connectivity_state connectivity_state_
ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) = GRPC_CHANNEL_IDLE;
SubchannelPtr subchannel_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
XdsHealthStatus eds_health_status_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>
subchannel_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
XdsHealthStatus eds_health_status_ ABSL_GUARDED_BY(
&XdsOverrideHostLb::mu_) = XdsHealthStatus(XdsHealthStatus::kUnknown);
RefCountedStringValue address_list_
ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
Timestamp last_used_time_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) =
Timestamp::InfPast();
};
// A picker that wraps the picker from the child for cases when cookie is
@ -304,8 +329,8 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelWrapper> subchannel)
: subchannel_(std::move(subchannel)) {
GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
// Hop into ExecCtx, so that we're not holding the data plane mutex
// while we run control-plane code.
// Hop into ExecCtx, so that we don't get stuck running
// arbitrary WorkSerializer callbacks while doing a pick.
ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
}
@ -324,6 +349,33 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
grpc_closure closure_;
};
// Asks the LB policy to create an owned subchannel for the given address.
// Used only when WorkSerializer dispatch is disabled: the picker runs in
// the data plane, so we hop into ExecCtx first and then into the policy's
// WorkSerializer before touching control-plane state.
//
// Instances are heap-allocated with `new` and delete themselves once the
// WorkSerializer callback has run (see RunInExecCtx).
class SubchannelCreationRequester {
 public:
  SubchannelCreationRequester(RefCountedPtr<XdsOverrideHostLb> policy,
                              absl::string_view address)
      : policy_(std::move(policy)), address_(address) {
    GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
    // Hop into ExecCtx, so that we don't get stuck running
    // arbitrary WorkSerializer callbacks while doing a pick.
    ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
  }

 private:
  // Closure callback: schedules the actual subchannel creation on the
  // policy's WorkSerializer, then frees this object.
  static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
    auto* self = static_cast<SubchannelCreationRequester*>(arg);
    self->policy_->work_serializer()->Run(
        [self]() {
          self->policy_->CreateSubchannelForAddress(self->address_);
          delete self;  // Self-owned; lifetime ends here.
        },
        DEBUG_LOCATION);
  }

  RefCountedPtr<XdsOverrideHostLb> policy_;
  // Owned copy of the address (the string_view arg may not outlive us).
  std::string address_;
  grpc_closure closure_;
};
absl::optional<LoadBalancingPolicy::PickResult> PickOverridenHost(
XdsOverrideHostAttribute* override_host_attr) const;
@ -346,6 +398,19 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
RefCountedPtr<SubchannelPicker> picker) override;
};
// Timer that triggers a periodic sweep over the subchannel map
// (XdsOverrideHostLb::CleanupSubchannels) after `duration` elapses.
// Orphaning the timer cancels any pending run.
class IdleTimer : public InternallyRefCounted<IdleTimer> {
 public:
  IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy, Duration duration);

  void Orphan() override;

 private:
  // Runs in the policy's WorkSerializer when the timer fires.
  void OnTimerLocked();

  RefCountedPtr<XdsOverrideHostLb> policy_;
  // Unset once the timer has fired or been cancelled.
  absl::optional<EventEngine::TaskHandle> timer_handle_;
};
~XdsOverrideHostLb() override;
void ShutdownLocked() override;
@ -364,8 +429,14 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
const grpc_resolved_address& address,
RefCountedPtr<SubchannelInterface> subchannel);
// Current config from the resolver.
void CreateSubchannelForAddress(absl::string_view address);
void CleanupSubchannels();
// State from most recent resolver update.
ChannelArgs args_;
XdsHealthStatusSet override_host_status_set_;
Duration connection_idle_timeout_;
// Internal state.
bool shutting_down_ = false;
@ -379,6 +450,9 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
Mutex mu_;
std::map<std::string, RefCountedPtr<SubchannelEntry>, std::less<>>
subchannel_map_ ABSL_GUARDED_BY(mu_);
// Timer handle for periodic subchannel sweep.
OrphanablePtr<IdleTimer> idle_timer_;
};
//
@ -405,26 +479,14 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
auto cookie_address_list = override_host_attr->cookie_address_list();
if (cookie_address_list.empty()) return absl::nullopt;
// The cookie has an address list, so look through the addresses in order.
absl::string_view address_with_no_subchannel;
RefCountedPtr<SubchannelWrapper> idle_subchannel;
bool found_connecting = false;
{
MutexLock lock(&policy_->mu_);
for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) {
RefCountedPtr<SubchannelWrapper> subchannel;
auto it = policy_->subchannel_map_.find(address);
if (it != policy_->subchannel_map_.end()) {
auto* sc = it->second->GetSubchannel();
if (sc != nullptr) {
subchannel = sc->RefIfNonZero().TakeAsSubclass<SubchannelWrapper>();
}
}
if (subchannel == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Subchannel %s was not found",
std::string(address).c_str());
}
continue;
}
if (it == policy_->subchannel_map_.end()) continue;
if (!override_host_health_status_set_.Contains(
it->second->eds_health_status())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
@ -435,6 +497,17 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
}
continue;
}
auto subchannel = it->second->GetSubchannelRef();
if (subchannel == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "No subchannel for %s",
std::string(address).c_str());
}
if (address_with_no_subchannel.empty()) {
address_with_no_subchannel = it->first;
}
continue;
}
auto connectivity_state = it->second->connectivity_state();
if (connectivity_state == GRPC_CHANNEL_READY) {
// Found a READY subchannel. Pass back the actual address list
@ -443,6 +516,7 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
gpr_log(GPR_INFO, "Picker override found READY subchannel %s",
std::string(address).c_str());
}
it->second->set_last_used_time();
override_host_attr->set_actual_address_list(it->second->address_list());
return PickResult::Complete(subchannel->wrapped_subchannel());
} else if (connectivity_state == GRPC_CHANNEL_IDLE) {
@ -470,7 +544,26 @@ XdsOverrideHostLb::Picker::PickOverridenHost(
}
return PickResult::Queue();
}
// No READY, IDLE, or CONNECTING subchannels found.
// No READY, IDLE, or CONNECTING subchannels found. If we found an
// entry that has no subchannel, then queue the pick and trigger
// creation of a subchannel for that entry.
if (!address_with_no_subchannel.empty()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "Picker override found entry with no subchannel");
}
if (!IsWorkSerializerDispatchEnabled()) {
new SubchannelCreationRequester(policy_, address_with_no_subchannel);
} else {
policy_->work_serializer()->Run(
[policy = policy_,
address = std::string(address_with_no_subchannel)]() {
policy->CreateSubchannelForAddress(address);
},
DEBUG_LOCATION);
}
return PickResult::Queue();
}
// No entry found that was not in TRANSIENT_FAILURE.
return absl::nullopt;
}
@ -498,6 +591,7 @@ LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) {
// the StatefulSession filter can set the cookie.
if (override_host_attr != nullptr) {
MutexLock lock(&wrapper->policy()->mu_);
wrapper->set_last_used_time();
override_host_attr->set_actual_address_list(wrapper->address_list());
}
// Unwrap the subchannel.
@ -506,6 +600,56 @@ LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) {
return result;
}
//
// XdsOverrideHostLb::IdleTimer
//
// Schedules a subchannel-cleanup pass to run after (at least) `duration`.
// The callback fires on an EventEngine thread and then hops into the
// policy's WorkSerializer before touching policy state.
XdsOverrideHostLb::IdleTimer::IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy,
                                        Duration duration)
    : policy_(std::move(policy)) {
  // Min time between timer runs is 5s so that we don't kill ourselves
  // with lock contention and CPU usage due to sweeps over the map.
  duration = std::max(duration, Duration::Seconds(5));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
    gpr_log(GPR_INFO,
            "[xds_override_host_lb %p] idle timer %p: subchannel cleanup "
            "pass will run in %s",
            policy_.get(), this, duration.ToString().c_str());
  }
  // The lambda holds a ref to this timer so it stays alive until it fires
  // or is cancelled.  ExecCtx instances are needed because EventEngine
  // callbacks run outside of any ExecCtx.
  timer_handle_ = policy_->channel_control_helper()->GetEventEngine()->RunAfter(
      duration, [self = RefAsSubclass<IdleTimer>()]() mutable {
        ApplicationCallbackExecCtx callback_exec_ctx;
        ExecCtx exec_ctx;
        // Grab the raw pointer before std::move(self) below, since
        // evaluation order within the Run() call is unspecified.
        auto self_ptr = self.get();
        self_ptr->policy_->work_serializer()->Run(
            [self = std::move(self)]() { self->OnTimerLocked(); },
            DEBUG_LOCATION);
      });
}
// Cancels the pending cleanup pass, if any, and drops the internal ref.
void XdsOverrideHostLb::IdleTimer::Orphan() {
  if (timer_handle_.has_value()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
      gpr_log(GPR_INFO, "[xds_override_host_lb %p] idle timer %p: cancelling",
              policy_.get(), this);
    }
    auto* event_engine = policy_->channel_control_helper()->GetEventEngine();
    event_engine->Cancel(*timer_handle_);
    timer_handle_.reset();
  }
  Unref();
}
// Invoked in the WorkSerializer when the timer fires.  A missing handle
// means the timer was cancelled after the callback was already scheduled,
// in which case this is a no-op.
void XdsOverrideHostLb::IdleTimer::OnTimerLocked() {
  if (!timer_handle_.has_value()) return;
  timer_handle_.reset();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
    gpr_log(GPR_INFO, "[xds_override_host_lb %p] idle timer %p: timer fired",
            policy_.get(), this);
  }
  policy_->CleanupSubchannels();
}
//
// XdsOverrideHostLb
//
@ -536,17 +680,16 @@ void XdsOverrideHostLb::ShutdownLocked() {
void XdsOverrideHostLb::ResetState() {
{
// Drop subchannel refs after releasing the lock to avoid deadlock.
std::vector<SubchannelEntry::SubchannelPtr> subchannel_refs_to_drop;
std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
MutexLock lock(&mu_);
subchannel_refs_to_drop.reserve(subchannel_map_.size());
for (auto& p : subchannel_map_) {
auto subchannel = p.second->TakeSubchannelRef();
if (subchannel != nullptr) {
subchannel_refs_to_drop.push_back(std::move(subchannel));
}
p.second->UnsetSubchannel(&subchannel_refs_to_drop);
}
subchannel_map_.clear();
}
// Cancel timer, if any.
idle_timer_.reset();
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
@ -579,6 +722,13 @@ void XdsOverrideHostLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
// Reads the EDS health status for this endpoint from its channel args,
// defaulting to kUnknown when the arg is absent.
XdsHealthStatus GetEndpointHealthStatus(const EndpointAddresses& endpoint) {
  const int raw_status =
      endpoint.args()
          .GetInt(GRPC_ARG_XDS_HEALTH_STATUS)
          .value_or(XdsHealthStatus::HealthStatus::kUnknown);
  return XdsHealthStatus(
      static_cast<XdsHealthStatus::HealthStatus>(raw_status));
}
// Wraps the endpoint iterator and filters out endpoints in state DRAINING.
class ChildEndpointIterator : public EndpointAddressesIterator {
public:
@ -634,10 +784,15 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
ReportTransientFailure(status);
return status;
}
args_ = std::move(args.args);
override_host_status_set_ = it->second->cluster->override_host_statuses;
connection_idle_timeout_ = it->second->cluster->connection_idle_timeout;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] override host status set: %s",
this, override_host_status_set_.ToString().c_str());
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] override host status set: %s "
"connection idle timeout: %s",
this, override_host_status_set_.ToString().c_str(),
connection_idle_timeout_.ToString().c_str());
}
// Update address map and wrap endpoint iterator for child policy.
if (args.addresses.ok()) {
@ -659,7 +814,7 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = new_config->child_config();
update_args.args = std::move(args.args);
update_args.args = args_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] Updating child policy handler %p", this,
@ -756,9 +911,12 @@ void XdsOverrideHostLb::UpdateAddressMap(
}
});
// Now grab the lock and update subchannel_map_ from addresses_for_map.
const Timestamp now = Timestamp::Now();
const Timestamp idle_threshold = now - connection_idle_timeout_;
Duration next_time = connection_idle_timeout_;
{
// Drop subchannel refs after releasing the lock to avoid deadlock.
std::vector<SubchannelEntry::SubchannelPtr> subchannel_refs_to_drop;
std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
MutexLock lock(&mu_);
for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
if (addresses_for_map.find(it->first) == addresses_for_map.end()) {
@ -766,10 +924,7 @@ void XdsOverrideHostLb::UpdateAddressMap(
gpr_log(GPR_INFO, "[xds_override_host_lb %p] removing map key %s",
this, it->first.c_str());
}
auto subchannel = it->second->TakeSubchannelRef();
if (subchannel != nullptr) {
subchannel_refs_to_drop.push_back(std::move(subchannel));
}
it->second->UnsetSubchannel(&subchannel_refs_to_drop);
it = subchannel_map_.erase(it);
} else {
++it;
@ -784,32 +939,30 @@ void XdsOverrideHostLb::UpdateAddressMap(
gpr_log(GPR_INFO, "[xds_override_host_lb %p] adding map key %s", this,
address.c_str());
}
it = subchannel_map_
.emplace(address, MakeRefCounted<SubchannelEntry>(
address_info.eds_health_status))
it = subchannel_map_.emplace(address, MakeRefCounted<SubchannelEntry>())
.first;
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] setting EDS health status for "
"%s to %s",
this, address.c_str(),
address_info.eds_health_status.ToString());
}
auto subchannel_ref =
it->second->SetEdsHealthStatus(address_info.eds_health_status);
if (subchannel_ref != nullptr) {
subchannel_refs_to_drop.push_back(std::move(subchannel_ref));
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] setting address list for %s to %s",
this, address.c_str(), address_info.address_list.c_str());
"[xds_override_host_lb %p] map key %s: setting "
"eds_health_status=%s address_list=%s",
this, address.c_str(),
address_info.eds_health_status.ToString(),
address_info.address_list.c_str());
}
it->second->set_eds_health_status(address_info.eds_health_status);
it->second->set_address_list(std::move(address_info.address_list));
// Check the entry's last_used_time to determine the next time at
// which the timer needs to run.
if (it->second->last_used_time() > idle_threshold) {
const Duration next_time_for_entry =
it->second->last_used_time() + connection_idle_timeout_ - now;
next_time = std::min(next_time, next_time_for_entry);
}
}
}
idle_timer_ =
MakeOrphanable<IdleTimer>(RefAsSubclass<XdsOverrideHostLb>(), next_time);
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
@ -820,16 +973,84 @@ XdsOverrideHostLb::AdoptSubchannel(
std::move(subchannel), RefAsSubclass<XdsOverrideHostLb>());
auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
if (key.ok()) {
// Drop ref to previously owned subchannel (if any) after releasing
// the lock.
RefCountedPtr<SubchannelWrapper> subchannel_ref_to_drop;
MutexLock lock(&mu_);
auto it = subchannel_map_.find(*key);
if (it != subchannel_map_.end()) {
wrapper->set_subchannel_entry(it->second);
it->second->SetSubchannel(wrapper.get());
subchannel_ref_to_drop = it->second->SetUnownedSubchannel(wrapper.get());
}
}
return wrapper;
}
// Creates a subchannel for `address` that is owned by the map entry
// (rather than by the child policy), so that RPCs whose cookies point at
// this address can use it.  Runs in the WorkSerializer; triggered by the
// picker when it finds an entry with no subchannel.
void XdsOverrideHostLb::CreateSubchannelForAddress(absl::string_view address) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
    gpr_log(GPR_INFO,
            "[xds_override_host_lb %p] creating owned subchannel for %s", this,
            std::string(address).c_str());
  }
  auto addr = StringToSockaddr(address);
  // The address came from the subchannel map, so it was already parsed
  // successfully once; failure here would be a bug.
  GPR_ASSERT(addr.ok());
  // Note: We don't currently have any cases where per_address_args need to
  // be passed through. If we encounter any such cases in the future, we
  // will need to change this to store those attributes from the resolver
  // update in the map entry.
  auto subchannel = channel_control_helper()->CreateSubchannel(
      *addr, /*per_address_args=*/ChannelArgs(), args_);
  auto wrapper = MakeRefCounted<SubchannelWrapper>(
      std::move(subchannel), RefAsSubclass<XdsOverrideHostLb>());
  {
    MutexLock lock(&mu_);
    auto it = subchannel_map_.find(address);
    // This can happen if the map entry was removed between the time that
    // the picker requested the subchannel creation and the time that we got
    // here. In that case, we can just make it a no-op, since the update
    // that removed the entry will have generated a new picker already.
    if (it == subchannel_map_.end()) return;
    // This can happen if the picker requests subchannel creation for
    // the same address multiple times.
    if (it->second->HasOwnedSubchannel()) return;
    wrapper->set_subchannel_entry(it->second);
    it->second->SetOwnedSubchannel(std::move(wrapper));
  }
  // Update the picker outside the lock so it can see the new subchannel.
  MaybeUpdatePickerLocked();
}
// Periodic sweep (driven by IdleTimer): drops owned subchannels for
// entries that have not been used within connection_idle_timeout_, and
// re-arms the timer based on the earliest remaining expiration.
void XdsOverrideHostLb::CleanupSubchannels() {
  const Timestamp now = Timestamp::Now();
  // Entries last used at or before this instant are considered idle.
  const Timestamp idle_threshold = now - connection_idle_timeout_;
  Duration next_time = connection_idle_timeout_;
  // Refs are collected here and released only after mu_ is dropped,
  // because releasing the last ref triggers SubchannelWrapper::Orphan(),
  // which re-acquires mu_ and would otherwise deadlock.
  std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
  {
    MutexLock lock(&mu_);
    // Empty map: nothing to sweep and no need to re-arm the timer.
    if (subchannel_map_.empty()) return;
    for (const auto& p : subchannel_map_) {
      if (p.second->last_used_time() <= idle_threshold) {
        auto subchannel = p.second->TakeOwnedSubchannel();
        if (subchannel != nullptr) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
            gpr_log(GPR_INFO,
                    "[xds_override_host_lb %p] dropping subchannel for %s",
                    this, p.first.c_str());
          }
          subchannel_refs_to_drop.push_back(std::move(subchannel));
        }
      } else {
        // Not dropping the subchannel. Check the entry's last_used_time to
        // determine the next time at which the timer needs to run.
        const Duration next_time_for_entry =
            p.second->last_used_time() + connection_idle_timeout_ - now;
        next_time = std::min(next_time, next_time_for_entry);
      }
    }
  }
  // Re-arm for the next sweep (replaces any existing timer).
  idle_timer_ =
      MakeOrphanable<IdleTimer>(RefAsSubclass<XdsOverrideHostLb>(), next_time);
}
//
// XdsOverrideHostLb::Helper
//
@ -890,18 +1111,29 @@ void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch(
}
void XdsOverrideHostLb::SubchannelWrapper::Orphan() {
if (subchannel_entry_ != nullptr) {
MutexLock lock(&policy()->mu_);
subchannel_entry_->UnsetSubchannel(this);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] subchannel wrapper %p orphaned",
policy_.get(), this);
}
if (!IsWorkSerializerDispatchEnabled()) {
wrapped_subchannel()->CancelConnectivityStateWatch(watcher_);
if (subchannel_entry_ != nullptr) {
MutexLock lock(&policy()->mu_);
subchannel_entry_->OnSubchannelWrapperOrphan(
this, policy()->connection_idle_timeout_);
}
return;
}
policy()->work_serializer()->Run(
[self = WeakRefAsSubclass<SubchannelWrapper>()]() {
self->wrapped_subchannel()->CancelConnectivityStateWatch(
self->watcher_);
if (self->subchannel_entry_ != nullptr) {
MutexLock lock(&self->policy()->mu_);
self->subchannel_entry_->OnSubchannelWrapperOrphan(
self.get(), self->policy()->connection_idle_timeout_);
}
},
DEBUG_LOCATION);
}
@ -911,10 +1143,11 @@ void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
bool update_picker = false;
if (subchannel_entry_ != nullptr) {
MutexLock lock(&policy()->mu_);
subchannel_entry_->set_connectivity_state(state);
update_picker = subchannel_entry_->GetSubchannel() == this &&
subchannel_entry_->eds_health_status().status() ==
XdsHealthStatus::kDraining;
if (subchannel_entry_->connectivity_state() != state) {
subchannel_entry_->set_connectivity_state(state);
update_picker = subchannel_entry_->HasOwnedSubchannel() &&
subchannel_entry_->GetSubchannel() == this;
}
}
// Sending connectivity state notifications to the watchers may cause the set
// of watchers to change, so we can't be iterating over the set of watchers
@ -932,6 +1165,87 @@ void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
if (update_picker) policy()->MaybeUpdatePickerLocked();
}
//
// XdsOverrideHostLb::SubchannelEntry
//
// Installs an unowned (raw-pointer) subchannel in this entry.  If the
// entry previously held an owned subchannel, the ref is returned to the
// caller, who must release it only after dropping the lock (releasing it
// under the lock could deadlock in SubchannelWrapper::Orphan()).
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::SubchannelEntry::SetUnownedSubchannel(
    SubchannelWrapper* subchannel)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
  RefCountedPtr<SubchannelWrapper> previously_owned = TakeOwnedSubchannel();
  subchannel_ = subchannel;
  return previously_owned;
}
// Returns the raw subchannel pointer, regardless of whether the entry
// holds it as an unowned raw pointer or as an owned ref.
XdsOverrideHostLb::SubchannelWrapper*
XdsOverrideHostLb::SubchannelEntry::GetSubchannel() const
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
  if (auto* unowned = absl::get_if<SubchannelWrapper*>(&subchannel_)) {
    return *unowned;
  }
  return absl::get<RefCountedPtr<SubchannelWrapper>>(subchannel_).get();
}
// Returns a new ref to the subchannel, whether owned or unowned.
// Yields null if the entry has no subchannel or if the wrapper's ref
// count has already dropped to zero (it is being destroyed).
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::SubchannelEntry::GetSubchannelRef() const
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
  SubchannelWrapper* subchannel = GetSubchannel();
  if (subchannel == nullptr) return nullptr;
  return subchannel->RefIfNonZero().TakeAsSubclass<SubchannelWrapper>();
}
// Moves the owned subchannel ref out of the entry, if present.
// Returns null when the entry holds only an unowned raw pointer.
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
XdsOverrideHostLb::SubchannelEntry::TakeOwnedSubchannel()
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
  auto* owned = absl::get_if<RefCountedPtr<SubchannelWrapper>>(&subchannel_);
  if (owned == nullptr) return nullptr;
  return std::move(*owned);
}
// Clears the entry's subchannel.  If the entry held an owned ref, the ref
// is appended to *owned_subchannels so the caller can release it after
// dropping the lock (avoiding deadlock in SubchannelWrapper::Orphan()).
void XdsOverrideHostLb::SubchannelEntry::UnsetSubchannel(
    std::vector<RefCountedPtr<SubchannelWrapper>>* owned_subchannels)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
  auto owned = TakeOwnedSubchannel();
  if (owned != nullptr) owned_subchannels->push_back(std::move(owned));
  subchannel_ = nullptr;
}
// Called when the child policy releases its (unowned) SubchannelWrapper.
// If the entry has been idle past the timeout, the entry simply forgets
// the wrapper; otherwise the entry clones the wrapper and takes ownership
// so the underlying connection survives for possible cookie-based reuse.
void XdsOverrideHostLb::SubchannelEntry::OnSubchannelWrapperOrphan(
    SubchannelWrapper* wrapper, Duration connection_idle_timeout)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
  auto* subchannel = GetSubchannel();
  // Ignore stale notifications for a wrapper this entry no longer holds.
  if (subchannel != wrapper) return;
  if (last_used_time_ < (Timestamp::Now() - connection_idle_timeout)) {
    // Idle past the timeout: let the subchannel go.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
      gpr_log(GPR_INFO,
              "[xds_override_host_lb] removing unowned subchannel wrapper %p",
              subchannel);
    }
    subchannel_ = nullptr;
  } else {
    // The subchannel is being released by the child policy, but it
    // is still within its idle timeout, so we make a new copy of
    // the wrapper with the same underlying subchannel, and we hold
    // our own ref to it.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
      gpr_log(GPR_INFO,
              "[xds_override_host_lb] subchannel wrapper %p: cloning "
              "to gain ownership",
              subchannel);
    }
    subchannel_ = wrapper->Clone();
  }
}
//
// factory
//

@ -128,6 +128,10 @@ class ConnectivityStateTracker {
// Not thread safe; access must be serialized with an external lock.
absl::Status status() const { return status_; }
// Returns the number of watchers.
// Not thread safe; access must be serialized with an external lock.
// (Exposed so tests can verify watcher registration and cleanup.)
size_t NumWatchers() const { return watchers_.size(); }
private:
const char* name_;
std::atomic<grpc_connectivity_state> state_{grpc_connectivity_state()};

@ -385,7 +385,20 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
}
std::shared_ptr<WorkSerializer> work_serializer() {
// Returns the number of watchers registered with the state tracker.
// The tracker is not thread safe, so the read is performed inside the
// test's WorkSerializer; we block on a Notification until the
// serialized callback has run and the result is available.
size_t NumWatchers() const {
  size_t num_watchers;
  absl::Notification notification;
  work_serializer()->Run(
      [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*test_->work_serializer_) {
        num_watchers = state_tracker_.NumWatchers();
        notification.Notify();
      },
      DEBUG_LOCATION);
  notification.WaitForNotification();
  return num_watchers;
}
// Returns the WorkSerializer shared by the enclosing test fixture.
std::shared_ptr<WorkSerializer> work_serializer() const {
  return test_->work_serializer_;
}
@ -457,7 +470,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< location.file() << ":" << location.line();
if (update == nullptr) return absl::nullopt;
StateUpdate result = std::move(*update);
gpr_log(GPR_INFO, "got next state update: %s", result.ToString().c_str());
gpr_log(GPR_INFO, "dequeued next state update: %s",
result.ToString().c_str());
queue_.pop_front();
return std::move(result);
}
@ -563,7 +577,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
StateUpdate update{
state, status,
MakeRefCounted<PickerWrapper>(test_, std::move(picker))};
gpr_log(GPR_INFO, "state update from LB policy: %s",
gpr_log(GPR_INFO, "enqueuing state update from LB policy: %s",
update.ToString().c_str());
queue_.push_back(std::move(update));
}
@ -1414,14 +1428,16 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void SetExpectedTimerDuration(
absl::optional<grpc_event_engine::experimental::EventEngine::Duration>
duration) {
duration,
SourceLocation location = SourceLocation()) {
if (duration.has_value()) {
fuzzing_ee_->SetRunAfterDurationCallback(
[expected = *duration](
[expected = *duration, location = location](
grpc_event_engine::experimental::EventEngine::Duration duration) {
EXPECT_EQ(duration, expected)
<< "Expected: " << expected.count()
<< "ns\nActual: " << duration.count() << "ns";
<< "ns\n Actual: " << duration.count() << "ns\n"
<< location.file() << ":" << location.line();
});
} else {
fuzzing_ee_->SetRunAfterDurationCallback(nullptr);

@ -61,12 +61,16 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
static RefCountedPtr<const XdsDependencyManager::XdsConfig> MakeXdsConfig(
absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
"HEALTHY"},
absl::optional<Duration> connection_idle_timeout = absl::nullopt,
std::string cluster_name = "cluster_name") {
auto cluster_resource = std::make_shared<XdsClusterResource>();
for (const absl::string_view host_status : override_host_statuses) {
cluster_resource->override_host_statuses.Add(
XdsHealthStatus::FromString(host_status).value());
}
if (connection_idle_timeout.has_value()) {
cluster_resource->connection_idle_timeout = *connection_idle_timeout;
}
auto xds_config = MakeRefCounted<XdsDependencyManager::XdsConfig>();
xds_config->clusters[cluster_name].emplace(
cluster_name, std::move(cluster_resource), nullptr, "");
@ -77,6 +81,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
absl::Span<const EndpointAddresses> endpoints,
absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
"HEALTHY"},
absl::optional<Duration> connection_idle_timeout = absl::nullopt,
std::string cluster_name = "cluster_name",
std::string child_policy = "round_robin") {
auto config = MakeConfig(Json::FromArray({Json::FromObject(
@ -86,7 +91,8 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
{"childPolicy",
Json::FromArray({Json::FromObject(
{{child_policy, Json::FromObject({})}})})}})}})}));
auto xds_config = MakeXdsConfig(override_host_statuses, cluster_name);
auto xds_config = MakeXdsConfig(override_host_statuses,
connection_idle_timeout, cluster_name);
return ApplyUpdate(
BuildUpdate(endpoints, std::move(config),
ChannelArgs().SetObject(std::move(xds_config))),
@ -97,12 +103,13 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
absl::Span<const absl::string_view> addresses,
absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
"HEALTHY"},
absl::optional<Duration> connection_idle_timeout = absl::nullopt,
std::string cluster_name = "cluster_name",
std::string child_policy = "round_robin") {
return UpdateXdsOverrideHostPolicy(
MakeEndpointAddressesListFromAddressList(addresses),
override_host_statuses, std::move(cluster_name),
std::move(child_policy));
override_host_statuses, connection_idle_timeout,
std::move(cluster_name), std::move(child_policy));
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
@ -125,13 +132,15 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
XdsHealthStatus::HealthStatus>>
addresses_and_statuses,
absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
"HEALTHY"}) {
"HEALTHY"},
absl::optional<Duration> connection_idle_timeout = absl::nullopt) {
EndpointAddressesList endpoints;
for (auto address_and_status : addresses_and_statuses) {
endpoints.push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
EXPECT_EQ(UpdateXdsOverrideHostPolicy(endpoints, override_host_status),
EXPECT_EQ(UpdateXdsOverrideHostPolicy(endpoints, override_host_status,
connection_idle_timeout),
absl::OkStatus());
}
@ -184,8 +193,8 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
expected)
<< location.file() << ":" << location.line();
EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str)
<< "Expected: " << attribute->actual_address_list() << "\n"
<< " Actual: " << expected_addresses_str << "\n"
<< " Actual: " << attribute->actual_address_list() << "\n"
<< "Expected: " << expected_addresses_str << "\n"
<< location.file() << ":" << location.line();
}
}
@ -205,8 +214,8 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
<< location.file() << ":" << location.line();
EXPECT_EQ(attribute->actual_address_list(),
absl::StripPrefix(*address, "ipv4:"))
<< "Expected: " << attribute->actual_address_list() << "\n"
<< " Actual: " << absl::StripPrefix(*address, "ipv4:") << "\n"
<< " Actual: " << attribute->actual_address_list() << "\n"
<< "Expected: " << absl::StripPrefix(*address, "ipv4:") << "\n"
<< location.file() << ":" << location.line();
actual_picks.push_back(std::move(*address));
}
@ -324,23 +333,29 @@ TEST_F(XdsOverrideHostTest,
TEST_F(XdsOverrideHostTest, DrainingState) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Do one override pick for endpoint 1, so that it will still be within
// the idle threshold and will therefore be retained when it moves to
// state DRAINING.
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
// Now move endpoint 1 to state DRAINING.
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
// Make sure subchannels get orphaned in the WorkSerializer.
WaitForWorkSerializerToFlush();
// Picks without an override will round-robin over the two endpoints
// that are not in draining state.
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// Picks with an override are able to select the draining endpoint.
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
// Send the LB policy an update that removes the draining endpoint.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}});
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[2], XdsHealthStatus::kHealthy}});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Gone!
@ -362,14 +377,15 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
// points to that host, but the host should not be used if there is no
// override pointing to it.
gpr_log(GPR_INFO, "### sending update with DRAINING host");
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto subchannel = FindSubchannel(kAddresses[1]);
ASSERT_NE(subchannel, nullptr);
picker = ExpectState(GRPC_CHANNEL_READY);
// Make sure subchannels get orphaned in the WorkSerializer.
WaitForWorkSerializerToFlush();
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
// Now the connection to the draining host gets dropped.
@ -406,21 +422,26 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Do one override pick for endpoint 1, so that it will still be within
// the idle threshold and will therefore be retained when it moves to
// state DRAINING.
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kDraining},
{kAddresses[2], XdsHealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
// Make sure subchannels get orphaned in the WorkSerializer.
WaitForWorkSerializerToFlush();
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kHealthy},
{kAddresses[1], XdsHealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::kHealthy}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectRoundRobinPicks(picker.get(), kAddresses);
@ -429,27 +450,32 @@ TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ASSERT_NE(ExpectStartupWithRoundRobin(kAddresses), nullptr);
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
auto picker = ExpectState(GRPC_CHANNEL_READY);
auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
auto picker = ExpectStartupWithRoundRobin(kAddresses);
ASSERT_NE(picker, nullptr);
// Do one override pick for endpoint 2, so that it will still be within
// the idle threshold and will therefore be retained when it moves to
// state DRAINING.
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
// Make sure subchannels get orphaned in the WorkSerializer.
WaitForWorkSerializerToFlush();
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
auto* address0_attribute = MakeOverrideHostAttribute(kAddresses[0]);
ExpectOverridePicks(picker.get(), address0_attribute, kAddresses[0]);
auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
// UNKNOWN excluded: overrides for first endpoint are not honored.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"HEALTHY", "DRAINING"});
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::kDraining}},
{"HEALTHY", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
@ -458,11 +484,10 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
// HEALTHY excluded: overrides for second endpoint are not honored.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "DRAINING"});
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::kDraining}},
{"UNKNOWN", "DRAINING"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
@ -471,11 +496,10 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
{kAddresses[0], kAddresses[1]});
ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
// DRAINING excluded: overrides for third endpoint are not honored.
ApplyUpdateWithHealthStatuses(
{{kAddresses[0], XdsHealthStatus::HealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::HealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::HealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
{kAddresses[1], XdsHealthStatus::kHealthy},
{kAddresses[2], XdsHealthStatus::kDraining}},
{"UNKNOWN", "HEALTHY"});
picker = ExpectState(GRPC_CHANNEL_READY);
ASSERT_NE(picker, nullptr);
ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
@ -522,6 +546,171 @@ TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) {
{kEndpoint1Addresses[1], kEndpoint1Addresses[0]});
}
// Verifies lazy subchannel creation for a DRAINING endpoint that the
// child policy never saw: the subchannel is created only when an
// override pick requests it, and the override pick queues until the
// connection becomes READY.
TEST_F(XdsOverrideHostTest, ChildPolicyNeverCreatedSubchannel) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  // The draining endpoint is not passed down to the child policy.
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  auto picker = ExpectRoundRobinStartup({kAddresses[0], kAddresses[2]});
  // Subchannels should exist for the non-draining endpoints only.
  auto* subchannel = FindSubchannel(kAddresses[0]);
  ASSERT_NE(subchannel, nullptr);
  EXPECT_GE(subchannel->NumWatchers(), 1);
  auto* subchannel2 = FindSubchannel(kAddresses[1]);
  EXPECT_EQ(subchannel2, nullptr);
  auto* subchannel3 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel3, nullptr);
  EXPECT_GE(subchannel3->NumWatchers(), 1);
  // A pick with an override pointing to the draining endpoint should
  // queue the pick and trigger subchannel creation.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectPickQueued(picker.get(), {address1_attribute});
  WaitForWorkSerializerToFlush();
  subchannel2 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_EQ(subchannel2->NumWatchers(), 1);
  // Subchannel creation will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Trying the pick again with the new picker will trigger a connection
  // attempt on the new subchannel.
  ExpectPickQueued(picker.get(), {address1_attribute});
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(subchannel2->ConnectionRequested());
  subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Subchannel state change will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Trying the pick with override again should queue, because the
  // connection attempt is still pending.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // Connection attempt succeeds.
  subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
  // Subchannel state change will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Now the pick with override should complete.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
// Verifies that when an endpoint moves to DRAINING without ever having
// received an override pick, the policy does NOT take ownership of the
// subchannel the child policy releases, so the subchannel ends up with
// zero watchers.
TEST_F(XdsOverrideHostTest,
       ChildPolicyUnrefsSubchannelNotUsedWithinIdleThreshold) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Now move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Child policy should drop its ref to the draining endpoint, and
  // xds_override_host should not take ownership, since the entry never
  // had an override pick.
  auto* subchannel0 = FindSubchannel(kAddresses[0]);
  ASSERT_NE(subchannel0, nullptr);
  EXPECT_GE(subchannel0->NumWatchers(), 1);
  auto* subchannel1 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel1, nullptr);
  EXPECT_EQ(subchannel1->NumWatchers(), 0);
  auto* subchannel2 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_GE(subchannel2->NumWatchers(), 1);
}
// Verifies the connection idle timer: subchannels that have not seen an
// override pick within the connection idle timeout are unreffed when
// the timer fires, and the timer is rescheduled based on the remaining
// entries' last-used times.
TEST_F(XdsOverrideHostTest, IdleTimer) {
  // Capture every timer duration scheduled on the fuzzing EventEngine.
  std::vector<grpc_event_engine::experimental::EventEngine::Duration>
      timer_durations;
  fuzzing_ee_->SetRunAfterDurationCallback(
      [&timer_durations](
          grpc_event_engine::experimental::EventEngine::Duration duration) {
        timer_durations.push_back(duration);
      });
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  gpr_log(GPR_INFO, "### sending initial update");
  EXPECT_EQ(UpdateXdsOverrideHostPolicy(kAddresses, {"UNKNOWN", "HEALTHY"},
                                        Duration::Minutes(1)),
            absl::OkStatus());
  // Initial update should have caused the timer to be set for the idle
  // timeout.
  EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Minutes(1)));
  timer_durations.clear();
  auto picker = ExpectRoundRobinStartup(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do an override pick for endpoints 1 and 2, so that they will still be
  // within the idle threshold and will therefore be retained when they move
  // to state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  auto* address2_attribute = MakeOverrideHostAttribute(kAddresses[2]);
  ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
  // Increment time by 5 seconds and send an update that moves endpoints 1
  // and 2 to state DRAINING.
  gpr_log(GPR_INFO, "### moving endpoints 1 and 2 to state DRAINING");
  IncrementTimeBy(Duration::Seconds(5));
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kDraining}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"},
                                Duration::Minutes(1));
  // The update should cause the timer to be reset for the next
  // expiration time.
  EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(55)));
  timer_durations.clear();
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will use only the endpoint that is not in
  // draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0]});
  // Picks with an override are able to select the draining endpoints.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectOverridePicks(picker.get(), address2_attribute, kAddresses[2]);
  // Both subchannels are owned by the xds_override_host policy.
  auto* subchannel1 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel1, nullptr);
  EXPECT_EQ(subchannel1->NumWatchers(), 1);
  auto* subchannel2 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_EQ(subchannel2->NumWatchers(), 1);
  // Trigger the timer. Both subchannels have gotten an override pick more
  // recently than the timer was scheduled, so neither one will be unreffed.
  IncrementTimeBy(Duration::Seconds(55));
  EXPECT_EQ(subchannel1->NumWatchers(), 1);
  EXPECT_EQ(subchannel2->NumWatchers(), 1);
  // The timer will be reset for 5 seconds.
  EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(5)));
  timer_durations.clear();
  // Send another override pick for endpoint 1.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Trigger the timer again. This time, it should unref endpoint 2 but
  // keep endpoint 1.
  IncrementTimeBy(Duration::Seconds(5));
  EXPECT_EQ(subchannel1->NumWatchers(), 1);
  EXPECT_EQ(subchannel2->NumWatchers(), 0);
  // The timer should now be set for 55 seconds, which is how long it
  // will be until endpoint 1 should be unreffed.
  EXPECT_THAT(timer_durations, ::testing::ElementsAre(Duration::Seconds(55)));
}
} // namespace
} // namespace testing
} // namespace grpc_core

Loading…
Cancel
Save