Merge pull request #24396 from donnadionne/protect_cb

Protect circuit breaking functionality with an environment variable
pull/24382/head
donnadionne 4 years ago committed by GitHub
commit e99d6008ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  2. 66
      test/cpp/end2end/xds_end2end_test.cc

@ -36,6 +36,7 @@
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -56,6 +57,17 @@ constexpr char kEds[] = "eds_experimental";
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
// TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
// removed once circuit breaking feature is fully integrated and enabled by
// default.
bool XdsCircuitBreakingEnabled() {
char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
gpr_free(value);
return parse_succeeded && parsed_value;
}
// Config for EDS LB policy.
class EdsLbConfig : public LoadBalancingPolicy::Config {
public:
@ -206,6 +218,7 @@ class EdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<ChildPickerWrapper> child_picker_;
bool xds_circuit_breaking_enabled_;
uint32_t max_concurrent_requests_;
};
@ -309,6 +322,7 @@ EdsLb::EdsPicker::EdsPicker(RefCountedPtr<EdsLb> eds_policy)
: eds_policy_(std::move(eds_policy)),
drop_stats_(eds_policy_->drop_stats_),
child_picker_(eds_policy_->child_picker_),
xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
max_concurrent_requests_(
eds_policy_->config_->max_concurrent_requests()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
@ -318,16 +332,18 @@ EdsLb::EdsPicker::EdsPicker(RefCountedPtr<EdsLb> eds_policy)
}
EdsLb::PickResult EdsLb::EdsPicker::Pick(PickArgs args) {
// Check and see if we exceeded the max concurrent requests count.
uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1);
if (current >= max_concurrent_requests_) {
eds_policy_->concurrent_requests_.FetchSub(1);
if (drop_stats_ != nullptr) {
drop_stats_->AddUncategorizedDrops();
if (xds_circuit_breaking_enabled_) {
// Check and see if we exceeded the max concurrent requests count.
if (current >= max_concurrent_requests_) {
eds_policy_->concurrent_requests_.FetchSub(1);
if (drop_stats_ != nullptr) {
drop_stats_->AddUncategorizedDrops();
}
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
// If we're not dropping the call, we should always have a child picker.
if (child_picker_ == nullptr) { // Should never happen.

@ -2290,6 +2290,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
Status status_;
};
gpr_setenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING", "true");
constexpr size_t kMaxConcurrentRequests = 10;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@ -2332,6 +2333,71 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
// Make sure RPCs go to the correct backend:
EXPECT_EQ(kMaxConcurrentRequests + 1,
backends_[0]->backend_service()->request_count());
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
}
TEST_P(XdsResolverOnlyTest, CircuitBreakingDisabled) {
class TestRpc {
public:
TestRpc() {}
void StartRpc(grpc::testing::EchoTestService::Stub* stub) {
sender_thread_ = std::thread([this, stub]() {
EchoResponse response;
EchoRequest request;
request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000);
request.set_message(kRequestMessage);
status_ = stub->Echo(&context_, request, &response);
});
}
void CancelRpc() {
context_.TryCancel();
sender_thread_.join();
}
private:
std::thread sender_thread_;
ClientContext context_;
Status status_;
};
constexpr size_t kMaxConcurrentRequests = 10;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 1)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args));
// Update CDS resource to set max concurrent request.
CircuitBreakers circuit_breaks;
Cluster cluster = balancers_[0]->ads_service()->default_cluster();
auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
threshold->set_priority(RoutingPriority::DEFAULT);
threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
balancers_[0]->ads_service()->SetCdsResource(cluster);
// Send exactly max_concurrent_requests long RPCs.
TestRpc rpcs[kMaxConcurrentRequests];
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].StartRpc(stub_.get());
}
// Wait for all RPCs to be in flight.
while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
kMaxConcurrentRequests) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
}
// Sending a RPC now should not fail as circuit breaking is disabled.
Status status = SendRpc();
EXPECT_TRUE(status.ok());
for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
rpcs[i].CancelRpc();
}
// Make sure RPCs go to the correct backend:
EXPECT_EQ(kMaxConcurrentRequests + 1,
backends_[0]->backend_service()->request_count());
}
TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) {

Loading…
Cancel
Save