[outlier detection] support multiple addresses per endpoint (#34526)

pull/34594/head
Mark D. Roth 1 year ago committed by GitHub
parent 34683ace5b
commit 36b70504e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      CMakeLists.txt
  2. 11
      build_autogenerated.yaml
  3. 1
      src/core/BUILD
  4. 286
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  5. 19
      src/core/lib/resolver/endpoint_addresses.cc
  6. 10
      src/core/lib/resolver/endpoint_addresses.h
  7. 187
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  8. 34
      test/core/resolver/BUILD
  9. 105
      test/core/resolver/endpoint_addresses_test.cc
  10. 24
      tools/run_tests/generated/tests.json

34
CMakeLists.txt generated

@ -989,6 +989,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx end2end_binder_transport_test)
endif()
add_dependencies(buildtests_cxx end2end_test)
add_dependencies(buildtests_cxx endpoint_addresses_test)
add_dependencies(buildtests_cxx endpoint_binder_pool_test)
add_dependencies(buildtests_cxx endpoint_config_test)
add_dependencies(buildtests_cxx endpoint_pair_test)
@ -10589,6 +10590,39 @@ target_link_libraries(end2end_test
)
endif()
# Test target: builds the endpoint_addresses_test executable from
# test/core/resolver/endpoint_addresses_test.cc. The target is only
# defined when gRPC_BUILD_TESTS is enabled.
if(gRPC_BUILD_TESTS)
add_executable(endpoint_addresses_test
test/core/resolver/endpoint_addresses_test.cc
)
# Require at least C++14 for this target and anything that links against it.
target_compile_features(endpoint_addresses_test PUBLIC cxx_std_14)
# Include paths: the source tree, the public include directory, the bundled
# third-party dependencies, googletest/googlemock, and generated protos.
target_include_directories(endpoint_addresses_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
# Link against gtest and the gRPC test utility library.
target_link_libraries(endpoint_addresses_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -7661,6 +7661,17 @@ targets:
deps:
- grpc++_test
- grpc++_test_util
# gtest-based C++ test target built from
# test/core/resolver/endpoint_addresses_test.cc; depends on gtest and
# grpc_test_util and is marked as not using polling.
- name: endpoint_addresses_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/resolver/endpoint_addresses_test.cc
deps:
- gtest
- grpc_test_util
uses_polling: false
- name: endpoint_binder_pool_test
gtest: true
build: test

@ -5050,6 +5050,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc",
],
external_deps = [
"absl/base:core_headers",
"absl/random",
"absl/status",
"absl/status:statusor",

@ -32,6 +32,7 @@
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -54,6 +55,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
@ -123,6 +125,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
private:
class SubchannelState;
class EndpointState;
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,
@ -133,7 +137,7 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
subchannel_state_(std::move(subchannel_state)) {
if (subchannel_state_ != nullptr) {
subchannel_state_->AddSubchannel(this);
if (subchannel_state_->ejection_time().has_value()) {
if (subchannel_state_->endpoint_state()->ejection_time().has_value()) {
ejected_ = true;
}
}
@ -162,8 +166,11 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override;
RefCountedPtr<SubchannelState> subchannel_state() const {
return subchannel_state_;
void CancelDataWatcher(DataWatcherInterface* watcher) override;
RefCountedPtr<EndpointState> endpoint_state() const {
if (subchannel_state_ == nullptr) return nullptr;
return subchannel_state_->endpoint_state();
}
private:
@ -229,10 +236,55 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
// Ref-counted state object that tracks the set of SubchannelWrapper objects
// registered with it and the EndpointState it is currently associated with.
// Ejection and un-ejection requests are fanned out to every registered
// wrapper.
class SubchannelState : public RefCounted<SubchannelState> {
public:
// Pair of atomic call-outcome counters (successes and failures).
struct Bucket {
std::atomic<uint64_t> successes;
std::atomic<uint64_t> failures;
};
// Registers a wrapper so that it receives Eject()/Uneject() calls.
// Note: subchannels_ is accessed without taking mu_.
void AddSubchannel(SubchannelWrapper* wrapper) {
subchannels_.insert(wrapper);
}
// Unregisters a previously added wrapper.
void RemoveSubchannel(SubchannelWrapper* wrapper) {
subchannels_.erase(wrapper);
}
// Returns a ref to the associated EndpointState (may be null).
// Takes mu_ while reading the pointer.
RefCountedPtr<EndpointState> endpoint_state() {
MutexLock lock(&mu_);
return endpoint_state_;
}
// Replaces the associated EndpointState; passing nullptr drops the ref.
// Takes mu_ while writing the pointer.
void set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state) {
MutexLock lock(&mu_);
endpoint_state_ = std::move(endpoint_state);
}
// Ejects every registered subchannel wrapper.
void Eject() {
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end();) {
SubchannelWrapper* subchannel = *it;
// Advance before calling Eject() so that erasing the current element
// does not invalidate the iterator we continue with.
++it;
subchannel->Eject();
}
}
// Un-ejects every registered subchannel wrapper.
// NOTE(review): unlike Eject(), this uses a plain range-for; presumably
// un-ejecting cannot cause the set to be modified -- confirm.
void Uneject() {
for (auto& subchannel : subchannels_) {
subchannel->Uneject();
}
}
private:
// Non-owning pointers to the wrappers registered via AddSubchannel().
std::set<SubchannelWrapper*> subchannels_;
// Protects endpoint_state_ only.
Mutex mu_;
RefCountedPtr<EndpointState> endpoint_state_ ABSL_GUARDED_BY(mu_);
};
class EndpointState : public RefCounted<EndpointState> {
public:
explicit EndpointState(std::set<SubchannelState*> subchannels)
: subchannels_(std::move(subchannels)) {
for (SubchannelState* subchannel : subchannels_) {
subchannel->set_endpoint_state(Ref());
}
}
void RotateBucket() {
backup_bucket_->successes = 0;
@ -254,14 +306,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
{success_rate, backup_bucket_->successes + backup_bucket_->failures}};
}
void AddSubchannel(SubchannelWrapper* wrapper) {
subchannels_.insert(wrapper);
}
void RemoveSubchannel(SubchannelWrapper* wrapper) {
subchannels_.erase(wrapper);
}
void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); }
void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); }
@ -271,20 +315,15 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Eject(const Timestamp& time) {
ejection_time_ = time;
++multiplier_;
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end();) {
SubchannelWrapper* subchannel = *it;
++it;
subchannel->Eject();
for (SubchannelState* subchannel_state : subchannels_) {
subchannel_state->Eject();
}
}
void Uneject() {
ejection_time_.reset();
for (auto& subchannel : subchannels_) {
subchannel->Uneject();
for (SubchannelState* subchannel_state : subchannels_) {
subchannel_state->Uneject();
}
}
@ -315,6 +354,13 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
}
private:
// Pair of atomic call-outcome counters: number of successful and failed
// calls recorded into this bucket.
struct Bucket {
std::atomic<uint64_t> successes;
std::atomic<uint64_t> failures;
};
const std::set<SubchannelState*> subchannels_;
std::unique_ptr<Bucket> current_bucket_ = std::make_unique<Bucket>();
std::unique_ptr<Bucket> backup_bucket_ = std::make_unique<Bucket>();
// The bucket used to update call counts.
@ -322,7 +368,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
std::atomic<Bucket*> active_bucket_{current_bucket_.get()};
uint32_t multiplier_ = 0;
absl::optional<Timestamp> ejection_time_;
std::set<SubchannelWrapper*> subchannels_;
};
// A picker that wraps the picker from the child to perform outlier detection.
@ -373,10 +418,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
~OutlierDetectionLb() override;
// Returns the address map key for an address, or the empty string if
// the address should be ignored.
static std::string MakeKeyForAddress(const grpc_resolved_address& address);
void ShutdownLocked() override;
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
@ -396,7 +437,11 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<SubchannelPicker> picker_;
std::map<std::string, RefCountedPtr<SubchannelState>> subchannel_state_map_;
std::map<EndpointAddressSet, RefCountedPtr<EndpointState>>
endpoint_state_map_;
std::map<grpc_resolved_address, RefCountedPtr<SubchannelState>,
ResolvedAddressLessThan>
subchannel_state_map_;
OrphanablePtr<EjectionTimer> ejection_timer_;
};
@ -427,6 +472,13 @@ void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
DelegatingSubchannel::AddDataWatcher(std::move(watcher));
}
// Cancels a data watcher on the wrapped subchannel. If the watcher being
// cancelled is of the health-producer type, the cached watcher_wrapper_
// pointer is cleared first; the cancellation is then forwarded to the
// delegating base class in all cases.
void OutlierDetectionLb::SubchannelWrapper::CancelDataWatcher(
    DataWatcherInterface* watcher) {
  auto* internal_watcher =
      static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
  const bool is_health_watcher =
      internal_watcher->type() == HealthProducer::Type();
  if (is_health_watcher) {
    watcher_wrapper_ = nullptr;
  }
  DelegatingSubchannel::CancelDataWatcher(watcher);
}
//
// OutlierDetectionLb::Picker::SubchannelCallTracker
//
@ -437,13 +489,13 @@ class OutlierDetectionLb::Picker::SubchannelCallTracker
SubchannelCallTracker(
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker,
RefCountedPtr<SubchannelState> subchannel_state)
RefCountedPtr<EndpointState> endpoint_state)
: original_subchannel_call_tracker_(
std::move(original_subchannel_call_tracker)),
subchannel_state_(std::move(subchannel_state)) {}
endpoint_state_(std::move(endpoint_state)) {}
~SubchannelCallTracker() override {
subchannel_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
endpoint_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
}
void Start() override {
@ -461,19 +513,17 @@ class OutlierDetectionLb::Picker::SubchannelCallTracker
}
// Record call completion based on status for outlier detection
// calculations.
if (subchannel_state_ != nullptr) {
if (args.status.ok()) {
subchannel_state_->AddSuccessCount();
} else {
subchannel_state_->AddFailureCount();
}
if (args.status.ok()) {
endpoint_state_->AddSuccessCount();
} else {
endpoint_state_->AddFailureCount();
}
}
private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker_;
RefCountedPtr<SubchannelState> subchannel_state_;
RefCountedPtr<EndpointState> endpoint_state_;
};
//
@ -503,17 +553,20 @@ LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
PickResult result = picker_->Pick(args);
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
// Unwrap subchannel to pass back up the stack.
auto* subchannel_wrapper =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
// Inject subchannel call tracker to record call completion as long as
// not both success_rate_ejection and failure_percentage_ejection are unset.
// either success_rate_ejection or failure_percentage_ejection is enabled.
if (counting_enabled_) {
complete_pick->subchannel_call_tracker =
std::make_unique<SubchannelCallTracker>(
std::move(complete_pick->subchannel_call_tracker),
subchannel_wrapper->subchannel_state());
auto endpoint_state = subchannel_wrapper->endpoint_state();
if (endpoint_state != nullptr) {
complete_pick->subchannel_call_tracker =
std::make_unique<SubchannelCallTracker>(
std::move(complete_pick->subchannel_call_tracker),
std::move(endpoint_state));
}
}
// Unwrap subchannel to pass back up the stack.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
return result;
@ -538,15 +591,6 @@ OutlierDetectionLb::~OutlierDetectionLb() {
}
}
// Builds the map key for a resolved address by stringifying it. Only the
// address itself contributes to the key, not any attributes. Returns the
// empty string when the address cannot be stringified, which callers treat
// as "ignore this address".
std::string OutlierDetectionLb::MakeKeyForAddress(
    const grpc_resolved_address& address) {
  auto stringified = grpc_sockaddr_to_string(&address, false);
  if (stringified.ok()) {
    return std::move(*stringified);
  }
  return "";
}
void OutlierDetectionLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] shutting down", this);
@ -595,7 +639,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this);
}
ejection_timer_ = MakeOrphanable<EjectionTimer>(Ref(), Timestamp::Now());
for (const auto& p : subchannel_state_map_) {
for (const auto& p : endpoint_state_map_) {
p.second->RotateBucket(); // Reset call counters.
}
} else if (old_config->outlier_detection_config().interval !=
@ -612,47 +656,92 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
ejection_timer_ =
MakeOrphanable<EjectionTimer>(Ref(), ejection_timer_->StartTime());
}
// Update subchannel state map.
// Update subchannel and endpoint maps.
if (args.addresses.ok()) {
std::set<std::string> current_addresses;
std::set<EndpointAddressSet> current_endpoints;
std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
for (const EndpointAddresses& endpoint : *args.addresses) {
std::string address_key = MakeKeyForAddress(endpoint.address());
if (address_key.empty()) continue;
auto& subchannel_state = subchannel_state_map_[address_key];
if (subchannel_state == nullptr) {
subchannel_state = MakeRefCounted<SubchannelState>();
EndpointAddressSet key(endpoint.addresses());
current_endpoints.emplace(key);
for (const grpc_resolved_address& address : endpoint.addresses()) {
current_addresses.emplace(address);
}
// Find the entry in the endpoint map.
auto it = endpoint_state_map_.find(key);
if (it == endpoint_state_map_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] adding map entry for %s (%p)",
this, address_key.c_str(), subchannel_state.get());
"[outlier_detection_lb %p] adding endpoint entry for %s",
this, key.ToString().c_str());
}
// The endpoint is not present in the map, so we'll need to add it.
// Start by getting a pointer to the entry for each address in the
// subchannel map, creating the entry if needed.
std::set<SubchannelState*> subchannels;
for (const grpc_resolved_address& address : endpoint.addresses()) {
auto it2 = subchannel_state_map_.find(address);
if (it2 == subchannel_state_map_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
std::string address_str = grpc_sockaddr_to_string(&address, false)
.value_or("<unknown>");
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] adding address entry for %s",
this, address_str.c_str());
}
it2 = subchannel_state_map_
.emplace(address, MakeRefCounted<SubchannelState>())
.first;
}
subchannels.insert(it2->second.get());
}
// Now create the endpoint.
endpoint_state_map_.emplace(
key, MakeRefCounted<EndpointState>(std::move(subchannels)));
} else if (!config_->CountingEnabled()) {
// If counting is not enabled, reset state.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] counting disabled; disabling "
"ejection for %s (%p)",
this, address_key.c_str(), subchannel_state.get());
"ejection for %s",
this, key.ToString().c_str());
}
subchannel_state->DisableEjection();
it->second->DisableEjection();
}
current_addresses.emplace(address_key);
}
// Remove any entries we no longer need in the subchannel map.
for (auto it = subchannel_state_map_.begin();
it != subchannel_state_map_.end();) {
if (current_addresses.find(it->first) == current_addresses.end()) {
// remove each map entry for a subchannel address not in the updated
// address list.
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
std::string address_str =
grpc_sockaddr_to_string(&it->first, false).value_or("<unknown>");
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] removing map entry for %s (%p)",
this, it->first.c_str(), it->second.get());
"[outlier_detection_lb %p] removing subchannel map entry %s",
this, address_str.c_str());
}
// Don't hold a ref to the corresponding EndpointState object,
// because there could be subchannel wrappers keeping this alive
// for a while, and we don't need them to do any call tracking.
it->second->set_endpoint_state(nullptr);
it = subchannel_state_map_.erase(it);
} else {
++it;
}
}
// Remove any entries we no longer need in the endpoint map.
for (auto it = endpoint_state_map_.begin();
it != endpoint_state_map_.end();) {
if (current_endpoints.find(it->first) == current_endpoints.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] removing endpoint map entry %s",
this, it->first.ToString().c_str());
}
it = endpoint_state_map_.erase(it);
} else {
++it;
}
}
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
@ -721,16 +810,17 @@ RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
RefCountedPtr<SubchannelState> subchannel_state;
std::string key = MakeKeyForAddress(address);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] creating subchannel, key %s",
parent(), key.c_str());
auto it = parent()->subchannel_state_map_.find(address);
if (it != parent()->subchannel_state_map_.end()) {
subchannel_state = it->second->Ref();
}
if (!key.empty()) {
auto it = parent()->subchannel_state_map_.find(key);
if (it != parent()->subchannel_state_map_.end()) {
subchannel_state = it->second->Ref();
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
std::string address_str =
grpc_sockaddr_to_string(&address, false).value_or("<unknown>");
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] creating subchannel for %s, "
"subchannel state %p",
parent(), address_str.c_str(), subchannel_state.get());
}
auto subchannel = MakeRefCounted<SubchannelWrapper>(
parent()->work_serializer(), subchannel_state,
@ -799,24 +889,24 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
parent_.get());
}
std::map<SubchannelState*, double> success_rate_ejection_candidates;
std::map<SubchannelState*, double> failure_percentage_ejection_candidates;
std::map<EndpointState*, double> success_rate_ejection_candidates;
std::map<EndpointState*, double> failure_percentage_ejection_candidates;
size_t ejected_host_count = 0;
double success_rate_sum = 0;
auto time_now = Timestamp::Now();
auto& config = parent_->config_->outlier_detection_config();
for (auto& state : parent_->subchannel_state_map_) {
auto* subchannel_state = state.second.get();
for (auto& state : parent_->endpoint_state_map_) {
auto* endpoint_state = state.second.get();
// For each address, swap the call counter's buckets in that address's
// map entry.
subchannel_state->RotateBucket();
endpoint_state->RotateBucket();
// Gather data to run success rate algorithm or failure percentage
// algorithm.
if (subchannel_state->ejection_time().has_value()) {
if (endpoint_state->ejection_time().has_value()) {
++ejected_host_count;
}
absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
subchannel_state->GetSuccessRateAndVolume();
endpoint_state->GetSuccessRateAndVolume();
if (!host_success_rate_and_volume.has_value()) {
continue;
}
@ -824,14 +914,14 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
uint64_t request_volume = host_success_rate_and_volume->second;
if (config.success_rate_ejection.has_value()) {
if (request_volume >= config.success_rate_ejection->request_volume) {
success_rate_ejection_candidates[subchannel_state] = success_rate;
success_rate_ejection_candidates[endpoint_state] = success_rate;
success_rate_sum += success_rate;
}
}
if (config.failure_percentage_ejection.has_value()) {
if (request_volume >=
config.failure_percentage_ejection->request_volume) {
failure_percentage_ejection_candidates[subchannel_state] = success_rate;
failure_percentage_ejection_candidates[endpoint_state] = success_rate;
}
}
}
@ -883,7 +973,7 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
if (candidate.second < ejection_threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] random_key=%d "
@ -931,7 +1021,7 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.failure_percentage_ejection->threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
100.0 * ejected_host_count / parent_->subchannel_state_map_.size();
100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] random_key=%d "
@ -961,13 +1051,13 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
// current time is after ejection_timestamp + min(base_ejection_time *
// multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
// address.
for (auto& state : parent_->subchannel_state_map_) {
auto* subchannel_state = state.second.get();
const bool unejected = subchannel_state->MaybeUneject(
for (auto& state : parent_->endpoint_state_map_) {
auto* endpoint_state = state.second.get();
const bool unejected = endpoint_state->MaybeUneject(
config.base_ejection_time.millis(), config.max_ejection_time.millis());
if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)",
parent_.get(), state.first.c_str(), subchannel_state);
gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected endpoint %s (%p)",
parent_.get(), state.first.ToString().c_str(), endpoint_state);
}
}
parent_->ejection_timer_ =

