[outlier detection] fix crash with pick_first and add tests (#33069)

Fixes #32967.

Also fix incorrect defaults for `enforcementPercentage` fields.
pull/33073/head
Mark D. Roth 2 years ago committed by GitHub
parent 1b479bdda3
commit 2c423d277c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      build_autogenerated.yaml
  2. 28
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  3. 4
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h
  4. 2
      test/core/client_channel/lb_policy/BUILD
  5. 100
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  6. 127
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  7. 75
      test/core/client_channel/lb_policy/weighted_round_robin_test.cc
  8. 29
      test/core/client_channel/lb_policy/xds_override_host_test.cc

@ -9830,6 +9830,7 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/client_channel/lb_policy/outlier_detection_test.cc
deps:
@ -9994,6 +9995,7 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/client_channel/lb_policy/pick_first_test.cc
deps:
@ -10674,6 +10676,7 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/client_channel/lb_policy/round_robin_test.cc
deps:
@ -13720,6 +13723,7 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/client_channel/lb_policy/xds_override_host_test.cc
deps:

@ -257,7 +257,12 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void Eject(const Timestamp& time) {
ejection_time_ = time;
++multiplier_;
for (auto& subchannel : subchannels_) {
// Ejecting the subchannel may cause the child policy to unref the
// subchannel, so we need to be prepared for the set to be modified
// while we are iterating.
for (auto it = subchannels_.begin(); it != subchannels_.end();) {
SubchannelWrapper* subchannel = *it;
++it;
subchannel->Eject();
}
}
@ -394,8 +399,13 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void OutlierDetectionLb::SubchannelWrapper::Eject() {
ejected_ = true;
for (auto& watcher : watchers_) {
watcher.second->Eject();
// Ejecting the subchannel may cause the child policy to cancel the watch,
// so we need to be prepared for the map to be modified while we are
// iterating.
for (auto it = watchers_.begin(); it != watchers_.end();) {
WatcherWrapper* watcher = it->second;
++it;
watcher->Eject();
}
}
@ -858,8 +868,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.success_rate_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running success rate algorithm",
parent_.get());
"[outlier_detection_lb %p] running success rate algorithm: "
"stdev_factor=%d, enforcement_percentage=%d",
parent_.get(), config.success_rate_ejection->stdev_factor,
config.success_rate_ejection->enforcement_percentage);
}
// calculate ejection threshold: (mean - stdev *
// (success_rate_ejection.stdev_factor / 1000))
@ -917,8 +929,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
config.failure_percentage_ejection->minimum_hosts) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] running failure percentage algorithm",
parent_.get());
"[outlier_detection_lb %p] running failure percentage algorithm: "
"threshold=%d, enforcement_percentage=%d",
parent_.get(), config.failure_percentage_ejection->threshold,
config.failure_percentage_ejection->enforcement_percentage);
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {

@ -38,7 +38,7 @@ struct OutlierDetectionConfig {
uint32_t max_ejection_percent = 10;
struct SuccessRateEjection {
uint32_t stdev_factor = 1900;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 100;
@ -56,7 +56,7 @@ struct OutlierDetectionConfig {
};
struct FailurePercentageEjection {
uint32_t threshold = 85;
uint32_t enforcement_percentage = 0;
uint32_t enforcement_percentage = 100;
uint32_t minimum_hosts = 5;
uint32_t request_volume = 50;

@ -34,6 +34,7 @@ grpc_cc_library(
deps = [
"//src/core:lb_policy",
"//src/core:subchannel_interface",
"//test/core/event_engine:mock_event_engine",
],
)
@ -205,7 +206,6 @@ grpc_cc_test(
deps = [
":lb_policy_test_lib",
"//src/core:grpc_lb_policy_weighted_round_robin",
"//test/core/event_engine:mock_event_engine",
"//test/core/util:grpc_test_util",
],
)

@ -20,13 +20,16 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include <deque>
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <ratio>
#include <set>
#include <string>
#include <tuple>
@ -34,6 +37,7 @@
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
@ -79,6 +83,7 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/mock_event_engine.h"
namespace grpc_core {
namespace testing {
@ -896,6 +901,30 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< location.file() << ":" << location.line();
}
// Expect startup with RR with a set of addresses.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectRoundRobinStartup(
absl::Span<const absl::string_view> addresses) {
ExpectConnectingUpdate();
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
return picker;
}
// Requests a picker on picker and expects a Fail result.
// The failing status is passed to check_status.
void ExpectPickFail(LoadBalancingPolicy::SubchannelPicker* picker,
@ -964,6 +993,77 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
};
// A subclass to be used for LB policies that start timers.
// Injects a mock EventEngine and provides the necessary framework for
// incrementing time and handling timer callbacks.
class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
protected:
// A custom time cache for which InvalidateCache() is a no-op. This
// ensures that when the timer callback instantiates its own ExecCtx
// and therefore its own ScopedTimeCache, it continues to see the time
// that we are injecting in the test.
class TestTimeCache final : public Timestamp::ScopedSource {
public:
TestTimeCache() : cached_time_(previous()->Now()) {}
Timestamp Now() override { return cached_time_; }
void InvalidateCache() override {}
void IncrementBy(Duration duration) { cached_time_ += duration; }
private:
Timestamp cached_time_;
};
TimeAwareLoadBalancingPolicyTest() {
auto mock_ee =
std::make_shared<grpc_event_engine::experimental::MockEventEngine>();
auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
absl::AnyInvocable<void()> callback) {
CheckExpectedTimerDuration(duration);
intptr_t key = next_key_++;
timer_callbacks_[key] = std::move(callback);
return grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0};
};
ON_CALL(*mock_ee,
RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
.WillByDefault(capture);
auto cancel =
[this](
grpc_event_engine::experimental::EventEngine::TaskHandle handle) {
auto it = timer_callbacks_.find(handle.keys[0]);
if (it == timer_callbacks_.end()) return false;
timer_callbacks_.erase(it);
return true;
};
ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel);
// Store in base class, to make it visible to the LB policy.
event_engine_ = std::move(mock_ee);
}
~TimeAwareLoadBalancingPolicyTest() override {
EXPECT_TRUE(timer_callbacks_.empty())
<< "WARNING: Test did not run all timer callbacks";
}
void RunTimerCallback() {
ASSERT_EQ(timer_callbacks_.size(), 1UL);
auto it = timer_callbacks_.begin();
ASSERT_NE(it->second, nullptr);
std::move(it->second)();
timer_callbacks_.erase(it);
}
// Called when the LB policy starts a timer.
// May be overridden by subclasses.
virtual void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration) {}
std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
intptr_t next_key_ = 1;
TestTimeCache time_cache_;
};
} // namespace testing
} // namespace grpc_core

