From e7d7b7de86ec9f89c54b4a6a6bd2d1e4074d6669 Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Tue, 29 Sep 2020 14:47:39 -0700 Subject: [PATCH] xDS circuit breaking support --- .../client_channel/lb_policy/xds/cds.cc | 7 +- .../client_channel/lb_policy/xds/eds.cc | 100 +++++++++++++++--- src/core/ext/xds/xds_api.cc | 27 +++++ src/core/ext/xds/xds_api.h | 6 +- src/proto/grpc/testing/xds/BUILD | 1 + src/proto/grpc/testing/xds/cds_for_test.proto | 17 +++ src/proto/grpc/testing/xds/v3/BUILD | 1 + src/proto/grpc/testing/xds/v3/cluster.proto | 17 +++ test/cpp/end2end/test_service_impl.h | 10 ++ test/cpp/end2end/xds_end2end_test.cc | 73 +++++++++++++ 10 files changed, 239 insertions(+), 20 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc index 796d2ff90dc..da048b553d1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc @@ -314,15 +314,18 @@ void CdsLb::OnClusterChanged(XdsApi::CdsUpdate cluster_data) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client %p: " - "eds_service_name=%s lrs_load_reporting_server_name=%s", + "eds_service_name=%s lrs_load_reporting_server_name=%s " + "max_concurrent_requests=%d", this, xds_client_.get(), cluster_data.eds_service_name.c_str(), cluster_data.lrs_load_reporting_server_name.has_value() ? cluster_data.lrs_load_reporting_server_name.value().c_str() - : "(unset)"); + : "(unset)", + cluster_data.max_concurrent_requests); } // Construct config for child policy. Json::Object child_config = { {"clusterName", config_->cluster()}, + {"max_concurrent_requests", cluster_data.max_concurrent_requests}, {"localityPickingPolicy", Json::Array{ Json::Object{ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc index 00db1b62d6b..1d3306c820d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/eds.cc @@ -36,6 +36,7 @@ #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/timer.h" @@ -58,13 +59,15 @@ class EdsLbConfig : public LoadBalancingPolicy::Config { public: EdsLbConfig(std::string cluster_name, std::string eds_service_name, absl::optional lrs_load_reporting_server_name, - Json locality_picking_policy, Json endpoint_picking_policy) + Json locality_picking_policy, Json endpoint_picking_policy, + uint32_t max_concurrent_requests) : cluster_name_(std::move(cluster_name)), eds_service_name_(std::move(eds_service_name)), lrs_load_reporting_server_name_( std::move(lrs_load_reporting_server_name)), locality_picking_policy_(std::move(locality_picking_policy)), - endpoint_picking_policy_(std::move(endpoint_picking_policy)) {} + endpoint_picking_policy_(std::move(endpoint_picking_policy)), + max_concurrent_requests_(max_concurrent_requests) {} const char* name() const override { return kEds; } @@ -79,6 +82,9 @@ class EdsLbConfig : public LoadBalancingPolicy::Config { const Json& endpoint_picking_policy() const { return endpoint_picking_policy_; } + const uint32_t max_concurrent_requests() const { + return max_concurrent_requests_; + } private: std::string cluster_name_; @@ -86,6 +92,7 @@ class EdsLbConfig : public LoadBalancingPolicy::Config { absl::optional lrs_load_reporting_server_name_; Json locality_picking_policy_; Json endpoint_picking_policy_; + uint32_t max_concurrent_requests_; }; // EDS LB policy. @@ -145,14 +152,16 @@ class EdsLb : public LoadBalancingPolicy { // A picker that handles drops. class DropPicker : public SubchannelPicker { public: - explicit DropPicker(EdsLb* eds_policy); + explicit DropPicker(RefCountedPtr eds_policy); PickResult Pick(PickArgs args) override; private: + RefCountedPtr eds_policy_; RefCountedPtr drop_config_; RefCountedPtr drop_stats_; RefCountedPtr child_picker_; + uint32_t max_concurrent_requests_; }; class Helper : public ChannelControlHelper { @@ -236,6 +245,8 @@ class EdsLb : public LoadBalancingPolicy { RefCountedPtr drop_config_; RefCountedPtr drop_stats_; + // Current concurrent number of requests; + Atomic concurrent_requests_{0}; OrphanablePtr child_policy_; @@ -249,13 +260,16 @@ class EdsLb : public LoadBalancingPolicy { // EdsLb::DropPicker // -EdsLb::DropPicker::DropPicker(EdsLb* eds_policy) - : drop_config_(eds_policy->drop_config_), - drop_stats_(eds_policy->drop_stats_), - child_picker_(eds_policy->child_picker_) { +EdsLb::DropPicker::DropPicker(RefCountedPtr eds_policy) + : eds_policy_(std::move(eds_policy)), + drop_config_(eds_policy_->drop_config_), + drop_stats_(eds_policy_->drop_stats_), + child_picker_(eds_policy_->child_picker_), + max_concurrent_requests_( + eds_policy_->config_->max_concurrent_requests()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) { - gpr_log(GPR_INFO, "[edslb %p] constructed new drop picker %p", eds_policy, - this); + gpr_log(GPR_INFO, "[edslb %p] constructed new drop picker %p", + eds_policy_.get(), this); } } @@ -268,6 +282,17 @@ EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) { result.type = PickResult::PICK_COMPLETE; return result; } + // Check and see if we exceeded the max concurrent requests count. + uint32_t current = eds_policy_->concurrent_requests_.FetchAdd(1); + if (current >= max_concurrent_requests_) { + eds_policy_->concurrent_requests_.FetchSub(1); + if (drop_stats_ != nullptr) { + drop_stats_->AddUncategorizedDrops(); + } + PickResult result; + result.type = PickResult::PICK_COMPLETE; + return result; + } // If we're not dropping all calls, we should always have a child picker. if (child_picker_ == nullptr) { // Should never happen. PickResult result; @@ -276,10 +301,30 @@ EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) { grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "eds drop picker not given any child picker"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + eds_policy_->concurrent_requests_.FetchSub(1); return result; } // Not dropping, so delegate to child's picker. - return child_picker_->Pick(args); + PickResult result = child_picker_->Pick(args); + if (result.type == PickResult::PICK_COMPLETE) { + EdsLb* eds_policy = static_cast( + eds_policy_->Ref(DEBUG_LOCATION, "DropPickPicker+call").release()); + auto original_recv_trailing_metadata_ready = + result.recv_trailing_metadata_ready; + result.recv_trailing_metadata_ready = + [original_recv_trailing_metadata_ready, eds_policy]( + grpc_error* error, MetadataInterface* metadata, + CallState* call_state) { + if (original_recv_trailing_metadata_ready != nullptr) { + original_recv_trailing_metadata_ready(error, metadata, call_state); + } + eds_policy->concurrent_requests_.FetchSub(1); + eds_policy->Unref(DEBUG_LOCATION, "DropPickPicker+call"); + }; + } else { + eds_policy_->concurrent_requests_.FetchSub(1); + } + return result; } // @@ -469,9 +514,14 @@ void EdsLb::UpdateLocked(UpdateArgs args) { grpc_channel_args_destroy(args_); args_ = args.args; args.args = nullptr; + const bool lrs_server_changed = + is_initial_update || config_->lrs_load_reporting_server_name() != + old_config->lrs_load_reporting_server_name(); + const bool max_concurrent_requests_changed = + is_initial_update || config_->max_concurrent_requests() != + old_config->max_concurrent_requests(); // Update drop stats for load reporting if needed. - if (is_initial_update || config_->lrs_load_reporting_server_name() != - old_config->lrs_load_reporting_server_name()) { + if (lrs_server_changed) { drop_stats_.reset(); if (config_->lrs_load_reporting_server_name().has_value()) { const auto key = GetLrsClusterKey(); @@ -479,6 +529,8 @@ void EdsLb::UpdateLocked(UpdateArgs args) { config_->lrs_load_reporting_server_name().value(), key.first /*cluster_name*/, key.second /*eds_service_name*/); } + } + if (lrs_server_changed || max_concurrent_requests_changed) { MaybeUpdateDropPickerLocked(); } // Update child policy if needed. @@ -815,14 +867,16 @@ void EdsLb::MaybeUpdateDropPickerLocked() { // If we're dropping all calls, report READY, regardless of what (or // whether) the child has reported. if (drop_config_ != nullptr && drop_config_->drop_all()) { - channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(), - absl::make_unique(this)); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, absl::Status(), + absl::make_unique(Ref(DEBUG_LOCATION, "DropPicker"))); return; } // Update only if we have a child picker. if (child_picker_ != nullptr) { - channel_control_helper()->UpdateState(child_state_, child_status_, - absl::make_unique(this)); + channel_control_helper()->UpdateState( + child_state_, child_status_, + absl::make_unique(Ref(DEBUG_LOCATION, "DropPicker"))); } } @@ -938,13 +992,25 @@ class EdsLbFactory : public LoadBalancingPolicyFactory { "endpointPickingPolicy", &parse_error, 1)); GRPC_ERROR_UNREF(parse_error); } + // Max concurrent requests. + uint32_t max_concurrent_requests = 1024; + it = json.object_value().find("max_concurrent_requests"); + if (it != json.object_value().end()) { + if (it->second.type() != Json::Type::NUMBER) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_concurrent_requests error:must be of type number")); + } else { + max_concurrent_requests = + gpr_parse_nonnegative_int(it->second.string_value().c_str()); + } + } // Construct config. if (error_list.empty()) { return MakeRefCounted( std::move(cluster_name), std::move(eds_service_name), std::move(lrs_load_reporting_server_name), std::move(locality_picking_policy), - std::move(endpoint_picking_policy)); + std::move(endpoint_picking_policy), max_concurrent_requests); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR( "eds_experimental LB policy config", &error_list); diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index 49dce583b53..2aa84fb61bd 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -42,6 +42,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "envoy/config/cluster/v3/circuit_breaker.upb.h" #include "envoy/config/cluster/v3/cluster.upb.h" #include "envoy/config/core/v3/address.upb.h" #include "envoy/config/core/v3/base.upb.h" @@ -1838,6 +1839,32 @@ grpc_error* CdsResponseParse( } cds_update.lrs_load_reporting_server_name.emplace(""); } + // The Cluster resource encodes the circuit breaking parameters in a list of + // Thresholds messages, where each message specifies the parameters for a + // particular RoutingPriority. we will look only at the first entry in the + // list for priority DEFAULT and default to 1024 if not found. + if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) { + const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers = + envoy_config_cluster_v3_Cluster_circuit_breakers(cluster); + size_t num_thresholds; + const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const* + thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds( + circuit_breakers, &num_thresholds); + for (size_t i = 0; i < num_thresholds; ++i) { + const auto* threshold = thresholds[i]; + if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority( + threshold) == envoy_config_core_v3_DEFAULT) { + const google_protobuf_UInt32Value* max_requests = + envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests( + threshold); + if (max_requests != nullptr) { + cds_update.max_concurrent_requests = + google_protobuf_UInt32Value_value(max_requests); + } + break; + } + } + } } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index 87cfa29a617..97459ba1659 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -178,11 +178,15 @@ class XdsApi { // If set to the empty string, will use the same server we obtained the CDS // data from. absl::optional lrs_load_reporting_server_name; + // Maximum number of outstanding requests can be made to the upstream + // cluster. + uint32_t max_concurrent_requests = 1024; bool operator==(const CdsUpdate& other) const { return eds_service_name == other.eds_service_name && lrs_load_reporting_server_name == - other.lrs_load_reporting_server_name; + other.lrs_load_reporting_server_name && + max_concurrent_requests == other.max_concurrent_requests; } }; diff --git a/src/proto/grpc/testing/xds/BUILD b/src/proto/grpc/testing/xds/BUILD index f6a884d6d4b..5dd57959314 100644 --- a/src/proto/grpc/testing/xds/BUILD +++ b/src/proto/grpc/testing/xds/BUILD @@ -35,6 +35,7 @@ grpc_proto_library( srcs = [ "cds_for_test.proto", ], + well_known_protos = True, ) grpc_proto_library( diff --git a/src/proto/grpc/testing/xds/cds_for_test.proto b/src/proto/grpc/testing/xds/cds_for_test.proto index a4ee4b3b7fe..8ea198d0ee7 100644 --- a/src/proto/grpc/testing/xds/cds_for_test.proto +++ b/src/proto/grpc/testing/xds/cds_for_test.proto @@ -27,6 +27,8 @@ syntax = "proto3"; package envoy.api.v2; +import "google/protobuf/wrappers.proto"; + // Aggregated Discovery Service (ADS) options. This is currently empty, but when // set in :ref:`ConfigSource ` can be used to // specify that ADS is to be used. @@ -57,6 +59,19 @@ message ConfigSource { } } +enum RoutingPriority { + DEFAULT = 0; + HIGH = 1; +} + +message CircuitBreakers { + message Thresholds { + RoutingPriority priority = 1; + google.protobuf.UInt32Value max_requests = 4; + } + repeated Thresholds thresholds = 1; +} + message Cluster { // Refer to :ref:`service discovery type ` // for an explanation on each type. @@ -153,5 +168,7 @@ message Cluster { // Configuration to use for EDS updates for the Cluster. EdsClusterConfig eds_cluster_config = 3; + CircuitBreakers circuit_breakers = 10; + ConfigSource lrs_server = 42; } diff --git a/src/proto/grpc/testing/xds/v3/BUILD b/src/proto/grpc/testing/xds/v3/BUILD index aa2141e7eb6..ea828230637 100644 --- a/src/proto/grpc/testing/xds/v3/BUILD +++ b/src/proto/grpc/testing/xds/v3/BUILD @@ -81,6 +81,7 @@ grpc_proto_library( srcs = [ "cluster.proto", ], + well_known_protos = True, deps = [ "config_source_proto", ], diff --git a/src/proto/grpc/testing/xds/v3/cluster.proto b/src/proto/grpc/testing/xds/v3/cluster.proto index c4f410703df..c20d887bc21 100644 --- a/src/proto/grpc/testing/xds/v3/cluster.proto +++ b/src/proto/grpc/testing/xds/v3/cluster.proto @@ -20,6 +20,21 @@ package envoy.config.cluster.v3; import "src/proto/grpc/testing/xds/v3/config_source.proto"; +import "google/protobuf/wrappers.proto"; + +enum RoutingPriority { + DEFAULT = 0; + HIGH = 1; +} + +message CircuitBreakers { + message Thresholds { + RoutingPriority priority = 1; + google.protobuf.UInt32Value max_requests = 4; + } + repeated Thresholds thresholds = 1; +} + // [#protodoc-title: Cluster configuration] // Configuration for a single upstream cluster. @@ -127,6 +142,8 @@ message Cluster { // when picking a host in the cluster. LbPolicy lb_policy = 6; + CircuitBreakers circuit_breakers = 10; + // [#not-implemented-hide:] // If present, tells the client where to send load reports via LRS. If not present, the // client will fall back to a client-side default, which may be either (a) don't send any diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 55b9fbb54dd..31432d96b59 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -167,6 +167,7 @@ class TestMultipleServiceImpl : public RpcService { { std::unique_lock lock(mu_); signal_client_ = true; + ++rpcs_waiting_for_client_cancel_; } while (!context->IsCancelled()) { gpr_sleep_until(gpr_time_add( @@ -174,6 +175,10 @@ class TestMultipleServiceImpl : public RpcService { gpr_time_from_micros(request->param().client_cancel_after_us(), GPR_TIMESPAN))); } + { + std::unique_lock lock(mu_); + --rpcs_waiting_for_client_cancel_; + } return Status::CANCELLED; } else if (request->has_param() && request->param().server_cancel_after_us()) { @@ -425,12 +430,17 @@ class TestMultipleServiceImpl : public RpcService { } void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } void SignalServerToContinue() { signaller_.SignalServerToContinue(); } + uint64_t RpcsWaitingForClientCancel() { + std::unique_lock lock(mu_); + return rpcs_waiting_for_client_cancel_; + } private: bool signal_client_; std::mutex mu_; TestServiceSignaller signaller_; std::unique_ptr host_; + uint64_t rpcs_waiting_for_client_cancel_ = 0; }; class CallbackTestServiceImpl diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index d0f0f949212..d8f9033f6a6 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -86,7 +86,9 @@ namespace { using std::chrono::system_clock; +using ::envoy::config::cluster::v3::CircuitBreakers; using ::envoy::config::cluster::v3::Cluster; +using ::envoy::config::cluster::v3::RoutingPriority; using ::envoy::config::endpoint::v3::ClusterLoadAssignment; using ::envoy::config::endpoint::v3::HealthStatus; using ::envoy::config::listener::v3::Listener; @@ -2259,6 +2261,77 @@ TEST_P(XdsResolverOnlyTest, DefaultRouteSpecifiesSlashPrefix) { WaitForAllBackends(); } +TEST_P(XdsResolverOnlyTest, CircuitBreaking) { + class TestRpc { + public: + TestRpc() {} + + void StartRpc(grpc::testing::EchoTestService::Stub* stub) { + sender_thread_ = std::thread([this, stub]() { + EchoResponse response; + EchoRequest request; + request.mutable_param()->set_client_cancel_after_us(1 * 1000 * 1000); + request.set_message(kRequestMessage); + status_ = stub->Echo(&context_, request, &response); + }); + } + + void CancelRpc() { + context_.TryCancel(); + sender_thread_.join(); + } + + private: + std::thread sender_thread_; + ClientContext context_; + Status status_; + }; + + const char* kNewClusterName = "new_cluster"; + constexpr size_t kMaxConcurrentRequests = 10; + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + // Populate new EDS resources. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", GetBackendPorts(0, 1)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + AdsServiceImpl::BuildEdsResource(args)); + // Update CDS resource to set max concurrent request. + CircuitBreakers circuit_breaks; + Cluster cluster = balancers_[0]->ads_service()->default_cluster(); + auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds(); + threshold->set_priority(RoutingPriority::DEFAULT); + threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests); + balancers_[0]->ads_service()->SetCdsResource(cluster); + // Send exactly max_concurrent_requests long RPCs. + TestRpc rpcs[kMaxConcurrentRequests]; + for (size_t i = 0; i < kMaxConcurrentRequests; ++i) { + rpcs[i].StartRpc(stub_.get()); + } + // Wait for all RPCs to be in flight. + while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() < + kMaxConcurrentRequests) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1 * 1000, GPR_TIMESPAN))); + } + // Sending a RPC now should fail, the error message should tell us + // we hit the max concurrent requests limit and got dropped. + Status status = SendRpc(); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); + // Cancel one RPC to allow another one through + rpcs[0].CancelRpc(); + status = SendRpc(); + EXPECT_TRUE(status.ok()); + for (size_t i = 1; i < kMaxConcurrentRequests; ++i) { + rpcs[i].CancelRpc(); + } + // Make sure RPCs go to the correct backend: + EXPECT_EQ(kMaxConcurrentRequests + 1, + backends_[0]->backend_service()->request_count()); +} + TEST_P(XdsResolverOnlyTest, MultipleChannelsShareXdsClient) { const char* kNewServerName = "new-server.example.com"; Listener listener = balancers_[0]->ads_service()->default_listener();