Use a global atomic call counter for circuit breaking.

pull/24481/head
Mark D. Roth · 4 years ago
parent 1f7a2ee471 · commit b607080400

Changed files:
  1. src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc (122)
  2. test/cpp/end2end/xds_end2end_test.cc (150)

@@ -32,6 +32,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/work_serializer.h"
namespace grpc_core {
@@ -40,6 +41,69 @@ TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
namespace {
//
// global circuit breaker atomic map
//
class CircuitBreakerCallCounterMap {
public:
using Key =
std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
class CallCounter : public RefCounted<CallCounter> {
public:
explicit CallCounter(Key key) : key_(std::move(key)) {}
~CallCounter() override;
uint32_t Increment() { return concurrent_requests_.FetchAdd(1); }
void Decrement() { concurrent_requests_.FetchSub(1); }
private:
Key key_;
Atomic<uint32_t> concurrent_requests_{0};
};
RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
const std::string& eds_service_name);
private:
Mutex mu_;
std::map<Key, CallCounter*> map_;
};
CircuitBreakerCallCounterMap* g_call_counter_map = nullptr;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
const std::string& eds_service_name) {
Key key(cluster, eds_service_name);
RefCountedPtr<CallCounter> result;
MutexLock lock(&mu_);
auto it = map_.find(key);
if (it == map_.end()) {
it = map_.insert({key, nullptr}).first;
} else {
result = it->second->RefIfNonZero();
}
if (result == nullptr) {
result = MakeRefCounted<CallCounter>(std::move(key));
it->second = result.get();
}
return result;
}
CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
MutexLock lock(&g_call_counter_map->mu_);
auto it = g_call_counter_map->map_.find(key_);
if (it != g_call_counter_map->map_.end() && it->second == this) {
g_call_counter_map->map_.erase(it);
}
}
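
To make the lookup-and-cleanup dance above easier to follow, here is a minimal standalone sketch of the same pattern using only the C++ standard library. All names below are hypothetical: in the real code, grpc_core::RefCounted's RefIfNonZero() plays the role that std::weak_ptr::lock() plays here, and Mutex/MutexLock stand in for std::mutex/std::lock_guard.

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

class CounterMap {
 public:
  using Key = std::pair<std::string, std::string>;  // (cluster, eds_service_name)

  class Counter {
   public:
    Counter(CounterMap* map, Key key) : map_(map), key_(std::move(key)) {}
    ~Counter();  // erases this entry from the map; defined below

   private:
    CounterMap* const map_;
    const Key key_;
  };

  std::shared_ptr<Counter> GetOrCreate(const Key& key) {
    std::lock_guard<std::mutex> lock(mu_);
    // Revive the existing entry only if someone still holds a strong
    // reference (this is what RefIfNonZero() does in the real code).
    std::shared_ptr<Counter> result = map_[key].lock();
    if (result == nullptr) {
      result = std::make_shared<Counter>(this, key);
      map_[key] = result;  // store only a non-owning reference
    }
    return result;
  }

 private:
  std::mutex mu_;
  std::map<Key, std::weak_ptr<Counter>> map_;
};

// The last strong reference erases the entry, unless a concurrent
// GetOrCreate() has already installed a fresh Counter under the same key.
CounterMap::Counter::~Counter() {
  std::lock_guard<std::mutex> lock(map_->mu_);
  auto it = map_->map_.find(key_);
  if (it != map_->map_.end() && it->second.expired()) map_->map_.erase(it);
}

int main() {
  CounterMap counters;
  auto a = counters.GetOrCreate({"cluster", "eds"});
  auto b = counters.GetOrCreate({"cluster", "eds"});
  // a and b alias the same Counter; its map entry vanishes once both drop.
}

Keying on (cluster, eds_service_name) is what lets every channel targeting the same cluster share one circuit-breaker counter, while the self-erasing destructor keeps the global map from growing without bound.
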
//
// LB policy
//
constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
@@ -138,13 +202,13 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
// A picker that wraps the picker from the child to perform drops.
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb,
Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker);
PickResult Pick(PickArgs args) override;
private:
RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb_;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
bool xds_circuit_breaking_enabled_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
@@ -187,8 +251,8 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
// Current config from the resolver.
RefCountedPtr<XdsClusterImplLbConfig> config_;
// Current concurrent number of requests;
Atomic<uint32_t> concurrent_requests_{0};
// Current concurrent number of requests.
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
// Internal state.
bool shutting_down_ = false;
@@ -211,19 +275,18 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
// XdsClusterImplLb::Picker
//
XdsClusterImplLb::Picker::Picker(
RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker)
: xds_cluster_impl_lb_(std::move(xds_cluster_impl_lb)),
XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker)
: call_counter_(xds_cluster_impl_lb->call_counter_),
xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
max_concurrent_requests_(
xds_cluster_impl_lb_->config_->max_concurrent_requests()),
drop_config_(xds_cluster_impl_lb_->config_->drop_config()),
drop_stats_(xds_cluster_impl_lb_->drop_stats_),
xds_cluster_impl_lb->config_->max_concurrent_requests()),
drop_config_(xds_cluster_impl_lb->config_->drop_config()),
drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
xds_cluster_impl_lb_.get(), this);
xds_cluster_impl_lb, this);
}
}
@@ -238,11 +301,11 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
return result;
}
// Handle circuit breaking.
uint32_t current = xds_cluster_impl_lb_->concurrent_requests_.FetchAdd(1);
uint32_t current = call_counter_->Increment();
if (xds_circuit_breaking_enabled_) {
// Check and see if we exceeded the max concurrent requests count.
if (current >= max_concurrent_requests_) {
xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
call_counter_->Decrement();
if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
PickResult result;
result.type = PickResult::PICK_COMPLETE;
@@ -257,7 +320,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds_cluster_impl picker not given any child picker"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
call_counter_->Decrement();
return result;
}
// Not dropping, so delegate to child picker.
@@ -275,17 +338,15 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
result.subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Intercept the recv_trailing_metadata op to record call completion.
XdsClusterImplLb* xds_cluster_impl_lb = static_cast<XdsClusterImplLb*>(
xds_cluster_impl_lb_->Ref(DEBUG_LOCATION, "DropPickPicker+call")
.release());
auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
auto original_recv_trailing_metadata_ready =
result.recv_trailing_metadata_ready;
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats, original_recv_trailing_metadata_ready,
xds_cluster_impl_lb](grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
[locality_stats, original_recv_trailing_metadata_ready, call_counter](
grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
// Record call completion for load reporting.
if (locality_stats != nullptr) {
const bool call_failed = error != GRPC_ERROR_NONE;
@@ -293,8 +354,8 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}
// Decrement number of calls in flight.
xds_cluster_impl_lb->concurrent_requests_.FetchSub(1);
xds_cluster_impl_lb->Unref(DEBUG_LOCATION, "DropPickPicker+call");
call_counter->Decrement();
call_counter->Unref(DEBUG_LOCATION, "call");
// Invoke the original recv_trailing_metadata_ready callback, if any.
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(error, metadata, call_state);
@@ -305,7 +366,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
// where a pick fails. This is challenging, because we don't know which
// picks are for wait_for_ready RPCs or how many times we'll return a
// failure for the same wait_for_ready RPC.
xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
call_counter_->Decrement();
}
return result;
}
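
The circuit-breaking check in Pick() hinges on fetch-and-add returning the previous value: increment first, and if the count was already at the limit, undo the increment and drop the call. Below is a compact standalone sketch of just that admission check using std::atomic, with hypothetical names; the real picker additionally records the drop via drop_stats_ and returns a PICK_COMPLETE result carrying an error.

#include <atomic>
#include <cstdint>

class AdmissionController {
 public:
  explicit AdmissionController(uint32_t max_in_flight)
      : max_in_flight_(max_in_flight) {}

  // Returns true if the call is admitted. fetch_add returns the value
  // *before* the increment, so "previous >= max" means the limit was
  // already reached and this call must be turned away.
  bool TryAdmit() {
    uint32_t previous = in_flight_.fetch_add(1, std::memory_order_relaxed);
    if (previous >= max_in_flight_) {
      in_flight_.fetch_sub(1, std::memory_order_relaxed);  // undo our slot
      return false;
    }
    return true;
  }

  // Must be called exactly once per admitted call when it finishes (in the
  // LB policy this happens in the recv_trailing_metadata_ready callback).
  // Relaxed ordering suffices: the counter guards no other data.
  void Release() { in_flight_.fetch_sub(1, std::memory_order_relaxed); }

 private:
  const uint32_t max_in_flight_;
  std::atomic<uint32_t> in_flight_{0};
};

Note that the real Pick() increments before any of its checks, so it must also decrement on every early-exit path: on a circuit-breaker drop, when no child picker is available, and when the child pick fails.
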
@@ -375,6 +436,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
config_->lrs_load_reporting_server_name().value(),
config_->cluster_name(), config_->eds_service_name());
}
call_counter_ = g_call_counter_map->GetOrCreate(
config_->cluster_name(), config_->eds_service_name());
} else {
// Cluster name, EDS service name, and LRS server name should never
// change, because the EDS policy above us should be swapped out if
@@ -398,8 +461,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
// If we're dropping all calls, report READY, regardless of what (or
// whether) the child has reported.
if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
auto drop_picker =
absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker"), picker_);
auto drop_picker = absl::make_unique<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity (drop all): "
@@ -413,8 +475,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
}
// Otherwise, update only if we have a child picker.
if (picker_ != nullptr) {
auto drop_picker =
absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker"), picker_);
auto drop_picker = absl::make_unique<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity: state=%s "
@@ -737,9 +798,12 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
//
void grpc_lb_policy_xds_cluster_impl_init() {
grpc_core::g_call_counter_map = new grpc_core::CircuitBreakerCallCounterMap();
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
}
void grpc_lb_policy_xds_cluster_impl_shutdown() {}
void grpc_lb_policy_xds_cluster_impl_shutdown() {
delete grpc_core::g_call_counter_map;
}
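
One detail worth calling out in the Pick() changes above: the recv_trailing_metadata_ready callback can run long after the LB policy and its picker are gone, which is why the pick path takes an extra ref on the counter (call_counter_->Ref(...).release()) and drops it inside the callback. Here is a sketch of the same lifetime pattern using std::shared_ptr and std::function, reusing the hypothetical AdmissionController from the sketch above:

#include <functional>
#include <memory>

using Completion = std::function<void()>;

// Wraps a completion callback so that (a) the shared counter stays alive
// until the call finishes, and (b) the in-flight slot is released exactly
// once, even if the policy and picker that created it are long gone.
Completion WrapCompletion(std::shared_ptr<AdmissionController> counter,
                          Completion original) {
  return [counter, original]() {
    counter->Release();
    if (original) original();  // chain to the pre-existing callback, if any
  };
}

This is also why the picker no longer holds a RefCountedPtr<XdsClusterImplLb>: the completion callback now pins only the small shared counter instead of the entire LB policy.
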

@@ -1503,7 +1503,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
}
std::shared_ptr<Channel> CreateChannel(
int failover_timeout = 0, const char* server_name = kServerName) {
int failover_timeout = 0, const char* server_name = kServerName,
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
ChannelArguments args;
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout);
@@ -1511,8 +1512,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
// If the parent channel is using the fake resolver, we inject the
// response generator here.
if (!GetParam().use_xds_resolver()) {
if (response_generator == nullptr) {
response_generator = response_generator_.get();
}
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
response_generator);
}
std::string uri = absl::StrCat(
GetParam().use_xds_resolver() ? "xds" : "fake", ":///", server_name);
@@ -1697,7 +1701,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return addresses;
}
void SetNextResolution(const std::vector<int>& ports) {
void SetNextResolution(
const std::vector<int>& ports,
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
if (GetParam().use_xds_resolver()) return; // Not used with xds resolver.
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
@@ -1711,7 +1717,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
grpc_core::ServiceConfig::Create(nullptr, service_config_json, &error);
ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error);
ASSERT_NE(result.service_config.get(), nullptr);
response_generator_->SetResponse(std::move(result));
if (response_generator == nullptr) {
response_generator = response_generator_.get();
}
response_generator->SetResponse(std::move(result));
}
void SetNextResolutionForLbChannelAllBalancers(
@@ -1992,6 +2001,28 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
std::shared_ptr<LrsServiceImpl> lrs_service_;
};
class LongRunningRpc {
public:
void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
sender_thread_ = std::thread([this, stub]() {
EchoResponse response;
EchoRequest request;
request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
request.set_message(kRequestMessage);
stub->Echo(&context_, request, &response);
});
}
void CancelRpc() {
context_.TryCancel();
sender_thread_.join();
}
private:
std::thread sender_thread_;
ClientContext context_;
};
const size_t num_backends_;
const size_t num_balancers_;
const int client_load_reporting_interval_seconds_;
@@ -2370,31 +2401,6 @@ TEST_P(XdsResolverOnlyTest, DefaultRouteSpecifiesSlashPrefix) {
}
TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
class TestRpc {
public:
TestRpc() {}
void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
sender_thread_ = std::thread([this, stub]() {
EchoResponse response;
EchoRequest request;
request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
request.set_message(kRequestMessage);
status_ = stub->Echo(&context_, request, &response);
});
}
void CancelRpc() {
context_.TryCancel();
sender_thread_.join();
}
private:
std::thread sender_thread_;
ClientContext context_;
Status status_;
};
gpr_setenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING", "true");
constexpr size_t kMaxConcurrentRequests = 10;
SetNextResolution({});
@@ -2413,7 +2419,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Send exactly max_concurrent_requests long RPCs.
TestRpc rpcs[kMaxConcurrentRequests];
LongRunningRpc rpcs[kMaxConcurrentRequests];
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].StartRpc(stub_.get());
}
@@ -2441,32 +2447,64 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
}
TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
class TestRpc {
public:
TestRpc() {}
void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
sender_thread_ = std::thread([this, stub]() {
EchoResponse response;
EchoRequest request;
request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
request.set_message(kRequestMessage);
status_ = stub->Echo(&context_, request, &response);
});
}
void CancelRpc() {
context_.TryCancel();
sender_thread_.join();
}
private:
std::thread sender_thread_;
ClientContext context_;
Status status_;
};
TEST_P(XdsResolverOnlyTest, CircuitBreakingMultipleChannelsShareCallCounter) {
gpr_setenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING", "true");
constexpr size_t kMaxConcurrentRequests = 10;
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
// Update CDS resource to set max concurrent requests.
CircuitBreakers circuit_breaks;
Cluster cluster = balancers_[0]->ads_service()->default_cluster();
auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
threshold->set_priority(RoutingPriority::DEFAULT);
threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Create second channel.
auto response_generator2 =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
auto channel2 = CreateChannel(
/*failover_timeout=*/0, /*server_name=*/kServerName,
response_generator2.get());
auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
// Set resolution results for both channels and for the xDS channel.
SetNextResolution({});
SetNextResolution({}, response_generator2.get());
SetNextResolutionForLbChannelAllBalancers();
// Send exactly max_concurrent_requests long RPCs, alternating between
// the two channels.
LongRunningRpc rpcs[kMaxConcurrentRequests];
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
}
// Wait for all RPCs to be in flight.
while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
kMaxConcurrentRequests) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
}
// Sending an RPC now should fail; the error message should tell us
// that we hit the max concurrent requests limit and the RPC was dropped.
Status status = SendRpc();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
// Cancel one RPC to allow another one through.
rpcs[0].CancelRpc();
status = SendRpc();
EXPECT_TRUE(status.ok());
for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
rpcs[i].CancelRpc();
}
// Make sure RPCs go to the correct backend:
EXPECT_EQ(kMaxConcurrentRequests + 1,
backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
}
TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
constexpr size_t kMaxConcurrentRequests = 10;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -2484,7 +2522,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Send exactly max_concurrent_requests long RPCs.
TestRpc rpcs[kMaxConcurrentRequests];
LongRunningRpc rpcs[kMaxConcurrentRequests];
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].StartRpc(stub_.get());
}