@ -17,16 +17,27 @@
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <array>
#include <chrono>
#include <memory>
#include <ratio>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
@ -39,7 +50,7 @@ namespace grpc_core {
namespace testing {
namespace {
class OutlierDetectionTest : public LoadBalancingPolicyTest {
class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
@ -137,7 +148,34 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest {
OutlierDetectionTest()
: lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {}
absl::optional<std::string> DoPickWithFailedCall(
LoadBalancingPolicy::SubchannelPicker* picker) {
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
subchannel_call_tracker;
auto address = ExpectPickComplete(picker, {}, &subchannel_call_tracker);
if (address.has_value()) {
subchannel_call_tracker->Start();
FakeMetadata metadata({});
FakeBackendMetricAccessor backend_metric_accessor({});
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
*address, absl::UnavailableError("uh oh"), &metadata,
&backend_metric_accessor};
subchannel_call_tracker->Finish(args);
}
return address;
}
void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_internal_)
<< "Expected: " << expected_internal_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration expected_internal_ =
std::chrono::seconds(10);
};
TEST_F(OutlierDetectionTest, Basic) {
@ -168,6 +206,93 @@ TEST_F(OutlierDetectionTest, Basic) {
}
}
TEST_F(OutlierDetectionTest, FailurePercentage) {
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
// Send initial update.
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, ConfigBuilder()
.SetFailurePercentageThreshold(1)
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.Build()),
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// Expect normal startup.
auto picker = ExpectRoundRobinStartup(kAddresses);
ASSERT_NE(picker, nullptr);
gpr_log(GPR_INFO, "### RR startup complete");
// Do a pick and report a failed call.
auto address = DoPickWithFailedCall(picker.get());
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection complete");
// Expect a re-resolution request.
ExpectReresolutionRequest();
// Expect a picker update.
std::vector<absl::string_view> remaining_addresses;
for (const auto& addr : kAddresses) {
if (addr != *address) remaining_addresses.push_back(addr);
}
picker = WaitForRoundRobinListChange(kAddresses, remaining_addresses);
}
TEST_F(OutlierDetectionTest, FailurePercentageWithPickFirst) {
constexpr std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
// Send initial update.
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses,
ConfigBuilder()
.SetFailurePercentageThreshold(1)
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.SetChildPolicy({{"pick_first", Json::FromObject({})}})
.Build()),
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the first address with
// the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg.
auto* subchannel = FindSubchannel(
kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true));
ASSERT_NE(subchannel, nullptr);
// When the LB policy receives the subchannel's initial connectivity
// state notification (IDLE), it will request a connection.
EXPECT_TRUE(subchannel->ConnectionRequested());
// This causes the subchannel to start to connect, so it reports CONNECTING.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// LB policy should have reported CONNECTING state.
ExpectConnectingUpdate();
// When the subchannel becomes connected, it reports READY.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// The LB policy will report CONNECTING some number of times (doesn't
// matter how many) and then report READY.
auto picker = WaitForConnected();
ASSERT_NE(picker, nullptr);
// Picker should return the same subchannel repeatedly.
for (size_t i = 0; i < 3; ++i) {
EXPECT_EQ(ExpectPickComplete(picker.get()), kAddresses[0]);
}
gpr_log(GPR_INFO, "### PF startup complete");
// Now have an RPC to that subchannel fail.
auto address = DoPickWithFailedCall(picker.get());
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection complete");
// Expect a re-resolution request.
ExpectReresolutionRequest();
// The pick_first policy should report IDLE with a queuing picker.
ExpectStateAndQueuingPicker(GRPC_CHANNEL_IDLE);
// The queued pick should have triggered a reconnection attempt.
EXPECT_TRUE(subchannel->ConnectionRequested());
}
} // namespace
} // namespace testing
} // namespace grpc_core

