From 1291ece848880ed64c712c557dd1d03430beeb1c Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Fri, 9 Aug 2019 19:55:11 -0700 Subject: [PATCH] Add drop in xds policy --- .../client_channel/lb_policy/xds/xds.cc | 250 ++++++---- .../lb_policy/xds/xds_client_stats.cc | 2 +- .../lb_policy/xds/xds_client_stats.h | 2 +- .../lb_policy/xds/xds_load_balancer_api.cc | 68 +++ .../lb_policy/xds/xds_load_balancer_api.h | 44 +- test/cpp/end2end/xds_end2end_test.cc | 438 +++++++++++++++--- 6 files changed, 648 insertions(+), 156 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 279f480946f..4f119edefce 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -405,7 +405,10 @@ class XdsLb : public LoadBalancingPolicy { // previous value in the vector and is 0 for the first element. using PickerList = InlinedVector>, 1>; - explicit Picker(PickerList pickers) : pickers_(std::move(pickers)) {} + Picker(RefCountedPtr xds_policy, PickerList pickers) + : xds_policy_(std::move(xds_policy)), + pickers_(std::move(pickers)), + drop_config_(xds_policy_->drop_config_) {} PickResult Pick(PickArgs args) override; @@ -413,7 +416,9 @@ class XdsLb : public LoadBalancingPolicy { // Calls the picker of the locality that the key falls within. 
PickResult PickFromLocality(const uint32_t key, PickArgs args); + RefCountedPtr xds_policy_; PickerList pickers_; + RefCountedPtr drop_config_; }; class FallbackHelper : public ChannelControlHelper { @@ -458,6 +463,14 @@ class XdsLb : public LoadBalancingPolicy { void ResetBackoffLocked(); void Orphan() override; + grpc_connectivity_state connectivity_state() const { + return connectivity_state_; + } + uint32_t locality_weight() const { return locality_weight_; } + RefCountedPtr picker_wrapper() const { + return picker_wrapper_; + } + private: class Helper : public ChannelControlHelper { public: @@ -500,15 +513,19 @@ class XdsLb : public LoadBalancingPolicy { uint32_t locality_weight_; }; + explicit LocalityMap(XdsLb* xds_policy) : xds_policy_(xds_policy) {} + void UpdateLocked(const XdsLocalityList& locality_list, LoadBalancingPolicy::Config* child_policy_config, const grpc_channel_args* args, XdsLb* parent); + void UpdateXdsPickerLocked(); void ShutdownLocked(); void ResetBackoffLocked(); private: void PruneLocalities(const XdsLocalityList& locality_list); + XdsLb* xds_policy_; Map, OrphanablePtr, XdsLocalityName::Less> map_; @@ -594,6 +611,9 @@ class XdsLb : public LoadBalancingPolicy { // the current one when new localities in the pending map are ready // to accept connections + // The config for dropping calls. + RefCountedPtr drop_config_; + // The stats for client-side load reporting. XdsClientStats client_stats_; }; @@ -637,7 +657,14 @@ void XdsLb::PickerWrapper::RecordCallCompletion( // XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { - // TODO(roth): Add support for drop handling. + // Handle drop. + const UniquePtr* drop_category; + if (drop_config_->ShouldDrop(&drop_category)) { + xds_policy_->client_stats_.AddCallDropped(*drop_category); + PickResult result; + result.type = PickResult::PICK_COMPLETE; + return result; + } // Generate a random number in [0, total weight). 
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; // Forward pick to whichever locality maps to the range in which the @@ -1092,12 +1119,12 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( GRPC_ERROR_UNREF(parse_error); return; } - if (update.locality_list.empty()) { + if (update.locality_list.empty() && !update.drop_all) { char* response_slice_str = grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); gpr_log(GPR_ERROR, "[xdslb %p] EDS response '%s' doesn't contain any valid locality " - "update. Ignoring.", + "but doesn't require to drop all calls. Ignoring.", xdslb_policy, response_slice_str); gpr_free(response_slice_str); return; @@ -1105,8 +1132,11 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( eds_calld->seen_response_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, - "[xdslb %p] EDS response with %" PRIuPTR " localities received", - xdslb_policy, update.locality_list.size()); + "[xdslb %p] EDS response with %" PRIuPTR + " localities and %" PRIuPTR + " drop categories received (drop_all=%d)", + xdslb_policy, update.locality_list.size(), + update.drop_config->drop_category_list().size(), update.drop_all); for (size_t i = 0; i < update.locality_list.size(); ++i) { const XdsLocalityInfo& locality = update.locality_list[i]; gpr_log(GPR_INFO, @@ -1127,8 +1157,17 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( gpr_free(ipport); } } + for (size_t i = 0; i < update.drop_config->drop_category_list().size(); + ++i) { + const XdsDropConfig::DropCategory& drop_category = + update.drop_config->drop_category_list()[i]; + gpr_log(GPR_INFO, + "[xdslb %p] Drop category %s has drop rate %d per million", + xdslb_policy, drop_category.name.get(), + drop_category.parts_per_million); + } } - // Pending LB channel receives a serverlist; promote it. + // Pending LB channel receives a response; promote it. 
// Note that this call can't be on a discarded pending channel, because // such channels don't have any current call but we have checked this call // is a current call. @@ -1147,23 +1186,27 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( // load reporting. LrsCallState* lrs_calld = lb_chand->lrs_calld_->lb_calld(); if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); - // Ignore identical update. + // If the balancer tells us to drop all the calls, we should exit fallback + // mode immediately. + if (update.drop_all) xdslb_policy->MaybeExitFallbackMode(); + // Update the drop config. + const bool drop_config_changed = + xdslb_policy->drop_config_ == nullptr || + *xdslb_policy->drop_config_ != *update.drop_config; + xdslb_policy->drop_config_ = std::move(update.drop_config); + // Ignore identical locality update. if (xdslb_policy->locality_list_ == update.locality_list) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, - "[xdslb %p] Incoming server list identical to current, " - "ignoring.", - xdslb_policy); + "[xdslb %p] Incoming locality list identical to current, " + "ignoring. (drop_config_changed=%d)", + xdslb_policy, drop_config_changed); + } + if (drop_config_changed) { + xdslb_policy->locality_map_.UpdateXdsPickerLocked(); } return; } - // If the balancer tells us to drop all the calls, we should exit fallback - // mode immediately. - // TODO(juanlishen): When we add EDS drop, we should change to check - // drop_percentage. - if (update.locality_list[0].serverlist.empty()) { - xdslb_policy->MaybeExitFallbackMode(); - } // Update the locality list. xdslb_policy->locality_list_ = std::move(update.locality_list); // Update the locality map. 
@@ -1661,7 +1704,8 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { // ctor and dtor // -XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) { +XdsLb::XdsLb(Args args) + : LoadBalancingPolicy(std::move(args)), locality_map_(this) { // Record server name. const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -2032,6 +2076,91 @@ void XdsLb::LocalityMap::UpdateLocked( PruneLocalities(locality_list); } +void XdsLb::LocalityMap::UpdateXdsPickerLocked() { + // Construct a new xds picker which maintains a map of all locality pickers + // that are ready. Each locality is represented by a portion of the range + // proportional to its weight, such that the total range is the sum of the + // weights of all localities. + uint32_t end = 0; + size_t num_connecting = 0; + size_t num_idle = 0; + size_t num_transient_failures = 0; + Picker::PickerList pickers; + for (auto& p : map_) { + // TODO(juanlishen): We should prune a locality (and kill its stats) after + // we know we won't pick from it. We need to improve our update logic to + // make that easier. Consider the following situation: the current map has + // two READY localities A and B, and the update only contains B with the + // same addresses as before. Without the following hack, we will generate + // the same picker containing A and B because we haven't pruned A when the + // update happens. Remove the for loop below once we implement the locality + // map update. 
+ bool in_locality_list = false; + for (size_t i = 0; i < xds_policy_->locality_list_.size(); ++i) { + if (*xds_policy_->locality_list_[i].locality_name == *p.first) { + in_locality_list = true; + break; + } + } + if (!in_locality_list) continue; + const LocalityEntry* entry = p.second.get(); + switch (entry->connectivity_state()) { + case GRPC_CHANNEL_READY: { + end += entry->locality_weight(); + pickers.push_back(MakePair(end, entry->picker_wrapper())); + break; + } + case GRPC_CHANNEL_CONNECTING: { + num_connecting++; + break; + } + case GRPC_CHANNEL_IDLE: { + num_idle++; + break; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + num_transient_failures++; + break; + } + default: { + gpr_log(GPR_ERROR, "Invalid locality connectivity state - %d", + entry->connectivity_state()); + } + } + } + // Pass on the constructed xds picker if it has any ready pickers in their map + // otherwise pass a QueuePicker if any of the locality pickers are in a + // connecting or idle state, finally return a transient failure picker if all + // locality pickers are in transient failure. 
+ if (!pickers.empty()) { + xds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, + UniquePtr( + New(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), + std::move(pickers)))); + } else if (num_connecting > 0) { + xds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_CONNECTING, + UniquePtr( + New(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); + } else if (num_idle > 0) { + xds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_IDLE, + UniquePtr( + New(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); + } else { + GPR_ASSERT(num_transient_failures == + xds_policy_->locality_map_.map_.size()); + grpc_error* error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "connections to all localities failing"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + xds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, + UniquePtr(New(error))); + } +} + void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); } void XdsLb::LocalityMap::ResetBackoffLocked() { @@ -2326,87 +2455,8 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( std::move(picker), entry_->parent_->client_stats_.FindLocalityStats(entry_->name_)); entry_->connectivity_state_ = state; - // Construct a new xds picker which maintains a map of all locality pickers - // that are ready. Each locality is represented by a portion of the range - // proportional to its weight, such that the total range is the sum of the - // weights of all localities - uint32_t end = 0; - size_t num_connecting = 0; - size_t num_idle = 0; - size_t num_transient_failures = 0; - Picker::PickerList pickers; - for (auto& p : entry_->parent_->locality_map_.map_) { - // TODO(juanlishen): We should prune a locality (and kill its stats) after - // we know we won't pick from it. We need to improve our update logic to - // make that easier. 
Consider the following situation: the current map has - // two READY localities A and B, and the update only contains B with the - // same addresses as before. Without the following hack, we will generate - // the same picker containing A and B because we haven't pruned A when the - // update happens. Remove the for loop below once we implement the locality - // map update. - bool in_locality_list = false; - for (size_t i = 0; i < entry_->parent_->locality_list_.size(); ++i) { - if (*entry_->parent_->locality_list_[i].locality_name == *p.first) { - in_locality_list = true; - break; - } - } - if (!in_locality_list) continue; - const LocalityEntry* entry = p.second.get(); - grpc_connectivity_state connectivity_state = entry->connectivity_state_; - switch (connectivity_state) { - case GRPC_CHANNEL_READY: { - end += entry->locality_weight_; - pickers.push_back(MakePair(end, entry->picker_wrapper_)); - break; - } - case GRPC_CHANNEL_CONNECTING: { - num_connecting++; - break; - } - case GRPC_CHANNEL_IDLE: { - num_idle++; - break; - } - case GRPC_CHANNEL_TRANSIENT_FAILURE: { - num_transient_failures++; - break; - } - default: { - gpr_log(GPR_ERROR, "Invalid locality connectivity state - %d", - connectivity_state); - } - } - } - // Pass on the constructed xds picker if it has any ready pickers in their map - // otherwise pass a QueuePicker if any of the locality pickers are in a - // connecting or idle state, finally return a transient failure picker if all - // locality pickers are in transient failure - if (!pickers.empty()) { - entry_->parent_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, UniquePtr( - New(std::move(pickers)))); - } else if (num_connecting > 0) { - entry_->parent_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_CONNECTING, - UniquePtr(New( - entry_->parent_->Ref(DEBUG_LOCATION, "QueuePicker")))); - } else if (num_idle > 0) { - entry_->parent_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, - UniquePtr(New( - 
entry_->parent_->Ref(DEBUG_LOCATION, "QueuePicker")))); - } else { - GPR_ASSERT(num_transient_failures == - entry_->parent_->locality_map_.map_.size()); - grpc_error* error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "connections to all localities failing"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - entry_->parent_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, - UniquePtr(New(error))); - } + // Construct a new xds picker and pass it to the channel. + entry_->parent_->locality_map_.UpdateXdsPickerLocked(); } void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc index 5057e614c68..85f7d669ec0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc @@ -174,7 +174,7 @@ void XdsClientStats::PruneLocalityStats() { } } -void XdsClientStats::AddCallDropped(UniquePtr category) { +void XdsClientStats::AddCallDropped(const UniquePtr& category) { total_dropped_requests_.FetchAdd(1, MemoryOrder::RELAXED); MutexLock lock(&dropped_requests_mu_); auto iter = dropped_requests_.find(category); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h index 445675a4dcf..8f04272da75 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h @@ -208,7 +208,7 @@ class XdsClientStats { RefCountedPtr FindLocalityStats( const RefCountedPtr& locality_name); void PruneLocalityStats(); - void AddCallDropped(UniquePtr category); + void AddCallDropped(const UniquePtr& category); private: // The stats for each locality. 
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc index b2615f319b9..bd8a7142e38 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc @@ -35,6 +35,7 @@ #include "envoy/api/v2/endpoint/endpoint.upb.h" #include "envoy/api/v2/endpoint/load_report.upb.h" #include "envoy/service/load_stats/v2/lrs.upb.h" +#include "envoy/type/percent.upb.h" #include "google/protobuf/any.upb.h" #include "google/protobuf/duration.upb.h" #include "google/protobuf/struct.upb.h" @@ -52,6 +53,19 @@ constexpr char kEndpointRequired[] = "endpointRequired"; } // namespace +bool XdsDropConfig::ShouldDrop(const UniquePtr** category_name) const { + for (size_t i = 0; i < drop_category_list_.size(); ++i) { + const auto& drop_category = drop_category_list_[i]; + // Generate a random number in [0, 1000000). + const int random = rand() % 1000000; + if (random < drop_category.parts_per_million) { + *category_name = &drop_category.name; + return true; + } + } + return false; +} + grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name) { upb::Arena arena; // Create a request. @@ -153,6 +167,44 @@ grpc_error* LocalityParse( return GRPC_ERROR_NONE; } +grpc_error* DropParseAndAppend( + const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload, + XdsDropConfig* drop_config, bool* drop_all) { + // Get the category. + upb_strview category = + envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category( + drop_overload); + if (category.size == 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name"); + } + // Get the drop rate (per million). 
+ const envoy_type_FractionalPercent* drop_percentage = + envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage( + drop_overload); + uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage); + const auto denominator = + static_cast( + envoy_type_FractionalPercent_denominator(drop_percentage)); + // Normalize to million. + switch (denominator) { + case envoy_type_FractionalPercent_HUNDRED: + numerator *= 10000; + break; + case envoy_type_FractionalPercent_TEN_THOUSAND: + numerator *= 100; + break; + case envoy_type_FractionalPercent_MILLION: + break; + default: + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type"); + } + // Cap numerator to 1000000. + numerator = GPR_MIN(numerator, 1000000); + if (numerator == 1000000) *drop_all = true; + drop_config->AddCategory(StringCopy(category), numerator); + return GRPC_ERROR_NONE; +} + } // namespace grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, @@ -193,6 +245,7 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, envoy_api_v2_ClusterLoadAssignment_parse( encoded_cluster_load_assignment.data, encoded_cluster_load_assignment.size, arena.ptr()); + // Get the endpoints. const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints = envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment, &size); @@ -207,6 +260,21 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, std::sort(update->locality_list.data(), update->locality_list.data() + update->locality_list.size(), XdsLocalityInfo::Less()); + // Get the drop config. 
+ update->drop_config = MakeRefCounted(); + const envoy_api_v2_ClusterLoadAssignment_Policy* policy = + envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment); + if (policy != nullptr) { + const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const* + drop_overload = + envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy, + &size); + for (size_t i = 0; i < size; ++i) { + grpc_error* error = DropParseAndAppend( + drop_overload[i], update->drop_config.get(), &update->drop_all); + if (error != GRPC_ERROR_NONE) return error; + } + } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h index ed3b49ff836..cea5b50b9ba 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h @@ -50,9 +50,51 @@ struct XdsLocalityInfo { using XdsLocalityList = InlinedVector; +// There are two phases of accessing this class's content: +// 1. to initialize in the control plane combiner; +// 2. to use in the data plane combiner. +// So no additional synchronization is needed. +class XdsDropConfig : public RefCounted { + public: + struct DropCategory { + bool operator==(const DropCategory& other) const { + return strcmp(name.get(), other.name.get()) == 0 && + parts_per_million == other.parts_per_million; + } + + UniquePtr name; + const uint32_t parts_per_million; + }; + + using DropCategoryList = InlinedVector; + + void AddCategory(UniquePtr name, uint32_t parts_per_million) { + drop_category_list_.emplace_back( + DropCategory{std::move(name), parts_per_million}); + } + + // The only method invoked from the data plane combiner. 
+ bool ShouldDrop(const UniquePtr** category_name) const; + + const DropCategoryList& drop_category_list() const { + return drop_category_list_; + } + + bool operator==(const XdsDropConfig& other) const { + return drop_category_list_ == other.drop_category_list_; + } + bool operator!=(const XdsDropConfig& other) const { + return !(*this == other); + } + + private: + DropCategoryList drop_category_list_; +}; + struct XdsUpdate { XdsLocalityList locality_list; - // TODO(juanlishen): Pass drop_per_million when adding drop support. + RefCountedPtr drop_config; + bool drop_all = false; }; // Creates an EDS request querying \a service_name. diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index d62dc828818..5d114c0ef2b 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -84,6 +84,7 @@ using ::envoy::api::v2::ClusterLoadAssignment; using ::envoy::api::v2::DiscoveryRequest; using ::envoy::api::v2::DiscoveryResponse; using ::envoy::api::v2::EndpointDiscoveryService; +using ::envoy::api::v2::FractionalPercent; using ::envoy::service::load_stats::v2::ClusterStats; using ::envoy::service::load_stats::v2::LoadReportingService; using ::envoy::service::load_stats::v2::LoadStatsRequest; @@ -95,6 +96,8 @@ constexpr char kEdsTypeUrl[] = constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region"; constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone"; constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone"; +constexpr char kLbDropType[] = "lb"; +constexpr char kThrottleDropType[] = "throttle"; template class CountedService : public ServiceType { @@ -205,6 +208,11 @@ class ClientStats { locality_stats_.emplace(input_locality_stats.locality().sub_zone(), LocalityStats(input_locality_stats)); } + for (const auto& input_dropped_requests : + cluster_stats.dropped_requests()) { + dropped_requests_.emplace(input_dropped_requests.category(), + 
input_dropped_requests.dropped_count()); + } } uint64_t total_successful_requests() const { @@ -236,10 +244,16 @@ class ClientStats { return sum; } uint64_t total_dropped_requests() const { return total_dropped_requests_; } + uint64_t dropped_requests(const grpc::string& category) const { + auto iter = dropped_requests_.find(category); + GPR_ASSERT(iter != dropped_requests_.end()); + return iter->second; + } private: std::map locality_stats_; uint64_t total_dropped_requests_; + std::map dropped_requests_; }; class EdsServiceImpl : public EdsService { @@ -301,8 +315,11 @@ class EdsServiceImpl : public EdsService { gpr_log(GPR_INFO, "LB[%p]: shut down", this); } - static DiscoveryResponse BuildResponseForBackends( - const std::vector>& backend_ports) { + static DiscoveryResponse BuildResponse( + const std::vector>& backend_ports, + const std::map& drop_categories = {}, + const FractionalPercent::DenominatorType denominator = + FractionalPercent::MILLION) { ClusterLoadAssignment assignment; assignment.set_cluster_name("service name"); for (size_t i = 0; i < backend_ports.size(); ++i) { @@ -323,6 +340,18 @@ class EdsServiceImpl : public EdsService { socket_address->set_port_value(backend_port); } } + if (!drop_categories.empty()) { + auto* policy = assignment.mutable_policy(); + for (const auto& p : drop_categories) { + const grpc::string& name = p.first; + const uint32_t parts_per_million = p.second; + auto* drop_overload = policy->add_drop_overloads(); + drop_overload->set_category(name); + auto* drop_percentage = drop_overload->mutable_drop_percentage(); + drop_percentage->set_numerator(parts_per_million); + drop_percentage->set_denominator(denominator); + } + } DiscoveryResponse response; response.set_type_url(kEdsTypeUrl); response.add_resources()->PackFrom(assignment); @@ -883,8 +912,7 @@ TEST_F(SingleBalancerTest, Vanilla) { SetNextResolutionForLbChannelAllBalancers(); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, 
EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()), - 0); + 0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); // We need to wait for all backends to come online. @@ -912,8 +940,7 @@ TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) { ports.push_back(backends_[0]->port()); ports.push_back(backends_[0]->port()); const size_t kNumRpcsPerAddress = 10; - ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends({ports}), 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({ports}), 0); // We need to wait for the backend to come online. WaitForBackend(0); // Send kNumRpcsPerAddress RPCs per server. @@ -934,8 +961,7 @@ TEST_F(SingleBalancerTest, SecureNaming) { SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()), - 0); + 0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); // We need to wait for all backends to come online. @@ -980,11 +1006,10 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); const int kCallDeadlineMs = kServerlistDelayMs * 2; // First response is an empty serverlist, sent right away. 
- ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponseForBackends({{}}), - 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({{}}), 0); // Send non-empty serverlist only after kServerlistDelayMs ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()), + 0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), kServerlistDelayMs); const auto t0 = system_clock::now(); // Client will block: LB will initially send empty serverlist. @@ -1012,8 +1037,7 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) { for (size_t i = 0; i < kNumUnreachableServers; ++i) { ports.push_back(grpc_pick_unused_port_or_die()); } - ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends({ports}), 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({ports}), 0); const Status status = SendRpc(); // The error shouldn't be DEADLINE_EXCEEDED. EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); @@ -1023,6 +1047,254 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) { EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); } +TEST_F(SingleBalancerTest, Drop) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 5000; + const uint32_t kDropPerMillionForLb = 100000; + const uint32_t kDropPerMillionForThrottle = 200000; + const double kDropRateForLb = kDropPerMillionForLb / 1000000.0; + const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0; + const double KDropRateForLbAndThrottle = + kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle; + // The EDS response contains two drop categories. + ScheduleResponseForBalancer( + 0, + EdsServiceImpl::BuildResponse( + GetBackendPortsInGroups(), + {{kLbDropType, kDropPerMillionForLb}, + {kThrottleDropType, kDropPerMillionForThrottle}}), + 0); + WaitForAllBackends(); + // Send kNumRpcs RPCs and count the drops. 
+ size_t num_drops = 0; + for (size_t i = 0; i < kNumRpcs; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + if (!status.ok() && + status.error_message() == "Call dropped by load balancing policy") { + ++num_drops; + } else { + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); + } + } + // The drop rate should be roughly equal to the expectation. + const double seen_drop_rate = static_cast(num_drops) / kNumRpcs; + const double kErrorTolerance = 0.2; + EXPECT_THAT( + seen_drop_rate, + ::testing::AllOf( + ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)), + ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance)))); + // The EDS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + +TEST_F(SingleBalancerTest, DropPerHundred) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 5000; + const uint32_t kDropPerHundredForLb = 10; + const double kDropRateForLb = kDropPerHundredForLb / 100.0; + // The EDS response contains one drop category. + ScheduleResponseForBalancer( + 0, + EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), + {{kLbDropType, kDropPerHundredForLb}}, + FractionalPercent::HUNDRED), + 0); + WaitForAllBackends(); + // Send kNumRpcs RPCs and count the drops. 
+ size_t num_drops = 0; + for (size_t i = 0; i < kNumRpcs; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + if (!status.ok() && + status.error_message() == "Call dropped by load balancing policy") { + ++num_drops; + } else { + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); + } + } + // The drop rate should be roughly equal to the expectation. + const double seen_drop_rate = static_cast(num_drops) / kNumRpcs; + const double kErrorTolerance = 0.2; + EXPECT_THAT( + seen_drop_rate, + ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)), + ::testing::Le(kDropRateForLb * (1 + kErrorTolerance)))); + // The EDS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + +TEST_F(SingleBalancerTest, DropPerTenThousand) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 5000; + const uint32_t kDropPerTenThousandForLb = 1000; + const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0; + // The EDS response contains one drop category. + ScheduleResponseForBalancer( + 0, + EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), + {{kLbDropType, kDropPerTenThousandForLb}}, + FractionalPercent::TEN_THOUSAND), + 0); + WaitForAllBackends(); + // Send kNumRpcs RPCs and count the drops. 
+ size_t num_drops = 0; + for (size_t i = 0; i < kNumRpcs; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + if (!status.ok() && + status.error_message() == "Call dropped by load balancing policy") { + ++num_drops; + } else { + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); + } + } + // The drop rate should be roughly equal to the expectation. + const double seen_drop_rate = static_cast(num_drops) / kNumRpcs; + const double kErrorTolerance = 0.2; + EXPECT_THAT( + seen_drop_rate, + ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)), + ::testing::Le(kDropRateForLb * (1 + kErrorTolerance)))); + // The EDS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + +TEST_F(SingleBalancerTest, DropUpdate) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 5000; + const uint32_t kDropPerMillionForLb = 100000; + const uint32_t kDropPerMillionForThrottle = 200000; + const double kDropRateForLb = kDropPerMillionForLb / 1000000.0; + const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0; + const double KDropRateForLbAndThrottle = + kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle; + // The first EDS response contains one drop category. + ScheduleResponseForBalancer( + 0, + EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), + {{kLbDropType, kDropPerMillionForLb}}), + 0); + // The second EDS response contains two drop categories. 
+  ScheduleResponseForBalancer(
+      0,
+      EdsServiceImpl::BuildResponse(
+          GetBackendPortsInGroups(),
+          {{kLbDropType, kDropPerMillionForLb},
+           {kThrottleDropType, kDropPerMillionForThrottle}}),
+      5000);
+  WaitForAllBackends();
+  // Send kNumRpcs RPCs and count the drops.
+  size_t num_drops = 0;
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  for (size_t i = 0; i < kNumRpcs; ++i) {
+    EchoResponse response;
+    const Status status = SendRpc(&response);
+    if (!status.ok() &&
+        status.error_message() == "Call dropped by load balancing policy") {
+      ++num_drops;
+    } else {
+      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                               << " message=" << status.error_message();
+      EXPECT_EQ(response.message(), kRequestMessage_);
+    }
+  }
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+  // The drop rate should be roughly equal to the expectation.
+  double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
+  const double kErrorTolerance = 0.2;
+  EXPECT_THAT(
+      seen_drop_rate,
+      ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
+                       ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
+  // Wait until the drop rate increases to the middle of the two configs, which
+  // implies that the update has been in effect.
+  const double kDropRateThreshold =
+      (kDropRateForLb + KDropRateForLbAndThrottle) / 2;
+  size_t num_rpcs = kNumRpcs;
+  while (seen_drop_rate < kDropRateThreshold) {
+    EchoResponse response;
+    const Status status = SendRpc(&response);
+    ++num_rpcs;
+    if (!status.ok() &&
+        status.error_message() == "Call dropped by load balancing policy") {
+      ++num_drops;
+    } else {
+      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                               << " message=" << status.error_message();
+      EXPECT_EQ(response.message(), kRequestMessage_);
+    }
+    seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
+  }
+  // Send kNumRpcs RPCs and count the drops.
+  num_drops = 0;
+  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+  for (size_t i = 0; i < kNumRpcs; ++i) {
+    EchoResponse response;
+    const Status status = SendRpc(&response);
+    if (!status.ok() &&
+        status.error_message() == "Call dropped by load balancing policy") {
+      ++num_drops;
+    } else {
+      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                               << " message=" << status.error_message();
+      EXPECT_EQ(response.message(), kRequestMessage_);
+    }
+  }
+  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+  // The new drop rate should be roughly equal to the expectation.
+  seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
+  EXPECT_THAT(
+      seen_drop_rate,
+      ::testing::AllOf(
+          ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)),
+          ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance))));
+  // The EDS service got a single request,
+  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
+  // and sent two responses
+  EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
+}
+
+TEST_F(SingleBalancerTest, DropAll) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const size_t kNumRpcs = 1000;
+  const uint32_t kDropPerMillionForLb = 100000;
+  const uint32_t kDropPerMillionForThrottle = 1000000;
+  // The EDS response contains two drop categories.
+  ScheduleResponseForBalancer(
+      0,
+      EdsServiceImpl::BuildResponse(
+          GetBackendPortsInGroups(),
+          {{kLbDropType, kDropPerMillionForLb},
+           {kThrottleDropType, kDropPerMillionForThrottle}}),
+      0);
+  // Send kNumRpcs RPCs and all of them are dropped.
+  for (size_t i = 0; i < kNumRpcs; ++i) {
+    EchoResponse response;
+    const Status status = SendRpc(&response);
+    EXPECT_TRUE(!status.ok() && status.error_message() ==
+                                    "Call dropped by load balancing policy");
+  }
+  // The EDS service got a single request, and sent a single response.
+ EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + TEST_F(SingleBalancerTest, Fallback) { const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); @@ -1034,7 +1306,7 @@ TEST_F(SingleBalancerTest, Fallback) { // Send non-empty serverlist only after kServerlistDelayMs. ScheduleResponseForBalancer( 0, - EdsServiceImpl::BuildResponseForBackends( + EdsServiceImpl::BuildResponse( GetBackendPortsInGroups(kNumBackendsInResolution /* start_index */)), kServerlistDelayMs); // Wait until all the fallback backends are reachable. @@ -1083,7 +1355,7 @@ TEST_F(SingleBalancerTest, FallbackUpdate) { // Send non-empty serverlist only after kServerlistDelayMs. ScheduleResponseForBalancer( 0, - EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups( + EdsServiceImpl::BuildResponse(GetBackendPortsInGroups( kNumBackendsInResolution + kNumBackendsInResolutionUpdate /* start_index */)), kServerlistDelayMs); @@ -1184,10 +1456,8 @@ TEST_F(SingleBalancerTest, FallbackIfResponseReceivedButChildNotReady) { SetNextResolutionForLbChannelAllBalancers(); // Send a serverlist that only contains an unreachable backend before fallback // timeout. - ScheduleResponseForBalancer(0, - EdsServiceImpl::BuildResponseForBackends( - {{grpc_pick_unused_port_or_die()}}), - 0); + ScheduleResponseForBalancer( + 0, EdsServiceImpl::BuildResponse({{grpc_pick_unused_port_or_die()}}), 0); // Because no child policy is ready before fallback timeout, we enter fallback // mode. WaitForBackend(0); @@ -1199,9 +1469,12 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) { SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()}); // Enter fallback mode because the LB channel fails to connect. WaitForBackend(0); - // Return a new balancer that sends an empty serverlist. 
- ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponseForBackends({{}}), - 0); + // Return a new balancer that sends a response to drop all calls. + ScheduleResponseForBalancer( + 0, + EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), + {{kLbDropType, 1000000}}), + 0); SetNextResolutionForLbChannelAllBalancers(); // Send RPCs until failure. gpr_timespec deadline = gpr_time_add( @@ -1222,7 +1495,7 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedAfterChildRready) { // Return a new balancer that sends a dead backend. ShutdownBackend(1); ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends({{backends_[1]->port()}}), 0); + 0, EdsServiceImpl::BuildResponse({{backends_[1]->port()}}), 0); SetNextResolutionForLbChannelAllBalancers(); // The state (TRANSIENT_FAILURE) update from the child policy will be ignored // because we are still in fallback mode. @@ -1247,8 +1520,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) { SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannelAllBalancers(); ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()), - 0); + 0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0); WaitForAllBackends(); // Stop backends. RPCs should fail. ShutdownAllBackends(); @@ -1269,10 +1541,10 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) { SetNextResolutionForLbChannelAllBalancers(); auto first_backend = GetBackendPortsInGroups(0, 1); auto second_backend = GetBackendPortsInGroups(1, 2); - ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0); - ScheduleResponseForBalancer( - 1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend), + 0); + ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend), + 0); // Wait until the first backend is ready. 
WaitForBackend(0); @@ -1322,10 +1594,10 @@ TEST_F(UpdatesTest, UpdateBalancerName) { SetNextResolutionForLbChannelAllBalancers(); auto first_backend = GetBackendPortsInGroups(0, 1); auto second_backend = GetBackendPortsInGroups(1, 2); - ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0); - ScheduleResponseForBalancer( - 1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend), + 0); + ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend), + 0); // Wait until the first backend is ready. WaitForBackend(0); @@ -1393,10 +1665,10 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { SetNextResolutionForLbChannelAllBalancers(); auto first_backend = GetBackendPortsInGroups(0, 1); auto second_backend = GetBackendPortsInGroups(1, 2); - ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0); - ScheduleResponseForBalancer( - 1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend), + 0); + ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend), + 0); // Wait until the first backend is ready. WaitForBackend(0); @@ -1461,10 +1733,10 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { SetNextResolutionForLbChannel({balancers_[0]->port()}); auto first_backend = GetBackendPortsInGroups(0, 1); auto second_backend = GetBackendPortsInGroups(1, 2); - ScheduleResponseForBalancer( - 0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0); - ScheduleResponseForBalancer( - 1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend), + 0); + ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend), + 0); // Start servers and send 10 RPCs per server. 
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -1535,14 +1807,6 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { // TODO(juanlishen): Add TEST_F(UpdatesWithClientLoadReportingTest, // ReresolveDeadBalancer) -// The drop tests are deferred because the drop handling hasn't been added yet. - -// TODO(roth): Add TEST_F(SingleBalancerTest, Drop) - -// TODO(roth): Add TEST_F(SingleBalancerTest, DropAllFirst) - -// TODO(roth): Add TEST_F(SingleBalancerTest, DropAll) - class SingleBalancerWithClientLoadReportingTest : public XdsEnd2endTest { public: SingleBalancerWithClientLoadReportingTest() : XdsEnd2endTest(4, 1, 3) {} @@ -1555,7 +1819,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { // TODO(juanlishen): Partition the backends after multiple localities is // tested. ScheduleResponseForBalancer(0, - EdsServiceImpl::BuildResponseForBackends( + EdsServiceImpl::BuildResponse( GetBackendPortsInGroups(0, backends_.size())), 0); // Wait until all backends are ready. @@ -1595,7 +1859,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) { backends_.size() - kNumBackendsFirstPass; ScheduleResponseForBalancer( 0, - EdsServiceImpl::BuildResponseForBackends( + EdsServiceImpl::BuildResponse( GetBackendPortsInGroups(0, kNumBackendsFirstPass)), 0); // Wait until all backends returned by the balancer are ready. @@ -1626,7 +1890,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) { balancers_[0]->Start(server_host_); ScheduleResponseForBalancer( 0, - EdsServiceImpl::BuildResponseForBackends( + EdsServiceImpl::BuildResponse( GetBackendPortsInGroups(kNumBackendsFirstPass)), 0); // Wait for queries to start going to one of the new backends. 
@@ -1646,7 +1910,75 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) { EXPECT_EQ(0U, client_stats->total_dropped_requests()); } -// TODO(juanlishen): Add TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) +class SingleBalancerWithClientLoadReportingAndDropTest : public XdsEnd2endTest { + public: + SingleBalancerWithClientLoadReportingAndDropTest() + : XdsEnd2endTest(4, 1, 20) {} +}; + +TEST_F(SingleBalancerWithClientLoadReportingAndDropTest, Vanilla) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 3000; + const uint32_t kDropPerMillionForLb = 100000; + const uint32_t kDropPerMillionForThrottle = 200000; + const double kDropRateForLb = kDropPerMillionForLb / 1000000.0; + const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0; + const double KDropRateForLbAndThrottle = + kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle; + // The EDS response contains two drop categories. + ScheduleResponseForBalancer( + 0, + EdsServiceImpl::BuildResponse( + GetBackendPortsInGroups(), + {{kLbDropType, kDropPerMillionForLb}, + {kThrottleDropType, kDropPerMillionForThrottle}}), + 0); + int num_ok = 0; + int num_failure = 0; + int num_drops = 0; + std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends(); + const size_t num_warmup = num_ok + num_failure + num_drops; + // Send kNumRpcs RPCs and count the drops. + for (size_t i = 0; i < kNumRpcs; ++i) { + EchoResponse response; + const Status status = SendRpc(&response); + if (!status.ok() && + status.error_message() == "Call dropped by load balancing policy") { + ++num_drops; + } else { + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kRequestMessage_); + } + } + // The drop rate should be roughly equal to the expectation. 
+  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
+  const double kErrorTolerance = 0.2;
+  EXPECT_THAT(
+      seen_drop_rate,
+      ::testing::AllOf(
+          ::testing::Ge(KDropRateForLbAndThrottle * (1 - kErrorTolerance)),
+          ::testing::Le(KDropRateForLbAndThrottle * (1 + kErrorTolerance))));
+  // Check client stats.
+  ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
+  EXPECT_EQ(num_drops, client_stats->total_dropped_requests());
+  const size_t total_rpc = num_warmup + kNumRpcs;
+  EXPECT_THAT(
+      client_stats->dropped_requests(kLbDropType),
+      ::testing::AllOf(
+          ::testing::Ge(total_rpc * kDropRateForLb * (1 - kErrorTolerance)),
+          ::testing::Le(total_rpc * kDropRateForLb * (1 + kErrorTolerance))));
+  EXPECT_THAT(client_stats->dropped_requests(kThrottleDropType),
+              ::testing::AllOf(
+                  ::testing::Ge(total_rpc * (1 - kDropRateForLb) *
+                                kDropRateForThrottle * (1 - kErrorTolerance)),
+                  ::testing::Le(total_rpc * (1 - kDropRateForLb) *
+                                kDropRateForThrottle * (1 + kErrorTolerance))));
+  // The EDS service got a single request, and sent a single response.
+  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
+  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
+}
 
 }  // namespace
 }  // namespace testing