@ -101,6 +101,13 @@ std::string EndpointAddresses::ToString() const {
return absl::StrJoin(parts, " ");
}
// Ordering predicate for resolved addresses, suitable for std::set/std::map.
// Addresses are ordered first by length (shorter sorts first) and then, for
// equal lengths, by the raw bytes of the address.
//
// Returns true iff addr1 sorts strictly before addr2.
bool ResolvedAddressLessThan::operator()(
    const grpc_resolved_address& addr1,
    const grpc_resolved_address& addr2) const {
  if (addr1.len < addr2.len) return true;
  // A longer addr1 must never compare less than a shorter addr2. Without
  // this check the call would fall through to memcmp(), which could report
  // addr1 < addr2 even though addr2 < addr1 already holds by length. That
  // violates the strict weak ordering that ordered containers require.
  if (addr1.len > addr2.len) return false;
  // Lengths are equal here, so comparing addr1.len bytes covers both.
  return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
}
bool EndpointAddressSet::operator==(const EndpointAddressSet& other) const {
if (addresses_.size() != other.addresses_.size()) return false;
auto other_it = other.addresses_.begin();
@ -118,20 +125,14 @@ bool EndpointAddressSet::operator==(const EndpointAddressSet& other) const {
bool EndpointAddressSet::operator<(const EndpointAddressSet& other) const {
auto other_it = other.addresses_.begin();
for (auto it = addresses_.begin(); it != addresses_.end(); ++it) {
if (other_it == other.addresses_.end()) return true;
if (other_it == other.addresses_.end()) return false;
if (it->len < other_it->len) return true;
if (it->len > other_it->len) return false;
int r = memcmp(it->addr, other_it->addr, it->len);
if (r != 0) return r < 0;
++other_it;
}
return false;
}
bool EndpointAddressSet::ResolvedAddressLessThan::operator()(
const grpc_resolved_address& addr1,
const grpc_resolved_address& addr2) const {
if (addr1.len < addr2.len) return true;
return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
return other_it != other.addresses_.end();
}
std::string EndpointAddressSet::ToString() const {

@ -91,6 +91,11 @@ class EndpointAddresses {
using EndpointAddressesList = std::vector<EndpointAddresses>;
// Comparison functor for grpc_resolved_address, for use as the ordering
// predicate of ordered containers such as std::set and std::map.
// Returns true if addr1 sorts before addr2.
struct ResolvedAddressLessThan {
bool operator()(const grpc_resolved_address& addr1,
const grpc_resolved_address& addr2) const;
};
class EndpointAddressSet {
public:
explicit EndpointAddressSet(
@ -103,11 +108,6 @@ class EndpointAddressSet {
std::string ToString() const;
private:
struct ResolvedAddressLessThan {
bool operator()(const grpc_resolved_address& addr1,
const grpc_resolved_address& addr2) const;
};
std::set<grpc_resolved_address, ResolvedAddressLessThan> addresses_;
};

@ -28,6 +28,7 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
@ -36,10 +37,12 @@
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/resolver/endpoint_addresses.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
@ -205,6 +208,8 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
.SetFailurePercentageThreshold(1)
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.SetMaxEjectionTime(Duration::Seconds(1))
.SetBaseEjectionTime(Duration::Seconds(1))
.Build()),
lb_policy());
EXPECT_TRUE(status.ok()) << status;
@ -225,7 +230,187 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
for (const auto& addr : kAddresses) {
if (addr != *address) remaining_addresses.push_back(addr);
}
picker = WaitForRoundRobinListChange(kAddresses, remaining_addresses);
WaitForRoundRobinListChange(kAddresses, remaining_addresses);
// Advance time and run the timer callback to trigger un-ejection.
IncrementTimeBy(Duration::Seconds(10));
gpr_log(GPR_INFO, "### un-ejection complete");
// Expect a picker update.
WaitForRoundRobinListChange(remaining_addresses, kAddresses);
}
// Verifies that ejection is tracked per endpoint rather than per address.
//
// Three endpoints with two addresses each are configured. A failed call on
// one endpoint causes it to be ejected. While it is ejected, its connection
// is moved to its second address; the endpoint must stay out of rotation.
// A second ("sentinel") endpoint is then moved to its second address as
// well, which lets the test know that the policy has processed the earlier
// address change. After un-ejection, the ejected endpoint must reappear in
// the round-robin list using its new address.
TEST_F(OutlierDetectionTest, MultipleAddressesPerEndpoint) {
  if (!IsRoundRobinDelegateToPickFirstEnabled()) return;
  // Can't use timer duration expectation here, because the Happy
  // Eyeballs timer inside pick_first will use a different duration than
  // the timer in outlier_detection.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Send initial update.
  absl::Status status = ApplyUpdate(
      BuildUpdate(kEndpoints, ConfigBuilder()
                                  .SetFailurePercentageThreshold(1)
                                  .SetFailurePercentageMinimumHosts(1)
                                  .SetFailurePercentageRequestVolume(1)
                                  .SetMaxEjectionTime(Duration::Seconds(1))
                                  .SetBaseEjectionTime(Duration::Seconds(1))
                                  .Build()),
      lb_policy_.get());
  EXPECT_TRUE(status.ok()) << status;
  // Expect normal startup.
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  gpr_log(GPR_INFO, "### RR startup complete");
  // Do a pick and report a failed call.
  auto address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(address.has_value());
  gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
  // Based on the address that the failed call went to, we determine
  // which addresses to use in the subsequent steps.
  absl::Span<const absl::string_view> ejected_endpoint_addresses;
  absl::Span<const absl::string_view> sentinel_endpoint_addresses;
  absl::string_view unmodified_endpoint_address;
  std::vector<absl::string_view> final_addresses;
  if (kEndpoint1Addresses[0] == *address) {
    ejected_endpoint_addresses = kEndpoint1Addresses;
    sentinel_endpoint_addresses = kEndpoint2Addresses;
    unmodified_endpoint_address = kEndpoint3Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
                       kEndpoint3Addresses[0]};
  } else if (kEndpoint2Addresses[0] == *address) {
    ejected_endpoint_addresses = kEndpoint2Addresses;
    sentinel_endpoint_addresses = kEndpoint1Addresses;
    unmodified_endpoint_address = kEndpoint3Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
                       kEndpoint3Addresses[0]};
  } else {
    ejected_endpoint_addresses = kEndpoint3Addresses;
    sentinel_endpoint_addresses = kEndpoint1Addresses;
    unmodified_endpoint_address = kEndpoint2Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[0],
                       kEndpoint3Addresses[1]};
  }
  // Advance time and run the timer callback to trigger ejection.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### ejection complete");
  // Expect a picker that removes the ejected address.
  WaitForRoundRobinListChange(
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
      {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
  gpr_log(GPR_INFO, "### ejected endpoint removed");
  // Cause the connection to the ejected endpoint to fail, and then
  // have it reconnect to a different address.  The endpoint is still
  // ejected, so the new address should not be used.
  ExpectEndpointAddressChange(ejected_endpoint_addresses, 0, 1, nullptr);
  // Need to drain the picker updates before calling
  // ExpectEndpointAddressChange() again, since that will expect a
  // re-resolution request in the queue.
  DrainRoundRobinPickerUpdates(
      {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
  gpr_log(GPR_INFO, "### done changing address of ejected endpoint");
  // Do the same thing for the sentinel endpoint, so that we
  // know that the LB policy has seen the address change for the ejected
  // endpoint.
  ExpectEndpointAddressChange(sentinel_endpoint_addresses, 0, 1, [&]() {
    WaitForRoundRobinListChange(
        {sentinel_endpoint_addresses[0], unmodified_endpoint_address},
        {unmodified_endpoint_address});
  });
  WaitForRoundRobinListChange(
      {unmodified_endpoint_address},
      {sentinel_endpoint_addresses[1], unmodified_endpoint_address});
  // This marker previously repeated the "ejected endpoint" text from the
  // step above, which made the two phases indistinguishable in test logs.
  gpr_log(GPR_INFO, "### done changing address of sentinel endpoint");
  // Advance time and run the timer callback to trigger un-ejection.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### un-ejection complete");
  // The ejected endpoint should come back using the new address.
  WaitForRoundRobinListChange(
      {sentinel_endpoint_addresses[1], unmodified_endpoint_address},
      final_addresses);
}
// Verifies that ejection state is tied to the exact set of addresses of an
// endpoint. Three two-address endpoints are configured and one of them is
// ejected after a failed call. A new update then shrinks the ejected
// endpoint to just its first address; because that is a different address
// set, the address is expected to return to the round-robin rotation
// without waiting for un-ejection.
TEST_F(OutlierDetectionTest, EjectionStateResetsWhenEndpointAddressesChange) {
if (!IsRoundRobinDelegateToPickFirstEnabled()) return;
// Can't use timer duration expectation here, because the Happy
// Eyeballs timer inside pick_first will use a different duration than
// the timer in outlier_detection.
SetExpectedTimerDuration(absl::nullopt);
constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
"ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
"ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
const std::array<EndpointAddresses, 3> kEndpoints = {
MakeEndpointAddresses(kEndpoint1Addresses),
MakeEndpointAddresses(kEndpoint2Addresses),
MakeEndpointAddresses(kEndpoint3Addresses)};
// The same config is reused for both updates so that only the address
// list differs between them.
auto kConfig = ConfigBuilder()
.SetFailurePercentageThreshold(1)
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.SetMaxEjectionTime(Duration::Seconds(1))
.SetBaseEjectionTime(Duration::Seconds(1))
.Build();
// Send initial update.
absl::Status status =
ApplyUpdate(BuildUpdate(kEndpoints, kConfig), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// Expect normal startup.
auto picker = ExpectRoundRobinStartup(kEndpoints);
ASSERT_NE(picker, nullptr);
gpr_log(GPR_INFO, "### RR startup complete");
// Do a pick and report a failed call.
auto ejected_address = DoPickWithFailedCall(picker.get());
ASSERT_TRUE(ejected_address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", ejected_address->c_str());
// Based on the address that the failed call went to, we determine
// which addresses to use in the subsequent steps.
std::vector<absl::string_view> expected_round_robin_while_ejected;
std::vector<EndpointAddresses> new_endpoints;
// In each branch, new_endpoints keeps the two healthy endpoints unchanged
// and reduces the ejected endpoint to its first address only.
if (kEndpoint1Addresses[0] == *ejected_address) {
expected_round_robin_while_ejected = {kEndpoint2Addresses[0],
kEndpoint3Addresses[0]};
new_endpoints = {MakeEndpointAddresses({kEndpoint1Addresses[0]}),
MakeEndpointAddresses(kEndpoint2Addresses),
MakeEndpointAddresses(kEndpoint3Addresses)};
} else if (kEndpoint2Addresses[0] == *ejected_address) {
expected_round_robin_while_ejected = {kEndpoint1Addresses[0],
kEndpoint3Addresses[0]};
new_endpoints = {MakeEndpointAddresses(kEndpoint1Addresses),
MakeEndpointAddresses({kEndpoint2Addresses[0]}),
MakeEndpointAddresses(kEndpoint3Addresses)};
} else {
expected_round_robin_while_ejected = {kEndpoint1Addresses[0],
kEndpoint2Addresses[0]};
new_endpoints = {MakeEndpointAddresses(kEndpoint1Addresses),
MakeEndpointAddresses(kEndpoint2Addresses),
MakeEndpointAddresses({kEndpoint3Addresses[0]})};
}
// Advance time and run the timer callback to trigger ejection.
IncrementTimeBy(Duration::Seconds(10));
gpr_log(GPR_INFO, "### ejection complete");
// Expect a picker that removes the ejected address.
WaitForRoundRobinListChange(
{kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
expected_round_robin_while_ejected);
gpr_log(GPR_INFO, "### ejected endpoint removed");
// Send an update that removes the other address from the ejected endpoint.
status = ApplyUpdate(BuildUpdate(new_endpoints, kConfig), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// This should cause the address to start getting used again, since
// it's now associated with a different endpoint.
WaitForRoundRobinListChange(
expected_round_robin_while_ejected,
{kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
}
TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {

@ -0,0 +1,34 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
grpc_package(name = "test/core/resolver")
licenses(["notice"])
# Test target for endpoint_addresses_test.cc, built with gtest.
# Both the EventEngine and polling options are switched off for this target
# (presumably because the test only compares and formats addresses in
# memory -- confirm against grpc_cc_test's documentation).
grpc_cc_test(
name = "endpoint_addresses_test",
srcs = ["endpoint_addresses_test.cc"],
external_deps = ["gtest"],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:endpoint_addresses",
"//:parse_address",
"//:sockaddr_utils",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,105 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/lib/resolver/endpoint_addresses.h"
#include <set>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
// Converts an address URI string (e.g. "ipv4:127.0.0.1:443") into a
// grpc_resolved_address. Aborts via GPR_ASSERT if the string cannot be
// parsed as a URI or the URI cannot be converted to an address.
grpc_resolved_address MakeAddress(absl::string_view address_uri) {
  const auto parsed_uri = URI::Parse(address_uri);
  GPR_ASSERT(parsed_uri.ok());
  grpc_resolved_address resolved;
  const bool converted = grpc_parse_uri(*parsed_uri, &resolved);
  GPR_ASSERT(converted);
  return resolved;
}
// Matcher for a grpc_resolved_address: converts the address to its URI
// string form and compares that string against address_str. Fails with
// an explanatory message if the address cannot be converted.
MATCHER_P(EqualsAddress, address_str, "") {
  auto uri_or = grpc_sockaddr_to_uri(&arg);
  if (uri_or.ok()) {
    return ::testing::ExplainMatchResult(*uri_or, address_str,
                                         result_listener);
  }
  *result_listener << "grpc_sockaddr_to_uri() failed";
  return false;
}
// Inserts three addresses out of order into a std::set that uses
// ResolvedAddressLessThan as its comparator and expects iteration to
// yield them in ascending order.
TEST(ResolvedAddressLessThan, Basic) {
  std::set<grpc_resolved_address, ResolvedAddressLessThan> sorted_addresses;
  for (const char* uri : {"ipv4:127.0.0.2:443", "ipv4:127.0.0.3:443",
                          "ipv4:127.0.0.1:443"}) {
    sorted_addresses.insert(MakeAddress(uri));
  }
  EXPECT_THAT(sorted_addresses,
              ::testing::ElementsAre(EqualsAddress("ipv4:127.0.0.1:443"),
                                     EqualsAddress("ipv4:127.0.0.2:443"),
                                     EqualsAddress("ipv4:127.0.0.3:443")));
}
// Exercises operator==, operator< and ToString() on two disjoint
// three-address sets, each constructed from addresses given out of order.
TEST(EndpointAddressSet, Basic) {
  // Set covering 127.0.0.1 through 127.0.0.3.
  EndpointAddressSet lower_set({MakeAddress("ipv4:127.0.0.2:443"),
                                MakeAddress("ipv4:127.0.0.3:443"),
                                MakeAddress("ipv4:127.0.0.1:443")});
  // A set is equal to itself and not less than itself.
  EXPECT_TRUE(lower_set == lower_set);
  EXPECT_FALSE(lower_set < lower_set);
  // The string form is expected to list the addresses in sorted order.
  EXPECT_EQ(lower_set.ToString(),
            "{127.0.0.1:443, 127.0.0.2:443, 127.0.0.3:443}");
  // Set covering 127.0.0.4 through 127.0.0.6.
  EndpointAddressSet upper_set({MakeAddress("ipv4:127.0.0.4:443"),
                                MakeAddress("ipv4:127.0.0.6:443"),
                                MakeAddress("ipv4:127.0.0.5:443")});
  // The two sets differ, and only the lower one compares less.
  EXPECT_FALSE(lower_set == upper_set);
  EXPECT_TRUE(lower_set < upper_set);
  EXPECT_FALSE(upper_set < lower_set);
  EXPECT_EQ(upper_set.ToString(),
            "{127.0.0.4:443, 127.0.0.5:443, 127.0.0.6:443}");
}
// Compares a three-address set against a set holding only two of those
// addresses: they must be unequal, and the smaller set must order first.
TEST(EndpointAddressSet, Subset) {
  EndpointAddressSet full_set({MakeAddress("ipv4:127.0.0.2:443"),
                               MakeAddress("ipv4:127.0.0.3:443"),
                               MakeAddress("ipv4:127.0.0.1:443")});
  EXPECT_EQ(full_set.ToString(),
            "{127.0.0.1:443, 127.0.0.2:443, 127.0.0.3:443}");
  // Same as full_set, minus 127.0.0.3.
  EndpointAddressSet partial_set(
      {MakeAddress("ipv4:127.0.0.2:443"), MakeAddress("ipv4:127.0.0.1:443")});
  EXPECT_EQ(partial_set.ToString(), "{127.0.0.1:443, 127.0.0.2:443}");
  EXPECT_FALSE(full_set == partial_set);
  EXPECT_FALSE(full_set < partial_set);
  EXPECT_TRUE(partial_set < full_set);
}
} // namespace
} // namespace testing
} // namespace grpc_core
// Test entry point: initializes GoogleTest, constructs the gRPC test
// environment (kept alive until main returns), then runs every test.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

@ -2899,6 +2899,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "endpoint_addresses_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save