@ -27,7 +27,6 @@
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
@ -35,7 +34,6 @@
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
@ -52,16 +50,12 @@
#include "src/core/lib/json/json_writer.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/event_engine/mock_event_engine.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::MockEventEngine;
BackendMetricData MakeBackendMetricData(double cpu_utilization, double qps,
double eps) {
BackendMetricData b;
@ -71,7 +65,7 @@ BackendMetricData MakeBackendMetricData(double cpu_utilization, double qps,
return b;
}
class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
@ -117,61 +111,10 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
Json::Object json_;
};
// A custom time cache for which InvalidateCache() is a no-op. This
// ensures that when the timer callback instantiates its own ExecCtx
// and therefore its own ScopedTimeCache, it continues to see the time
// that we are injecting in the test.
class TestTimeCache final : public Timestamp::ScopedSource {
public:
TestTimeCache() : cached_time_(previous()->Now()) {}
Timestamp Now() override { return cached_time_; }
void InvalidateCache() override {}
void IncrementBy(Duration duration) { cached_time_ += duration; }
private:
Timestamp cached_time_;
};
WeightedRoundRobinTest() {
mock_ee_ = std::make_shared<MockEventEngine>();
event_engine_ = mock_ee_;
auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
absl::AnyInvocable<void()> callback) {
EXPECT_EQ(duration, expected_weight_update_interval_)
<< "Expected: " << expected_weight_update_interval_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
intptr_t key = next_key_++;
timer_callbacks_[key] = std::move(callback);
return EventEngine::TaskHandle{key, 0};
};
ON_CALL(*mock_ee_,
RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
.WillByDefault(capture);
auto cancel = [this](EventEngine::TaskHandle handle) {
auto it = timer_callbacks_.find(handle.keys[0]);
if (it == timer_callbacks_.end()) return false;
timer_callbacks_.erase(it);
return true;
};
ON_CALL(*mock_ee_, Cancel(::testing::_)).WillByDefault(cancel);
lb_policy_ = MakeLbPolicy("weighted_round_robin");
}
~WeightedRoundRobinTest() override {
EXPECT_TRUE(timer_callbacks_.empty())
<< "WARNING: Test did not run all timer callbacks";
}
void RunTimerCallback() {
ASSERT_EQ(timer_callbacks_.size(), 1UL);
auto it = timer_callbacks_.begin();
ASSERT_NE(it->second, nullptr);
std::move(it->second)();
timer_callbacks_.erase(it);
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
SendInitialUpdateAndWaitForConnected(
absl::Span<const absl::string_view> addresses,
@ -368,13 +311,17 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
}
}
void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_weight_update_interval_)
<< "Expected: " << expected_weight_update_interval_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
std::shared_ptr<MockEventEngine> mock_ee_;
std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
intptr_t next_key_ = 1;
EventEngine::Duration expected_weight_update_interval_ =
std::chrono::seconds(1);
TestTimeCache time_cache_;
grpc_event_engine::experimental::EventEngine::Duration
expected_weight_update_interval_ = std::chrono::seconds(1);
};
TEST_F(WeightedRoundRobinTest, Basic) {

@ -14,8 +14,6 @@
// limitations under the License.
//
#include <stddef.h>
#include <algorithm>
#include <array>
#include <map>
@ -71,30 +69,11 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
RefCountedPtr<LoadBalancingPolicy::Config>
config = MakeXdsOverrideHostConfig()) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, config), policy_.get()),
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()),
policy_.get()),
absl::OkStatus());
ExpectConnectingUpdate();
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
return picker;
return ExpectRoundRobinStartup(addresses);
}
ServerAddress MakeAddressWithHealthStatus(

Loading…
Cancel
Save