rls: fix various bugs in adaptive throttling code (#28477)

* rls: fix adaptive throttling window size

* clang-format

* fix adaptive throttling logic and fix FailedRlsRequestWithoutDefaultTarget test
Mark D. Roth committed by GitHub
parent 8ca42ec6f8
commit 227d65367c
84  src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
31  test/cpp/end2end/rls_end2end_test.cc

src/core/ext/filters/client_channel/lb_policy/rls/rls.cc

@@ -92,9 +92,9 @@ const grpc_millis kCacheBackoffInitial = 1 * GPR_MS_PER_SEC;
 const double kCacheBackoffMultiplier = 1.6;
 const double kCacheBackoffJitter = 0.2;
 const grpc_millis kCacheBackoffMax = 120 * GPR_MS_PER_SEC;
-const grpc_millis kDefaultThrottleWindowSize = 30 * GPR_MS_PER_SEC;
-const double kDefaultThrottleRatioForSuccesses = 2.0;
-const int kDefaultThrottlePaddings = 8;
+const grpc_millis kDefaultThrottleWindowSizeMs = 30 * GPR_MS_PER_SEC;
+const float kDefaultThrottleRatioForSuccesses = 2.0;
+const int kDefaultThrottlePadding = 8;
 const grpc_millis kCacheCleanupTimerInterval = 60 * GPR_MS_PER_SEC;
 const int64_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
@@ -556,8 +556,13 @@ class RlsLb : public LoadBalancingPolicy {
     // Throttle state for RLS requests.
     class Throttle {
      public:
-      explicit Throttle(int window_size_seconds = 0,
-                        double ratio_for_successes = 0, int paddings = 0);
+      explicit Throttle(
+          int window_size_ms = kDefaultThrottleWindowSizeMs,
+          float ratio_for_successes = kDefaultThrottleRatioForSuccesses,
+          int padding = kDefaultThrottlePadding)
+          : window_size_ms_(window_size_ms),
+            ratio_for_successes_(ratio_for_successes),
+            padding_(padding) {}
       bool ShouldThrottle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
@@ -565,15 +570,15 @@ class RlsLb : public LoadBalancingPolicy {
           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
      private:
-      grpc_millis window_size_;
-      double ratio_for_successes_;
-      int paddings_;
+      grpc_millis window_size_ms_;
+      float ratio_for_successes_;
+      int padding_;
-      // Logged timestamp of requests.
+      // Logged timestamps of requests.
       std::deque<grpc_millis> requests_ ABSL_GUARDED_BY(&RlsLb::mu_);
-      // Logged timestamp of responses that were successful.
-      std::deque<grpc_millis> successes_ ABSL_GUARDED_BY(&RlsLb::mu_);
+      // Logged timestamps of failures.
+      std::deque<grpc_millis> failures_ ABSL_GUARDED_BY(&RlsLb::mu_);
     };
     RefCountedPtr<RlsLb> lb_policy_;
@@ -1458,41 +1463,40 @@ void RlsLb::RlsChannel::StateWatcher::OnConnectivityStateChange(
 // RlsLb::RlsChannel::Throttle
 //
-RlsLb::RlsChannel::Throttle::Throttle(int window_size_seconds,
-                                      double ratio_for_successes,
-                                      int paddings) {
-  GPR_DEBUG_ASSERT(window_size_seconds >= 0);
-  GPR_DEBUG_ASSERT(ratio_for_successes >= 0);
-  GPR_DEBUG_ASSERT(paddings >= 0);
-  window_size_ = window_size_seconds == 0 ? window_size_seconds * GPR_MS_PER_SEC
-                                          : kDefaultThrottleWindowSize;
-  ratio_for_successes_ = ratio_for_successes == 0
-                             ? kDefaultThrottleRatioForSuccesses
-                             : ratio_for_successes;
-  paddings_ = paddings == 0 ? kDefaultThrottlePaddings : paddings;
-}
 bool RlsLb::RlsChannel::Throttle::ShouldThrottle() {
   grpc_millis now = ExecCtx::Get()->Now();
-  while (!requests_.empty() && now - requests_.front() > window_size_) {
+  while (!requests_.empty() && now - requests_.front() > window_size_ms_) {
     requests_.pop_front();
   }
-  while (!successes_.empty() && now - successes_.front() > window_size_) {
-    successes_.pop_front();
-  }
-  int successes = successes_.size();
-  int requests = requests_.size();
-  bool result = ((rand() % (requests + paddings_)) <
-                 static_cast<double>(requests) -
-                     static_cast<double>(successes) * ratio_for_successes_);
-  requests_.push_back(now);
-  return result;
+  while (!failures_.empty() && now - failures_.front() > window_size_ms_) {
+    failures_.pop_front();
+  }
+  // Compute probability of throttling.
+  float num_requests = requests_.size();
+  float num_successes = num_requests - failures_.size();
+  // Note: it's possible that this ratio will be negative, in which case
+  // no throttling will be done.
+  float throttle_probability =
+      (num_requests - (num_successes * ratio_for_successes_)) /
+      (num_requests + padding_);
+  // Generate a random number for the request.
+  std::random_device rd;
+  std::mt19937 mt(rd());
+  std::uniform_real_distribution<float> dist(0, 1.0);
+  // Check if we should throttle the request.
+  bool throttle = dist(mt) < throttle_probability;
+  // If we're throttling, record the request and the failure.
+  if (throttle) {
+    requests_.push_back(now);
+    failures_.push_back(now);
+  }
+  return throttle;
 }
 void RlsLb::RlsChannel::Throttle::RegisterResponse(bool success) {
-  if (success) {
-    successes_.push_back(ExecCtx::Get()->Now());
-  }
+  grpc_millis now = ExecCtx::Get()->Now();
+  requests_.push_back(now);
+  if (!success) failures_.push_back(now);
 }
 //
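For readers following the new math: ShouldThrottle() now throttles with probability (requests - ratio_for_successes * successes) / (requests + padding) over the sliding window, and a negative value means the request is never throttled. Below is a minimal standalone sketch of that computation, not part of the patch; ThrottleProbability is a hypothetical helper, and the hard-coded defaults (2.0 and 8) mirror kDefaultThrottleRatioForSuccesses and kDefaultThrottlePadding from the first hunk.

#include <cstdio>
#include <random>

// Hypothetical helper mirroring the probability that the patched
// ShouldThrottle() derives from its sliding-window counts.
float ThrottleProbability(int requests, int failures,
                          float ratio_for_successes = 2.0f, int padding = 8) {
  float num_requests = requests;
  float num_successes = num_requests - failures;
  // May come out negative; a uniform [0,1) draw never falls below a
  // negative value, so such a request is never throttled.
  return (num_requests - num_successes * ratio_for_successes) /
         (num_requests + padding);
}

int main() {
  // One failed request and nothing else in the window: 1/9, roughly 11%.
  float p = ThrottleProbability(/*requests=*/1, /*failures=*/1);
  // Same comparison the patched code performs against a uniform draw.
  std::random_device rd;
  std::mt19937 mt(rd());
  std::uniform_real_distribution<float> dist(0, 1.0);
  bool throttle = dist(mt) < p;
  std::printf("p=%.3f throttle=%d\n", p, throttle);
  return 0;
}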
@@ -1761,7 +1765,7 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
   {
     MutexLock lock(&lb_policy_->mu_);
     if (lb_policy_->is_shutdown_) return;
-    rls_channel_->ReportResponseLocked(!response.status.ok());
+    rls_channel_->ReportResponseLocked(response.status.ok());
     Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
     child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
         std::move(response), std::move(backoff_state_));

test/cpp/end2end/rls_end2end_test.cc

@@ -19,6 +19,7 @@
 // - RLS channel is down; wait_for_ready request is sent and RLS request fails
 //   and goes into backoff; RLS channel comes back up before backoff timer
 //   fires; request is processed at that point
+// - find some deterministic way to exercise adaptive throttler code
 #include <deque>
 #include <map>
@@ -912,6 +913,28 @@ TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
                      "]",
                      kServiceValue, kMethodValue, kTestKey))
           .Build());
+  // The test below has one RLS RPC fail and then a subsequent one that
+  // should succeed. However, once the first RPC fails, the adaptive
+  // throttling code will throttle the second RPC with about 11% probability,
+  // which would cause the test to be flaky. To avoid that, we seed the
+  // throttling state by sending two successful RPCs before we start the
+  // real test, which ensures that the second RPC of the real test will
+  // not be throttled (with 3 successes and 1 failure, the throttling
+  // probability will be negative, so the subsequent request will never be
+  // throttled).
+  const char* kTestValue2 = "test_value_2";
+  const char* kTestValue3 = "test_value_3";
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue2}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  rls_server_->service_.SetResponse(
+      BuildRlsRequest({{kTestKey, kTestValue3}}),
+      BuildRlsResponse({TargetStringForPort(backends_[0]->port_)}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue2}}));
+  CheckRpcSendOk(DEBUG_LOCATION,
+                 RpcOptions().set_metadata({{"key1", kTestValue3}}));
+  // Now start the real test.
   // Send an RPC before we give the RLS server a response.
   // The RLS request will fail, and thus so will the data plane RPC.
   CheckRpcSendFailure(DEBUG_LOCATION,
@@ -932,9 +955,9 @@ TEST_F(RlsEnd2endTest, FailedRlsRequestWithoutDefaultTarget) {
   gpr_sleep_until(grpc_timeout_seconds_to_deadline(3));
   CheckRpcSendOk(DEBUG_LOCATION,
                  RpcOptions().set_metadata({{"key1", kTestValue}}));
-  EXPECT_EQ(rls_server_->service_.request_count(), 2);
-  EXPECT_EQ(rls_server_->service_.response_count(), 1);
-  EXPECT_EQ(backends_[0]->service_.request_count(), 1);
+  EXPECT_EQ(rls_server_->service_.request_count(), 4);
+  EXPECT_EQ(rls_server_->service_.response_count(), 3);
+  EXPECT_EQ(backends_[0]->service_.request_count(), 3);
 }
 TEST_F(RlsEnd2endTest, FailedRlsRequestWithDefaultTarget) {
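As a cross-check on the seeding comment added to this test, the arithmetic behind it, using the defaults from the first hunk, is sketched below; this is a standalone illustration, not part of the test.

#include <cassert>

int main() {
  // kDefaultThrottleRatioForSuccesses = 2.0, kDefaultThrottlePadding = 8.
  // Without seeding, after the first RLS request fails the window holds
  // 1 request and 0 successes: (1 - 2.0 * 0) / (1 + 8) ~= 0.11, the
  // "about 11%" flake probability mentioned in the comment.
  float p_without_seeding = (1 - 2.0f * 0) / (1 + 8);
  // With seeding (the comment's 3 successes and 1 failure, i.e. 4
  // requests): (4 - 2.0 * 3) / (4 + 8) < 0, so the follow-up request is
  // never throttled.
  float p_with_seeding = (4 - 2.0f * 3) / (4 + 8);
  assert(p_without_seeding > 0.10f && p_without_seeding < 0.12f);
  assert(p_with_seeding < 0.0f);
  return 0;
}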
@@ -1325,7 +1348,7 @@ TEST_F(RlsEnd2endTest, MultipleTargets) {
   rls_server_->service_.SetResponse(
       BuildRlsRequest({{kTestKey, kTestValue}}),
       BuildRlsResponse(
-          // First target will report TRANSIENT_FAILURE..
+          // First target will report TRANSIENT_FAILURE.
           {"invalid_target", TargetStringForPort(backends_[0]->port_)}));
   CheckRpcSendOk(DEBUG_LOCATION,
                  RpcOptions().set_metadata({{"key1", kTestValue}}));
