From b607080400d1fbc298de7e5c6bccdcd91aa42f85 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth"
Date: Mon, 19 Oct 2020 15:08:23 -0700
Subject: [PATCH] Use a global atomic for circuit breaking call counter.

---
 .../lb_policy/xds/xds_cluster_impl.cc | 122 ++++++++++----
 test/cpp/end2end/xds_end2end_test.cc  | 150 +++++++++++-------
 2 files changed, 187 insertions(+), 85 deletions(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
index ad2f61cffb0..b34767768ce 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
@@ -32,6 +32,7 @@
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/work_serializer.h"
 
 namespace grpc_core {
@@ -40,6 +41,69 @@ TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
 
 namespace {
 
+//
+// global circuit breaker atomic map
+//
+
+class CircuitBreakerCallCounterMap {
+ public:
+  using Key =
+      std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
+
+  class CallCounter : public RefCounted<CallCounter> {
+   public:
+    explicit CallCounter(Key key) : key_(std::move(key)) {}
+    ~CallCounter() override;
+
+    uint32_t Increment() { return concurrent_requests_.FetchAdd(1); }
+    void Decrement() { concurrent_requests_.FetchSub(1); }
+
+   private:
+    Key key_;
+    Atomic<uint32_t> concurrent_requests_{0};
+  };
+
+  RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
+                                         const std::string& eds_service_name);
+
+ private:
+  Mutex mu_;
+  std::map<Key, CallCounter*> map_;
+};
+
+CircuitBreakerCallCounterMap* g_call_counter_map = nullptr;
+
+RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
+CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
+                                          const std::string& eds_service_name) {
+  Key key(cluster, eds_service_name);
+  RefCountedPtr<CallCounter> result;
+  MutexLock lock(&mu_);
+  auto it = map_.find(key);
+  if (it == map_.end()) {
+    it = map_.insert({key, nullptr}).first;
+  } else {
+    result = it->second->RefIfNonZero();
+  }
+  if (result == nullptr) {
+    result = MakeRefCounted<CallCounter>(std::move(key));
+    it->second = result.get();
+  }
+  return result;
+}
+
+CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
+  MutexLock lock(&g_call_counter_map->mu_);
+  auto it = g_call_counter_map->map_.find(key_);
+  if (it != g_call_counter_map->map_.end() && it->second == this) {
+    g_call_counter_map->map_.erase(it);
+  }
+}
+
+//
+// LB policy
+//
+
 constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
 
 // TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
@@ -138,13 +202,13 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
   // A picker that wraps the picker from the child to perform drops.
   class Picker : public SubchannelPicker {
    public:
-    Picker(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb,
+    Picker(XdsClusterImplLb* xds_cluster_impl_lb,
            RefCountedPtr<SubchannelPicker> picker);
 
     PickResult Pick(PickArgs args) override;
 
    private:
-    RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb_;
+    RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
     bool xds_circuit_breaking_enabled_;
     uint32_t max_concurrent_requests_;
     RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
@@ -187,8 +251,8 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
   // Current config from the resolver.
   RefCountedPtr<XdsClusterImplLbConfig> config_;
 
-  // Current concurrent number of requests;
-  Atomic<uint32_t> concurrent_requests_{0};
+  // Current concurrent number of requests.
+  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
 
   // Internal state.
   bool shutting_down_ = false;
@@ -211,19 +275,18 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
 // XdsClusterImplLb::Picker
 //
 
-XdsClusterImplLb::Picker::Picker(
-    RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_lb,
-    RefCountedPtr<SubchannelPicker> picker)
-    : xds_cluster_impl_lb_(std::move(xds_cluster_impl_lb)),
+XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
+                                 RefCountedPtr<SubchannelPicker> picker)
+    : call_counter_(xds_cluster_impl_lb->call_counter_),
       xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
       max_concurrent_requests_(
-          xds_cluster_impl_lb_->config_->max_concurrent_requests()),
-      drop_config_(xds_cluster_impl_lb_->config_->drop_config()),
-      drop_stats_(xds_cluster_impl_lb_->drop_stats_),
+          xds_cluster_impl_lb->config_->max_concurrent_requests()),
+      drop_config_(xds_cluster_impl_lb->config_->drop_config()),
+      drop_stats_(xds_cluster_impl_lb->drop_stats_),
       picker_(std::move(picker)) {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
-            xds_cluster_impl_lb_.get(), this);
+            xds_cluster_impl_lb, this);
   }
 }
 
@@ -238,11 +301,11 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
     return result;
   }
   // Handle circuit breaking.
-  uint32_t current = xds_cluster_impl_lb_->concurrent_requests_.FetchAdd(1);
+  uint32_t current = call_counter_->Increment();
   if (xds_circuit_breaking_enabled_) {
     // Check and see if we exceeded the max concurrent requests count.
     if (current >= max_concurrent_requests_) {
-      xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
+      call_counter_->Decrement();
       if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
       PickResult result;
       result.type = PickResult::PICK_COMPLETE;
@@ -257,7 +320,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
         GRPC_ERROR_CREATE_FROM_STATIC_STRING(
             "xds_cluster_impl picker not given any child picker"),
         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
-    xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
+    call_counter_->Decrement();
     return result;
   }
   // Not dropping, so delegate to child picker.
@@ -275,17 +338,15 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
       result.subchannel = subchannel_wrapper->wrapped_subchannel();
     }
     // Intercept the recv_trailing_metadata op to record call completion.
-    XdsClusterImplLb* xds_cluster_impl_lb = static_cast<XdsClusterImplLb*>(
-        xds_cluster_impl_lb_->Ref(DEBUG_LOCATION, "DropPickPicker+call")
-            .release());
+    auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
     auto original_recv_trailing_metadata_ready =
         result.recv_trailing_metadata_ready;
     result.recv_trailing_metadata_ready =
         // Note: This callback does not run in either the control plane
        // work serializer or in the data plane mutex.
-        [locality_stats, original_recv_trailing_metadata_ready,
-         xds_cluster_impl_lb](grpc_error* error, MetadataInterface* metadata,
-                              CallState* call_state) {
+        [locality_stats, original_recv_trailing_metadata_ready, call_counter](
+            grpc_error* error, MetadataInterface* metadata,
+            CallState* call_state) {
          // Record call completion for load reporting.
          if (locality_stats != nullptr) {
            const bool call_failed = error != GRPC_ERROR_NONE;
@@ -293,8 +354,8 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
            locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
          }
          // Decrement number of calls in flight.
-          xds_cluster_impl_lb->concurrent_requests_.FetchSub(1);
-          xds_cluster_impl_lb->Unref(DEBUG_LOCATION, "DropPickPicker+call");
+          call_counter->Decrement();
+          call_counter->Unref(DEBUG_LOCATION, "call");
          // Invoke the original recv_trailing_metadata_ready callback, if any.
          if (original_recv_trailing_metadata_ready != nullptr) {
            original_recv_trailing_metadata_ready(error, metadata, call_state);
          }
@@ -305,7 +366,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
     // where a pick fails. This is challenging, because we don't know which
     // picks are for wait_for_ready RPCs or how many times we'll return a
     // failure for the same wait_for_ready RPC.
-    xds_cluster_impl_lb_->concurrent_requests_.FetchSub(1);
+    call_counter_->Decrement();
   }
   return result;
 }
@@ -375,6 +436,8 @@ void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
           config_->lrs_load_reporting_server_name().value(),
           config_->cluster_name(), config_->eds_service_name());
     }
+    call_counter_ = g_call_counter_map->GetOrCreate(
+        config_->cluster_name(), config_->eds_service_name());
   } else {
     // Cluster name, EDS service name, and LRS server name should never
     // change, because the EDS policy above us should be swapped out if
@@ -398,8 +461,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
   // If we're dropping all calls, report READY, regardless of what (or
   // whether) the child has reported.
   if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
-    auto drop_picker =
-        absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker"), picker_);
+    auto drop_picker = absl::make_unique<Picker>(this, picker_);
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
       gpr_log(GPR_INFO,
               "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
@@ -413,8 +475,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
   }
   // Otherwise, update only if we have a child picker.
   if (picker_ != nullptr) {
-    auto drop_picker =
-        absl::make_unique<Picker>(Ref(DEBUG_LOCATION, "Picker"), picker_);
+    auto drop_picker = absl::make_unique<Picker>(this, picker_);
     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
       gpr_log(GPR_INFO,
               "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
@@ -737,9 +798,12 @@ class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
 //
 
 void grpc_lb_policy_xds_cluster_impl_init() {
+  grpc_core::g_call_counter_map = new grpc_core::CircuitBreakerCallCounterMap();
   grpc_core::LoadBalancingPolicyRegistry::Builder::
       RegisterLoadBalancingPolicyFactory(
          absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
 }
 
-void grpc_lb_policy_xds_cluster_impl_shutdown() {}
+void grpc_lb_policy_xds_cluster_impl_shutdown() {
+  delete grpc_core::g_call_counter_map;
+}

diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index f0b4b995e43..7e02a99e461 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -1503,7 +1503,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
   }
 
   std::shared_ptr<Channel> CreateChannel(
-      int failover_timeout = 0, const char* server_name = kServerName) {
+      int failover_timeout = 0, const char* server_name = kServerName,
+      grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
     ChannelArguments args;
     if (failover_timeout > 0) {
       args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout);
     }
     // If the parent channel is using the fake resolver, we inject the
     // response generator here.
     if (!GetParam().use_xds_resolver()) {
+      if (response_generator == nullptr) {
+        response_generator = response_generator_.get();
+      }
       args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
-                      response_generator_.get());
+                      response_generator);
     }
     std::string uri = absl::StrCat(
         GetParam().use_xds_resolver() ? "xds" : "fake", ":///", server_name);
@@ -1697,7 +1701,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     return addresses;
   }
 
-  void SetNextResolution(const std::vector<int>& ports) {
+  void SetNextResolution(
+      const std::vector<int>& ports,
+      grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
     if (GetParam().use_xds_resolver()) return;  // Not used with xds resolver.
     grpc_core::ExecCtx exec_ctx;
     grpc_core::Resolver::Result result;
@@ -1711,7 +1717,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
         grpc_core::ServiceConfig::Create(nullptr, service_config_json, &error);
     ASSERT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error);
     ASSERT_NE(result.service_config.get(), nullptr);
-    response_generator_->SetResponse(std::move(result));
+    if (response_generator == nullptr) {
+      response_generator = response_generator_.get();
+    }
+    response_generator->SetResponse(std::move(result));
   }
 
   void SetNextResolutionForLbChannelAllBalancers(
@@ -1992,6 +2001,28 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
     std::shared_ptr<LrsServiceImpl> lrs_service_;
   };
 
+  class LongRunningRpc {
+   public:
+    void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
+      sender_thread_ = std::thread([this, stub]() {
+        EchoResponse response;
+        EchoRequest request;
+        request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
+        request.set_message(kRequestMessage);
+        stub->Echo(&context_, request, &response);
+      });
+    }
+
+    void CancelRpc() {
+      context_.TryCancel();
+      sender_thread_.join();
+    }
+
+   private:
+    std::thread sender_thread_;
+    ClientContext context_;
+  };
+
   const size_t num_backends_;
   const size_t num_balancers_;
   const int client_load_reporting_interval_seconds_;
@@ -2370,31 +2401,6 @@ TEST_P(XdsResolverOnlyTest, DefaultRouteSpecifiesSlashPrefix) {
 }
 
 TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
-  class TestRpc {
-   public:
-    TestRpc() {}
-
-    void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
-      sender_thread_ = std::thread([this, stub]() {
-        EchoResponse response;
-        EchoRequest request;
-        request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
-        request.set_message(kRequestMessage);
-        status_ = stub->Echo(&context_, request, &response);
-      });
-    }
-
-    void CancelRpc() {
-      context_.TryCancel();
-      sender_thread_.join();
-    }
-
-   private:
-    std::thread sender_thread_;
-    ClientContext context_;
-    Status status_;
-  };
-
   gpr_setenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING", "true");
   constexpr size_t kMaxConcurrentRequests = 10;
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
   // Populate new EDS resources.
   AdsServiceImpl::EdsResourceArgs args({
       {"locality0", GetBackendPorts(0, 1)},
   });
   balancers_[0]->ads_service()->SetEdsResource(
       AdsServiceImpl::BuildEdsResource(args));
   // Update CDS resource to set max concurrent request.
   CircuitBreakers circuit_breaks;
   Cluster cluster = balancers_[0]->ads_service()->default_cluster();
   auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
   threshold->set_priority(RoutingPriority::DEFAULT);
@@ -2413,7 +2419,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
   threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
   balancers_[0]->ads_service()->SetCdsResource(cluster);
   // Send exactly max_concurrent_requests long RPCs.
-  TestRpc rpcs[kMaxConcurrentRequests];
+  LongRunningRpc rpcs[kMaxConcurrentRequests];
   for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
     rpcs[i].StartRpc(stub_.get());
   }
@@ -2441,32 +2447,64 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
   gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
 }
 
-TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
-  class TestRpc {
-   public:
-    TestRpc() {}
-
-    void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
-      sender_thread_ = std::thread([this, stub]() {
-        EchoResponse response;
-        EchoRequest request;
-        request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
-        request.set_message(kRequestMessage);
-        status_ = stub->Echo(&context_, request, &response);
-      });
-    }
-
-    void CancelRpc() {
-      context_.TryCancel();
-      sender_thread_.join();
-    }
-
-   private:
-    std::thread sender_thread_;
-    ClientContext context_;
-    Status status_;
-  };
+TEST_P(XdsResolverOnlyTest, CircuitBreakingMultipleChannelsShareCallCounter) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING", "true");
+  constexpr size_t kMaxConcurrentRequests = 10;
+  // Populate new EDS resources.
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", GetBackendPorts(0, 1)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args));
+  // Update CDS resource to set max concurrent request.
+  CircuitBreakers circuit_breaks;
+  Cluster cluster = balancers_[0]->ads_service()->default_cluster();
+  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
+  threshold->set_priority(RoutingPriority::DEFAULT);
+  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  // Create second channel.
+  auto response_generator2 =
+      grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+  auto channel2 = CreateChannel(
+      /*failover_timeout=*/0, /*server_name=*/kServerName,
+      response_generator2.get());
+  auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
+  // Set resolution results for both channels and for the xDS channel.
+  SetNextResolution({});
+  SetNextResolution({}, response_generator2.get());
+  SetNextResolutionForLbChannelAllBalancers();
+  // Send exactly max_concurrent_requests long RPCs, alternating between
+  // the two channels.
+  LongRunningRpc rpcs[kMaxConcurrentRequests];
+  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
+    rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
+  }
+  // Wait for all RPCs to be in flight.
+  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
+         kMaxConcurrentRequests) {
+    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
+  }
+  // Sending an RPC now should fail; the error message should tell us
+  // we hit the max concurrent requests limit and got dropped.
+  Status status = SendRpc();
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
+  // Cancel one RPC to allow another one through.
+  rpcs[0].CancelRpc();
+  status = SendRpc();
+  EXPECT_TRUE(status.ok());
+  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
+    rpcs[i].CancelRpc();
+  }
+  // Make sure RPCs go to the correct backend:
+  EXPECT_EQ(kMaxConcurrentRequests + 1,
+            backends_[0]->backend_service()->request_count());
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
+}
+
+TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
   constexpr size_t kMaxConcurrentRequests = 10;
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
   // Populate new EDS resources.
   AdsServiceImpl::EdsResourceArgs args({
       {"locality0", GetBackendPorts(0, 1)},
   });
   balancers_[0]->ads_service()->SetEdsResource(
       AdsServiceImpl::BuildEdsResource(args));
   // Update CDS resource to set max concurrent request.
   CircuitBreakers circuit_breaks;
   Cluster cluster = balancers_[0]->ads_service()->default_cluster();
   auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
   threshold->set_priority(RoutingPriority::DEFAULT);
@@ -2484,7 +2522,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
   threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
   balancers_[0]->ads_service()->SetCdsResource(cluster);
   // Send exactly max_concurrent_requests long RPCs.
-  TestRpc rpcs[kMaxConcurrentRequests];
+  LongRunningRpc rpcs[kMaxConcurrentRequests];
   for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
     rpcs[i].StartRpc(stub_.get());
   }
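
A note on the call-counter map introduced in this patch: the subtle part is the
RefIfNonZero() lookup in GetOrCreate(). The map stores raw pointers, each
CallCounter erases itself from the map in its destructor (checking
`it->second == this` in case a new counter was already created for the same
key), and RefIfNonZero() resolves the race where a lookup finds a counter whose
last strong ref is concurrently being dropped. Below is a minimal standalone
sketch of the same pattern built on std::shared_ptr/std::weak_ptr instead of
gRPC's RefCounted machinery -- weak_ptr::lock() plays the role of
RefIfNonZero(), and stale map entries are overwritten rather than erased. All
names here (CounterRegistry, etc.) are illustrative, not gRPC APIs.

#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

class CounterRegistry {
 public:
  using Key = std::pair<std::string, std::string>;  // {cluster, eds_service}

  class CallCounter {
   public:
    // fetch_add returns the pre-increment value, mirroring FetchAdd(1).
    uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
    void Decrement() { concurrent_requests_.fetch_sub(1); }

   private:
    std::atomic<uint32_t> concurrent_requests_{0};
  };

  // Returns the live counter for key, creating a new one if none exists or
  // if the previous counter is concurrently being destroyed.
  std::shared_ptr<CallCounter> GetOrCreate(const Key& key) {
    std::lock_guard<std::mutex> lock(mu_);
    auto& slot = map_[key];
    if (auto existing = slot.lock()) return existing;  // ~ RefIfNonZero()
    auto created = std::make_shared<CallCounter>();
    slot = created;
    return created;
  }

 private:
  std::mutex mu_;
  std::map<Key, std::weak_ptr<CallCounter>> map_;
};

int main() {
  CounterRegistry registry;
  // Two lookups with the same {cluster, eds_service_name} key yield the same
  // counter object, so two channels targeting the same cluster share one
  // concurrent-request count.
  auto c1 = registry.GetOrCreate({"cluster", "eds"});
  auto c2 = registry.GetOrCreate({"cluster", "eds"});
  return c1.get() == c2.get() ? 0 : 1;
}

Because the registry is process-global and keyed on {cluster, eds_service_name},
the count is no longer tied to one LB policy instance the way the old
per-policy Atomic<uint32_t> was, which is what the new
CircuitBreakingMultipleChannelsShareCallCounter test verifies end to end.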
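On the data path, the circuit breaker is a fetch-add/compare/fetch-sub
protocol: Pick() unconditionally increments the shared counter, rejects the
call (rolling the increment back) if the pre-increment value already reached
max_concurrent_requests, and otherwise arranges for exactly one decrement when
the call finishes -- which is why the patch threads call_counter through the
recv_trailing_metadata callback. A minimal standalone sketch of that
accounting, with illustrative names (CallLimiter, TryStartCall, and FinishCall
are not gRPC APIs):

#include <atomic>
#include <cstdint>
#include <iostream>

class CallLimiter {
 public:
  explicit CallLimiter(uint32_t max) : max_(max) {}

  // Increment first, then compare, exactly like Picker::Pick() does with
  // Increment() and `current >= max_concurrent_requests_`.
  bool TryStartCall() {
    uint32_t current = concurrent_requests_.fetch_add(1);
    if (current >= max_) {
      concurrent_requests_.fetch_sub(1);  // roll back: this call is dropped
      return false;
    }
    return true;
  }

  // Must run exactly once per admitted call, analogous to the Decrement() in
  // the intercepted recv_trailing_metadata callback.
  void FinishCall() { concurrent_requests_.fetch_sub(1); }

 private:
  const uint32_t max_;
  std::atomic<uint32_t> concurrent_requests_{0};
};

int main() {
  CallLimiter limiter(2);
  bool a = limiter.TryStartCall();  // admitted (0 -> 1)
  bool b = limiter.TryStartCall();  // admitted (1 -> 2)
  bool c = limiter.TryStartCall();  // dropped: limit of 2 reached
  limiter.FinishCall();             // one call completes...
  bool d = limiter.TryStartCall();  // ...so the next one is admitted again
  std::cout << a << b << c << d << "\n";  // prints 1101
  return 0;
}

The cancel-one-then-retry sequence in main() has the same shape as the
end-to-end tests above: CancelRpc() on one LongRunningRpc frees a slot, after
which a fresh SendRpc() is expected to succeed.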