diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 5fae55a93dc..6afdce4d5cc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -36,6 +36,7 @@ #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -56,6 +57,17 @@ constexpr char kEds[] = "eds_experimental"; const char* kXdsLocalityNameAttributeKey = "xds_locality_name"; +// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be +// removed once circuit breaking feature is fully integrated and enabled by +// default. +bool XdsCircuitBreakingEnabled() { + char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"); + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value); + gpr_free(value); + return parse_succeeded && parsed_value; +} + // Config for EDS LB policy. class EdsLbConfig : public LoadBalancingPolicy::Config { public: @@ -206,6 +218,7 @@ class EdsLb : public LoadBalancingPolicy { RefCountedPtr drop_config_; RefCountedPtr drop_stats_; RefCountedPtr child_picker_; + bool xds_circuit_breaking_enabled_; uint32_t max_concurrent_requests_; }; @@ -309,6 +322,7 @@ EdsLb::EdsPicker::EdsPicker(RefCountedPtr eds_policy) : eds_policy_(std::move(eds_policy)), drop_stats_(eds_policy_->drop_stats_), child_picker_(eds_policy_->child_picker_), + xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()), max_concurrent_requests_( eds_policy_->config_->max_concurrent_requests()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { @@ -318,16 +332,18 @@ EdsLb::EdsPicker::EdsPicker(RefCountedPtr eds_policy) } EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) { - // Check and see if we exceeded the max concurrent requests count. uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1); - if (current >= max_concurrent_requests_) { - eds_policy_->concurrent_requests_.FetchSub(1); - if (drop_stats_ != nullptr) { - drop_stats_->AddUncategorizedDrops(); + if (xds_circuit_breaking_enabled_) { + // Check and see if we exceeded the max concurrent requests count. + if (current >= max_concurrent_requests_) { + eds_policy_->concurrent_requests_.FetchSub(1); + if (drop_stats_ != nullptr) { + drop_stats_->AddUncategorizedDrops(); + } + PickResult result; + result.type = PickResult::PICK_COMPLETE; + return result; } - PickResult result; - result.type = PickResult::PICK_COMPLETE; - return result; } // If we're not dropping the call, we should always have a child picker. if (child_picker_ == nullptr) { // Should never happen. diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 744d7aac3fd..97be5d6da16 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -2290,6 +2290,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) { Status status_; }; + gpr_setenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING", "true"); constexpr size_t kMaxConcurrentRequests = 10; SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -2332,6 +2333,71 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) { // Make sure RPCs go to the correct backend: EXPECT_EQ(kMaxConcurrentRequests + 1, backends_[0]->backend_service()->request_count()); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"); +} + +TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) { + class TestRpc { + public: + TestRpc() {} + + void StartRpc(grpc::testing::EchoTestService::Stub* stub) { + sender_thread_ = std::thread([this, stub]() { + EchoResponse response; + EchoRequest request; + request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000); + request.set_message(kRequestMessage); + status_ = stub->Echo(&context_, request, &response); + }); + } + + void CancelRpc() { + context_.TryCancel(); + sender_thread_.join(); + } + + private: + std::thread sender_thread_; + ClientContext context_; + Status status_; + }; + + constexpr size_t kMaxConcurrentRequests = 10; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Populate new EDS resources. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + // Update CDS resource to set max concurrent request. + CircuitBreakers circuit_breaks; + Cluster cluster = balancers_[0]->ads_service()->default_cluster(); + auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds(); + threshold->set_priority(RoutingPriority::DEFAULT); + threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Send exactly max_concurrent_requests long RPCs. + TestRpc rpcs[kMaxConcurrentRequests]; + for (size_t i = 0; i < kMaxConcurrentRequests; ++i) { + rpcs[i].StartRpc(stub_.get()); + } + // Wait for all RPCs to be in flight. + while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() < + kMaxConcurrentRequests) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1 * 1000, GPR_TIMESPAN))); + } + // Sending a RPC now should not fail as circuit breaking is disabled. + Status status = SendRpc(); + EXPECT_TRUE(status.ok()); + for (size_t i = 0; i < kMaxConcurrentRequests; ++i) { + rpcs[i].CancelRpc(); + } + // Make sure RPCs go to the correct backend: + EXPECT_EQ(kMaxConcurrentRequests + 1, + backends_[0]->backend_service()->request_count()); } TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) {