Merge pull request #19729 from AspirinSJL/xds_drop

Add drop in xds
pull/19858/head
Juanli Shen 6 years ago committed by GitHub
commit a72a6f9908
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 250
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  2. 2
      src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc
  3. 2
      src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h
  4. 68
      src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc
  5. 44
      src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
  6. 438
      test/cpp/end2end/xds_end2end_test.cc

@ -405,7 +405,10 @@ class XdsLb : public LoadBalancingPolicy {
// previous value in the vector and is 0 for the first element.
using PickerList =
InlinedVector<Pair<uint32_t, RefCountedPtr<PickerWrapper>>, 1>;
explicit Picker(PickerList pickers) : pickers_(std::move(pickers)) {}
Picker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
: xds_policy_(std::move(xds_policy)),
pickers_(std::move(pickers)),
drop_config_(xds_policy_->drop_config_) {}
PickResult Pick(PickArgs args) override;
@ -413,7 +416,9 @@ class XdsLb : public LoadBalancingPolicy {
// Calls the picker of the locality that the key falls within.
PickResult PickFromLocality(const uint32_t key, PickArgs args);
RefCountedPtr<XdsLb> xds_policy_;
PickerList pickers_;
RefCountedPtr<XdsDropConfig> drop_config_;
};
class FallbackHelper : public ChannelControlHelper {
@ -458,6 +463,14 @@ class XdsLb : public LoadBalancingPolicy {
void ResetBackoffLocked();
void Orphan() override;
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
uint32_t locality_weight() const { return locality_weight_; }
RefCountedPtr<PickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
private:
class Helper : public ChannelControlHelper {
public:
@ -500,15 +513,19 @@ class XdsLb : public LoadBalancingPolicy {
uint32_t locality_weight_;
};
explicit LocalityMap(XdsLb* xds_policy) : xds_policy_(xds_policy) {}
void UpdateLocked(const XdsLocalityList& locality_list,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent);
void UpdateXdsPickerLocked();
void ShutdownLocked();
void ResetBackoffLocked();
private:
void PruneLocalities(const XdsLocalityList& locality_list);
XdsLb* xds_policy_;
Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<LocalityEntry>,
XdsLocalityName::Less>
map_;
@ -594,6 +611,9 @@ class XdsLb : public LoadBalancingPolicy {
// the current one when new localities in the pending map are ready
// to accept connections
// The config for dropping calls.
RefCountedPtr<XdsDropConfig> drop_config_;
// The stats for client-side load reporting.
XdsClientStats client_stats_;
};
@ -637,7 +657,14 @@ void XdsLb::PickerWrapper::RecordCallCompletion(
//
XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
// TODO(roth): Add support for drop handling.
// Handle drop.
const UniquePtr<char>* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) {
xds_policy_->client_stats_.AddCallDropped(*drop_category);
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
// Generate a random number in [0, total weight).
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
// Forward pick to whichever locality maps to the range in which the
@ -1092,12 +1119,12 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked(
GRPC_ERROR_UNREF(parse_error);
return;
}
if (update.locality_list.empty()) {
if (update.locality_list.empty() && !update.drop_all) {
char* response_slice_str =
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
gpr_log(GPR_ERROR,
"[xdslb %p] EDS response '%s' doesn't contain any valid locality "
"update. Ignoring.",
"but doesn't require to drop all calls. Ignoring.",
xdslb_policy, response_slice_str);
gpr_free(response_slice_str);
return;
@ -1105,8 +1132,11 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked(
eds_calld->seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] EDS response with %" PRIuPTR " localities received",
xdslb_policy, update.locality_list.size());
"[xdslb %p] EDS response with %" PRIuPTR
" localities and %" PRIuPTR
" drop categories received (drop_all=%d)",
xdslb_policy, update.locality_list.size(),
update.drop_config->drop_category_list().size(), update.drop_all);
for (size_t i = 0; i < update.locality_list.size(); ++i) {
const XdsLocalityInfo& locality = update.locality_list[i];
gpr_log(GPR_INFO,
@ -1127,8 +1157,17 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked(
gpr_free(ipport);
}
}
for (size_t i = 0; i < update.drop_config->drop_category_list().size();
++i) {
const XdsDropConfig::DropCategory& drop_category =
update.drop_config->drop_category_list()[i];
gpr_log(GPR_INFO,
"[xdslb %p] Drop category %s has drop rate %d per million",
xdslb_policy, drop_category.name.get(),
drop_category.parts_per_million);
}
}
// Pending LB channel receives a serverlist; promote it.
// Pending LB channel receives a response; promote it.
// Note that this call can't be on a discarded pending channel, because
// such channels don't have any current call but we have checked this call
// is a current call.
@ -1147,23 +1186,27 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked(
// load reporting.
LrsCallState* lrs_calld = lb_chand->lrs_calld_->lb_calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
// Ignore identical update.
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
if (update.drop_all) xdslb_policy->MaybeExitFallbackMode();
// Update the drop config.
const bool drop_config_changed =
xdslb_policy->drop_config_ == nullptr ||
*xdslb_policy->drop_config_ != *update.drop_config;
xdslb_policy->drop_config_ = std::move(update.drop_config);
// Ignore identical locality update.
if (xdslb_policy->locality_list_ == update.locality_list) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Incoming server list identical to current, "
"ignoring.",
xdslb_policy);
"[xdslb %p] Incoming locality list identical to current, "
"ignoring. (drop_config_changed=%d)",
xdslb_policy, drop_config_changed);
}
if (drop_config_changed) {
xdslb_policy->locality_map_.UpdateXdsPickerLocked();
}
return;
}
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
// TODO(juanlishen): When we add EDS drop, we should change to check
// drop_percentage.
if (update.locality_list[0].serverlist.empty()) {
xdslb_policy->MaybeExitFallbackMode();
}
// Update the locality list.
xdslb_policy->locality_list_ = std::move(update.locality_list);
// Update the locality map.
@ -1661,7 +1704,8 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
// ctor and dtor
//
XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)), locality_map_(this) {
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -2032,6 +2076,91 @@ void XdsLb::LocalityMap::UpdateLocked(
PruneLocalities(locality_list);
}
// Rebuilds the aggregate xds picker from the per-locality pickers and pushes
// the resulting connectivity state + picker up through the channel control
// helper. READY localities are folded into a single weighted-range picker;
// otherwise the aggregate state degrades to CONNECTING, IDLE, or
// TRANSIENT_FAILURE, in that order of preference.
void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
  // Construct a new xds picker which maintains a map of all locality pickers
  // that are ready. Each locality is represented by a portion of the range
  // proportional to its weight, such that the total range is the sum of the
  // weights of all localities.
  uint32_t end = 0;
  size_t num_connecting = 0;
  size_t num_idle = 0;
  size_t num_transient_failures = 0;
  Picker::PickerList pickers;
  for (auto& p : map_) {
    // TODO(juanlishen): We should prune a locality (and kill its stats) after
    // we know we won't pick from it. We need to improve our update logic to
    // make that easier. Consider the following situation: the current map has
    // two READY localities A and B, and the update only contains B with the
    // same addresses as before. Without the following hack, we will generate
    // the same picker containing A and B because we haven't pruned A when the
    // update happens. Remove the for loop below once we implement the locality
    // map update.
    bool in_locality_list = false;
    for (size_t i = 0; i < xds_policy_->locality_list_.size(); ++i) {
      if (*xds_policy_->locality_list_[i].locality_name == *p.first) {
        in_locality_list = true;
        break;
      }
    }
    // Skip map entries that are no longer present in the locality list.
    if (!in_locality_list) continue;
    const LocalityEntry* entry = p.second.get();
    switch (entry->connectivity_state()) {
      case GRPC_CHANNEL_READY: {
        // Pickers carry the cumulative weight so that a random key drawn in
        // [0, total weight) maps to exactly one locality's range.
        end += entry->locality_weight();
        pickers.push_back(MakePair(end, entry->picker_wrapper()));
        break;
      }
      case GRPC_CHANNEL_CONNECTING: {
        num_connecting++;
        break;
      }
      case GRPC_CHANNEL_IDLE: {
        num_idle++;
        break;
      }
      case GRPC_CHANNEL_TRANSIENT_FAILURE: {
        num_transient_failures++;
        break;
      }
      default: {
        gpr_log(GPR_ERROR, "Invalid locality connectivity state - %d",
                entry->connectivity_state());
      }
    }
  }
  // Pass on the constructed xds picker if it has any ready pickers in their map
  // otherwise pass a QueuePicker if any of the locality pickers are in a
  // connecting or idle state, finally return a transient failure picker if all
  // locality pickers are in transient failure.
  if (!pickers.empty()) {
    xds_policy_->channel_control_helper()->UpdateState(
        GRPC_CHANNEL_READY,
        UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
            New<Picker>(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"),
                        std::move(pickers))));
  } else if (num_connecting > 0) {
    xds_policy_->channel_control_helper()->UpdateState(
        GRPC_CHANNEL_CONNECTING,
        UniquePtr<SubchannelPicker>(
            New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker"))));
  } else if (num_idle > 0) {
    xds_policy_->channel_control_helper()->UpdateState(
        GRPC_CHANNEL_IDLE,
        UniquePtr<SubchannelPicker>(
            New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker"))));
  } else {
    // NOTE(review): entries filtered out by the in_locality_list check above
    // are not counted in num_transient_failures, so this assertion presumes
    // every map entry is still in the locality list whenever all counted
    // entries are in TRANSIENT_FAILURE — confirm against the update path.
    GPR_ASSERT(num_transient_failures ==
               xds_policy_->locality_map_.map_.size());
    grpc_error* error =
        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                               "connections to all localities failing"),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
    xds_policy_->channel_control_helper()->UpdateState(
        GRPC_CHANNEL_TRANSIENT_FAILURE,
        UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
  }
}
void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); }
void XdsLb::LocalityMap::ResetBackoffLocked() {
@ -2326,87 +2455,8 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
std::move(picker),
entry_->parent_->client_stats_.FindLocalityStats(entry_->name_));
entry_->connectivity_state_ = state;
// Construct a new xds picker which maintains a map of all locality pickers
// that are ready. Each locality is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
// weights of all localities
uint32_t end = 0;
size_t num_connecting = 0;
size_t num_idle = 0;
size_t num_transient_failures = 0;
Picker::PickerList pickers;
for (auto& p : entry_->parent_->locality_map_.map_) {
// TODO(juanlishen): We should prune a locality (and kill its stats) after
// we know we won't pick from it. We need to improve our update logic to
// make that easier. Consider the following situation: the current map has
// two READY localities A and B, and the update only contains B with the
// same addresses as before. Without the following hack, we will generate
// the same picker containing A and B because we haven't pruned A when the
// update happens. Remove the for loop below once we implement the locality
// map update.
bool in_locality_list = false;
for (size_t i = 0; i < entry_->parent_->locality_list_.size(); ++i) {
if (*entry_->parent_->locality_list_[i].locality_name == *p.first) {
in_locality_list = true;
break;
}
}
if (!in_locality_list) continue;
const LocalityEntry* entry = p.second.get();
grpc_connectivity_state connectivity_state = entry->connectivity_state_;
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
end += entry->locality_weight_;
pickers.push_back(MakePair(end, entry->picker_wrapper_));
break;
}
case GRPC_CHANNEL_CONNECTING: {
num_connecting++;
break;
}
case GRPC_CHANNEL_IDLE: {
num_idle++;
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
num_transient_failures++;
break;
}
default: {
gpr_log(GPR_ERROR, "Invalid locality connectivity state - %d",
connectivity_state);
}
}
}
// Pass on the constructed xds picker if it has any ready pickers in their map
// otherwise pass a QueuePicker if any of the locality pickers are in a
// connecting or idle state, finally return a transient failure picker if all
// locality pickers are in transient failure
if (!pickers.empty()) {
entry_->parent_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
New<Picker>(std::move(pickers))));
} else if (num_connecting > 0) {
entry_->parent_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(
entry_->parent_->Ref(DEBUG_LOCATION, "QueuePicker"))));
} else if (num_idle > 0) {
entry_->parent_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(
entry_->parent_->Ref(DEBUG_LOCATION, "QueuePicker"))));
} else {
GPR_ASSERT(num_transient_failures ==
entry_->parent_->locality_map_.map_.size());
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connections to all localities failing"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
entry_->parent_->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE,
UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
}
// Construct a new xds picker and pass it to the channel.
entry_->parent_->locality_map_.UpdateXdsPickerLocked();
}
void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {

@ -174,7 +174,7 @@ void XdsClientStats::PruneLocalityStats() {
}
}
void XdsClientStats::AddCallDropped(UniquePtr<char> category) {
void XdsClientStats::AddCallDropped(const UniquePtr<char>& category) {
total_dropped_requests_.FetchAdd(1, MemoryOrder::RELAXED);
MutexLock lock(&dropped_requests_mu_);
auto iter = dropped_requests_.find(category);

@ -208,7 +208,7 @@ class XdsClientStats {
RefCountedPtr<LocalityStats> FindLocalityStats(
const RefCountedPtr<XdsLocalityName>& locality_name);
void PruneLocalityStats();
void AddCallDropped(UniquePtr<char> category);
void AddCallDropped(const UniquePtr<char>& category);
private:
// The stats for each locality.

@ -35,6 +35,7 @@
#include "envoy/api/v2/endpoint/endpoint.upb.h"
#include "envoy/api/v2/endpoint/load_report.upb.h"
#include "envoy/service/load_stats/v2/lrs.upb.h"
#include "envoy/type/percent.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
@ -52,6 +53,19 @@ constexpr char kEndpointRequired[] = "endpointRequired";
} // namespace
// Decides whether a single call should be dropped. Walks the categories in
// order; the first category whose random draw falls below its threshold wins,
// and *category_name is pointed at that category's name (owned by this
// config). Returns false if no category drops the call.
bool XdsDropConfig::ShouldDrop(const UniquePtr<char>** category_name) const {
  for (const auto& category : drop_category_list_) {
    // Draw a fresh random number in [0, 1000000) per category, so each
    // category's drop decision is made independently.
    const int draw = rand() % 1000000;
    if (draw < category.parts_per_million) {
      *category_name = &category.name;
      return true;
    }
  }
  return false;
}
grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name) {
upb::Arena arena;
// Create a request.
@ -153,6 +167,44 @@ grpc_error* LocalityParse(
return GRPC_ERROR_NONE;
}
// Parses a single ClusterLoadAssignment.Policy.DropOverload entry and appends
// it to \a drop_config as a (category, parts-per-million) pair, normalizing
// whatever denominator was used on the wire to parts per million.
// Sets *drop_all to true when the resulting rate is 100% (1000000/million).
// Returns GRPC_ERROR_NONE on success; an error if the category name is empty
// or the denominator type is unknown.
grpc_error* DropParseAndAppend(
    const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
    XdsDropConfig* drop_config, bool* drop_all) {
  // Get the category.
  upb_strview category =
      envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
          drop_overload);
  if (category.size == 0) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty drop category name");
  }
  // Get the drop rate (per million).
  const envoy_type_FractionalPercent* drop_percentage =
      envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
          drop_overload);
  uint32_t numerator = envoy_type_FractionalPercent_numerator(drop_percentage);
  const auto denominator =
      static_cast<envoy_type_FractionalPercent_DenominatorType>(
          envoy_type_FractionalPercent_denominator(drop_percentage));
  // Normalize to million. Clamp the numerator to the denominator's full range
  // before scaling so that the multiplication cannot overflow uint32_t on
  // malformed input (e.g. a HUNDRED-denominator numerator > 429496 would
  // otherwise wrap around and silently yield a tiny drop rate instead of
  // drop-all). Any clamped value already means >= 100%, which the final cap
  // turns into drop_all below.
  switch (denominator) {
    case envoy_type_FractionalPercent_HUNDRED:
      numerator = GPR_MIN(numerator, 100);
      numerator *= 10000;
      break;
    case envoy_type_FractionalPercent_TEN_THOUSAND:
      numerator = GPR_MIN(numerator, 10000);
      numerator *= 100;
      break;
    case envoy_type_FractionalPercent_MILLION:
      break;
    default:
      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unknown denominator type");
  }
  // Cap numerator to 1000000.
  numerator = GPR_MIN(numerator, 1000000);
  if (numerator == 1000000) *drop_all = true;
  drop_config->AddCategory(StringCopy(category), numerator);
  return GRPC_ERROR_NONE;
}
} // namespace
grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
@ -193,6 +245,7 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
envoy_api_v2_ClusterLoadAssignment_parse(
encoded_cluster_load_assignment.data,
encoded_cluster_load_assignment.size, arena.ptr());
// Get the endpoints.
const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
&size);
@ -207,6 +260,21 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
std::sort(update->locality_list.data(),
update->locality_list.data() + update->locality_list.size(),
XdsLocalityInfo::Less());
// Get the drop config.
update->drop_config = MakeRefCounted<XdsDropConfig>();
const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
if (policy != nullptr) {
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
drop_overload =
envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
&size);
for (size_t i = 0; i < size; ++i) {
grpc_error* error = DropParseAndAppend(
drop_overload[i], update->drop_config.get(), &update->drop_all);
if (error != GRPC_ERROR_NONE) return error;
}
}
return GRPC_ERROR_NONE;
}

@ -50,9 +50,51 @@ struct XdsLocalityInfo {
using XdsLocalityList = InlinedVector<XdsLocalityInfo, 1>;
// There are two phases of accessing this class's content:
// 1. to initialize in the control plane combiner;
// 2. to use in the data plane combiner.
// So no additional synchronization is needed.
// The set of drop categories (and their drop rates) parsed from an EDS
// response for a cluster. Refcounted so the data-plane picker can share the
// instance with the LB policy that owns it.
class XdsDropConfig : public RefCounted<XdsDropConfig> {
 public:
  struct DropCategory {
    bool operator==(const DropCategory& other) const {
      return strcmp(name.get(), other.name.get()) == 0 &&
             parts_per_million == other.parts_per_million;
    }

    // Category label, reported per-category in client load reports.
    UniquePtr<char> name;
    // Drop rate normalized to parts per million (range [0, 1000000]).
    // Note: the const member implicitly deletes copy/move assignment for
    // DropCategory.
    const uint32_t parts_per_million;
  };

  using DropCategoryList = InlinedVector<DropCategory, 2>;

  // Appends a category; invoked only while the config is being built (see the
  // access-phase comment above the class).
  void AddCategory(UniquePtr<char> name, uint32_t parts_per_million) {
    drop_category_list_.emplace_back(
        DropCategory{std::move(name), parts_per_million});
  }

  // The only method invoked from the data plane combiner.
  // Returns true if the call should be dropped; on true, *category_name is
  // set to point at the matched category's name (owned by this config).
  bool ShouldDrop(const UniquePtr<char>** category_name) const;

  const DropCategoryList& drop_category_list() const {
    return drop_category_list_;
  }

  // Deep equality, used to detect whether an EDS update changed the config.
  bool operator==(const XdsDropConfig& other) const {
    return drop_category_list_ == other.drop_category_list_;
  }
  bool operator!=(const XdsDropConfig& other) const {
    return !(*this == other);
  }

 private:
  DropCategoryList drop_category_list_;
};
// The result of parsing an EDS response.
struct XdsUpdate {
  // Localities (with their endpoints) to pick from.
  XdsLocalityList locality_list;
  // The drop configuration for the cluster; allocated by the parser before
  // the drop overloads are processed.
  RefCountedPtr<XdsDropConfig> drop_config;
  // True if any drop category requires dropping 100% of calls.
  bool drop_all = false;
};
// Creates an EDS request querying \a service_name.

@ -84,6 +84,7 @@ using ::envoy::api::v2::ClusterLoadAssignment;
using ::envoy::api::v2::DiscoveryRequest;
using ::envoy::api::v2::DiscoveryResponse;
using ::envoy::api::v2::EndpointDiscoveryService;
using ::envoy::api::v2::FractionalPercent;
using ::envoy::service::load_stats::v2::ClusterStats;
using ::envoy::service::load_stats::v2::LoadReportingService;
using ::envoy::service::load_stats::v2::LoadStatsRequest;
@ -95,6 +96,8 @@ constexpr char kEdsTypeUrl[] =
constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region";
constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone";
constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone";
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
template <typename ServiceType>
class CountedService : public ServiceType {
@ -205,6 +208,11 @@ class ClientStats {
locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
LocalityStats(input_locality_stats));
}
for (const auto& input_dropped_requests :
cluster_stats.dropped_requests()) {
dropped_requests_.emplace(input_dropped_requests.category(),
input_dropped_requests.dropped_count());
}
}
uint64_t total_successful_requests() const {
@ -236,10 +244,16 @@ class ClientStats {
return sum;
}
uint64_t total_dropped_requests() const { return total_dropped_requests_; }
uint64_t dropped_requests(const grpc::string& category) const {
auto iter = dropped_requests_.find(category);
GPR_ASSERT(iter != dropped_requests_.end());
return iter->second;
}
private:
std::map<grpc::string, LocalityStats> locality_stats_;
uint64_t total_dropped_requests_;
std::map<grpc::string, uint64_t> dropped_requests_;
};
class EdsServiceImpl : public EdsService {
@ -301,8 +315,11 @@ class EdsServiceImpl : public EdsService {
gpr_log(GPR_INFO, "LB[%p]: shut down", this);
}
static DiscoveryResponse BuildResponseForBackends(
const std::vector<std::vector<int>>& backend_ports) {
static DiscoveryResponse BuildResponse(
const std::vector<std::vector<int>>& backend_ports,
const std::map<grpc::string, uint32_t>& drop_categories = {},
const FractionalPercent::DenominatorType denominator =
FractionalPercent::MILLION) {
ClusterLoadAssignment assignment;
assignment.set_cluster_name("service name");
for (size_t i = 0; i < backend_ports.size(); ++i) {
@ -323,6 +340,18 @@ class EdsServiceImpl : public EdsService {
socket_address->set_port_value(backend_port);
}
}
if (!drop_categories.empty()) {
auto* policy = assignment.mutable_policy();
for (const auto& p : drop_categories) {
const grpc::string& name = p.first;
const uint32_t parts_per_million = p.second;
auto* drop_overload = policy->add_drop_overloads();
drop_overload->set_category(name);
auto* drop_percentage = drop_overload->mutable_drop_percentage();
drop_percentage->set_numerator(parts_per_million);
drop_percentage->set_denominator(denominator);
}
}
DiscoveryResponse response;
response.set_type_url(kEdsTypeUrl);
response.add_resources()->PackFrom(assignment);
@ -883,8 +912,7 @@ TEST_F(SingleBalancerTest, Vanilla) {
SetNextResolutionForLbChannelAllBalancers();
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()),
0);
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
@ -912,8 +940,7 @@ TEST_F(SingleBalancerTest, SameBackendListedMultipleTimes) {
ports.push_back(backends_[0]->port());
ports.push_back(backends_[0]->port());
const size_t kNumRpcsPerAddress = 10;
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends({ports}), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({ports}), 0);
// We need to wait for the backend to come online.
WaitForBackend(0);
// Send kNumRpcsPerAddress RPCs per server.
@ -934,8 +961,7 @@ TEST_F(SingleBalancerTest, SecureNaming) {
SetNextResolutionForLbChannel({balancers_[0]->port()});
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()),
0);
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
// We need to wait for all backends to come online.
@ -980,11 +1006,10 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
const int kCallDeadlineMs = kServerlistDelayMs * 2;
// First response is an empty serverlist, sent right away.
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponseForBackends({{}}),
0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({{}}), 0);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()),
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()),
kServerlistDelayMs);
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
@ -1012,8 +1037,7 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
for (size_t i = 0; i < kNumUnreachableServers; ++i) {
ports.push_back(grpc_pick_unused_port_or_die());
}
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends({ports}), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse({ports}), 0);
const Status status = SendRpc();
// The error shouldn't be DEADLINE_EXCEEDED.
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
@ -1023,6 +1047,254 @@ TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// Verifies that the policy enforces EDS drop rates: with two drop categories
// configured, the observed client-side drop rate should approximate the
// combined rate, since a call survives only if neither category drops it.
TEST_F(SingleBalancerTest, Drop) {
  SetNextResolution({}, kDefaultServiceConfig_.c_str());
  SetNextResolutionForLbChannelAllBalancers();
  const size_t kNumRpcs = 5000;
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // Combined rate of the two independent drop decisions.
  // (Renamed from KDropRateForLbAndThrottle for kConstant naming consistency.)
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
  // The EDS response contains two drop categories.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(
          GetBackendPortsInGroups(),
          {{kLbDropType, kDropPerMillionForLb},
           {kThrottleDropType, kDropPerMillionForThrottle}}),
      0);
  WaitForAllBackends();
  // Send kNumRpcs RPCs and count the drops.
  size_t num_drops = 0;
  for (size_t i = 0; i < kNumRpcs; ++i) {
    EchoResponse response;
    const Status status = SendRpc(&response);
    if (!status.ok() &&
        status.error_message() == "Call dropped by load balancing policy") {
      ++num_drops;
    } else {
      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                               << " message=" << status.error_message();
      EXPECT_EQ(response.message(), kRequestMessage_);
    }
  }
  // The drop rate should be roughly equal to the expectation.
  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
  const double kErrorTolerance = 0.2;
  EXPECT_THAT(
      seen_drop_rate,
      ::testing::AllOf(
          ::testing::Ge(kDropRateForLbAndThrottle * (1 - kErrorTolerance)),
          ::testing::Le(kDropRateForLbAndThrottle * (1 + kErrorTolerance))));
  // The EDS service got a single request, and sent a single response.
  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// Verifies that drop rates expressed with the HUNDRED denominator are
// normalized to parts-per-million and enforced correctly.
TEST_F(SingleBalancerTest, DropPerHundred) {
  SetNextResolution({}, kDefaultServiceConfig_.c_str());
  SetNextResolutionForLbChannelAllBalancers();
  const size_t kNumRpcs = 5000;
  const uint32_t kDropPerHundredForLb = 10;
  const double kDropRateForLb = kDropPerHundredForLb / 100.0;
  // The EDS response carries a single drop category expressed per hundred.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
                                    {{kLbDropType, kDropPerHundredForLb}},
                                    FractionalPercent::HUNDRED),
      0);
  WaitForAllBackends();
  // Issue kNumRpcs RPCs, tallying how many are dropped by the LB policy.
  size_t drop_count = 0;
  for (size_t i = 0; i < kNumRpcs; ++i) {
    EchoResponse reply;
    const Status rpc_status = SendRpc(&reply);
    const bool dropped =
        !rpc_status.ok() &&
        rpc_status.error_message() == "Call dropped by load balancing policy";
    if (dropped) {
      ++drop_count;
      continue;
    }
    EXPECT_TRUE(rpc_status.ok())
        << "code=" << rpc_status.error_code()
        << " message=" << rpc_status.error_message();
    EXPECT_EQ(reply.message(), kRequestMessage_);
  }
  // The observed drop rate should fall within the tolerance of the
  // configured rate.
  const double observed_rate = static_cast<double>(drop_count) / kNumRpcs;
  const double kErrorTolerance = 0.2;
  EXPECT_THAT(observed_rate,
              ::testing::AllOf(
                  ::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
                  ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
  // Exactly one EDS request/response pair should have occurred.
  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// Verifies that drop rates expressed with the TEN_THOUSAND denominator are
// normalized to parts-per-million and enforced correctly.
TEST_F(SingleBalancerTest, DropPerTenThousand) {
  SetNextResolution({}, kDefaultServiceConfig_.c_str());
  SetNextResolutionForLbChannelAllBalancers();
  const size_t kNumRpcs = 5000;
  const uint32_t kDropPerTenThousandForLb = 1000;
  const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
  // The EDS response carries a single drop category expressed per ten
  // thousand.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
                                    {{kLbDropType, kDropPerTenThousandForLb}},
                                    FractionalPercent::TEN_THOUSAND),
      0);
  WaitForAllBackends();
  // Issue kNumRpcs RPCs, tallying how many are dropped by the LB policy.
  size_t drop_count = 0;
  for (size_t i = 0; i < kNumRpcs; ++i) {
    EchoResponse reply;
    const Status rpc_status = SendRpc(&reply);
    const bool dropped =
        !rpc_status.ok() &&
        rpc_status.error_message() == "Call dropped by load balancing policy";
    if (dropped) {
      ++drop_count;
      continue;
    }
    EXPECT_TRUE(rpc_status.ok())
        << "code=" << rpc_status.error_code()
        << " message=" << rpc_status.error_message();
    EXPECT_EQ(reply.message(), kRequestMessage_);
  }
  // The observed drop rate should fall within the tolerance of the
  // configured rate.
  const double observed_rate = static_cast<double>(drop_count) / kNumRpcs;
  const double kErrorTolerance = 0.2;
  EXPECT_THAT(observed_rate,
              ::testing::AllOf(
                  ::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
                  ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
  // Exactly one EDS request/response pair should have occurred.
  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
// Verifies that a second EDS response updating the drop config takes effect:
// the observed drop rate should move from the single-category rate to the
// combined two-category rate after the update is applied.
TEST_F(SingleBalancerTest, DropUpdate) {
  SetNextResolution({}, kDefaultServiceConfig_.c_str());
  SetNextResolutionForLbChannelAllBalancers();
  const size_t kNumRpcs = 5000;
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // Combined rate of the two independent drop decisions.
  // (Renamed from KDropRateForLbAndThrottle for kConstant naming consistency.)
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
  // The first EDS response contains one drop category.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
                                    {{kLbDropType, kDropPerMillionForLb}}),
      0);
  // The second EDS response contains two drop categories.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(
          GetBackendPortsInGroups(),
          {{kLbDropType, kDropPerMillionForLb},
           {kThrottleDropType, kDropPerMillionForThrottle}}),
      5000);
  WaitForAllBackends();
  // Send kNumRpcs RPCs and count the drops.
  size_t num_drops = 0;
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  for (size_t i = 0; i < kNumRpcs; ++i) {
    EchoResponse response;
    const Status status = SendRpc(&response);
    if (!status.ok() &&
        status.error_message() == "Call dropped by load balancing policy") {
      ++num_drops;
    } else {
      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                               << " message=" << status.error_message();
      EXPECT_EQ(response.message(), kRequestMessage_);
    }
  }
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
  // The drop rate should be roughly equal to the expectation.
  double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
  const double kErrorTolerance = 0.2;
  EXPECT_THAT(
      seen_drop_rate,
      ::testing::AllOf(::testing::Ge(kDropRateForLb * (1 - kErrorTolerance)),
                       ::testing::Le(kDropRateForLb * (1 + kErrorTolerance))));
  // Wait until the drop rate increases to the middle of the two configs, which
  // implies that the update has been in effect.
  const double kDropRateThreshold =
      (kDropRateForLb + kDropRateForLbAndThrottle) / 2;
  size_t num_rpcs = kNumRpcs;
  while (seen_drop_rate < kDropRateThreshold) {
    EchoResponse response;
    const Status status = SendRpc(&response);
    ++num_rpcs;
    if (!status.ok() &&
        status.error_message() == "Call dropped by load balancing policy") {
      ++num_drops;
    } else {
      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                               << " message=" << status.error_message();
      EXPECT_EQ(response.message(), kRequestMessage_);
    }
    seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
  }
  // Send kNumRpcs RPCs and count the drops.
  num_drops = 0;
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  for (size_t i = 0; i < kNumRpcs; ++i) {
    EchoResponse response;
    const Status status = SendRpc(&response);
    if (!status.ok() &&
        status.error_message() == "Call dropped by load balancing policy") {
      ++num_drops;
    } else {
      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                               << " message=" << status.error_message();
      EXPECT_EQ(response.message(), kRequestMessage_);
    }
  }
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // The new drop rate should be roughly equal to the expectation.
  seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
  EXPECT_THAT(
      seen_drop_rate,
      ::testing::AllOf(
          ::testing::Ge(kDropRateForLbAndThrottle * (1 - kErrorTolerance)),
          ::testing::Le(kDropRateForLbAndThrottle * (1 + kErrorTolerance))));
  // The EDS service got a single request,
  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
  // and sent two responses
  EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
}
// Verifies that when the throttle drop category is configured at one million
// per million (100%), every RPC is dropped by the LB policy.
TEST_F(SingleBalancerTest, DropAll) {
  SetNextResolution({}, kDefaultServiceConfig_.c_str());
  SetNextResolutionForLbChannelAllBalancers();
  const size_t kNumRpcs = 1000;
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 1000000;
  // The EDS response contains two drop categories.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(
          GetBackendPortsInGroups(),
          {{kLbDropType, kDropPerMillionForLb},
           {kThrottleDropType, kDropPerMillionForThrottle}}),
      0);
  // Send kNumRpcs RPCs; all of them should be dropped.
  for (size_t i = 0; i < kNumRpcs; ++i) {
    EchoResponse response;
    const Status status = SendRpc(&response);
    // Check the two conditions separately so that a failure reports which
    // condition broke and what the actual status message was.
    EXPECT_FALSE(status.ok()) << "RPC " << i << " unexpectedly succeeded";
    EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
  }
  // The EDS service got a single request, and sent a single response.
  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
TEST_F(SingleBalancerTest, Fallback) {
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
@ -1034,7 +1306,7 @@ TEST_F(SingleBalancerTest, Fallback) {
// Send non-empty serverlist only after kServerlistDelayMs.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponseForBackends(
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(kNumBackendsInResolution /* start_index */)),
kServerlistDelayMs);
// Wait until all the fallback backends are reachable.
@ -1083,7 +1355,7 @@ TEST_F(SingleBalancerTest, FallbackUpdate) {
// Send non-empty serverlist only after kServerlistDelayMs.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups(
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(
kNumBackendsInResolution +
kNumBackendsInResolutionUpdate /* start_index */)),
kServerlistDelayMs);
@ -1184,10 +1456,8 @@ TEST_F(SingleBalancerTest, FallbackIfResponseReceivedButChildNotReady) {
SetNextResolutionForLbChannelAllBalancers();
// Send a serverlist that only contains an unreachable backend before fallback
// timeout.
ScheduleResponseForBalancer(0,
EdsServiceImpl::BuildResponseForBackends(
{{grpc_pick_unused_port_or_die()}}),
0);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponse({{grpc_pick_unused_port_or_die()}}), 0);
// Because no child policy is ready before fallback timeout, we enter fallback
// mode.
WaitForBackend(0);
@ -1199,9 +1469,12 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedWhenBalancerSaysToDropAllCalls) {
SetNextResolutionForLbChannel({grpc_pick_unused_port_or_die()});
// Enter fallback mode because the LB channel fails to connect.
WaitForBackend(0);
// Return a new balancer that sends an empty serverlist.
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponseForBackends({{}}),
0);
// Return a new balancer that sends a response to drop all calls.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
{{kLbDropType, 1000000}}),
0);
SetNextResolutionForLbChannelAllBalancers();
// Send RPCs until failure.
gpr_timespec deadline = gpr_time_add(
@ -1222,7 +1495,7 @@ TEST_F(SingleBalancerTest, FallbackModeIsExitedAfterChildRready) {
// Return a new balancer that sends a dead backend.
ShutdownBackend(1);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends({{backends_[1]->port()}}), 0);
0, EdsServiceImpl::BuildResponse({{backends_[1]->port()}}), 0);
SetNextResolutionForLbChannelAllBalancers();
// The state (TRANSIENT_FAILURE) update from the child policy will be ignored
// because we are still in fallback mode.
@ -1247,8 +1520,7 @@ TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(GetBackendPortsInGroups()),
0);
0, EdsServiceImpl::BuildResponse(GetBackendPortsInGroups()), 0);
WaitForAllBackends();
// Stop backends. RPCs should fail.
ShutdownAllBackends();
@ -1269,10 +1541,10 @@ TEST_F(UpdatesTest, UpdateBalancersButKeepUsingOriginalBalancer) {
SetNextResolutionForLbChannelAllBalancers();
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0);
ScheduleResponseForBalancer(
1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
// Wait until the first backend is ready.
WaitForBackend(0);
@ -1322,10 +1594,10 @@ TEST_F(UpdatesTest, UpdateBalancerName) {
SetNextResolutionForLbChannelAllBalancers();
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0);
ScheduleResponseForBalancer(
1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
// Wait until the first backend is ready.
WaitForBackend(0);
@ -1393,10 +1665,10 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
SetNextResolutionForLbChannelAllBalancers();
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0);
ScheduleResponseForBalancer(
1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
// Wait until the first backend is ready.
WaitForBackend(0);
@ -1461,10 +1733,10 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
SetNextResolutionForLbChannel({balancers_[0]->port()});
auto first_backend = GetBackendPortsInGroups(0, 1);
auto second_backend = GetBackendPortsInGroups(1, 2);
ScheduleResponseForBalancer(
0, EdsServiceImpl::BuildResponseForBackends(first_backend), 0);
ScheduleResponseForBalancer(
1, EdsServiceImpl::BuildResponseForBackends(second_backend), 0);
ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(first_backend),
0);
ScheduleResponseForBalancer(1, EdsServiceImpl::BuildResponse(second_backend),
0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@ -1535,14 +1807,6 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
// TODO(juanlishen): Add TEST_F(UpdatesWithClientLoadReportingTest,
// ReresolveDeadBalancer)
// NOTE: Drop handling is now implemented; see SingleBalancerTest.DropUpdate
// and SingleBalancerTest.DropAll above.
class SingleBalancerWithClientLoadReportingTest : public XdsEnd2endTest {
public:
SingleBalancerWithClientLoadReportingTest() : XdsEnd2endTest(4, 1, 3) {}
@ -1555,7 +1819,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
// TODO(juanlishen): Partition the backends after multiple localities is
// tested.
ScheduleResponseForBalancer(0,
EdsServiceImpl::BuildResponseForBackends(
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(0, backends_.size())),
0);
// Wait until all backends are ready.
@ -1595,7 +1859,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
backends_.size() - kNumBackendsFirstPass;
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponseForBackends(
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(0, kNumBackendsFirstPass)),
0);
// Wait until all backends returned by the balancer are ready.
@ -1626,7 +1890,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
balancers_[0]->Start(server_host_);
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponseForBackends(
EdsServiceImpl::BuildResponse(
GetBackendPortsInGroups(kNumBackendsFirstPass)),
0);
// Wait for queries to start going to one of the new backends.
@ -1646,7 +1910,75 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, BalancerRestart) {
EXPECT_EQ(0U, client_stats->total_dropped_requests());
}
// NOTE: Drop with client load reporting is covered by
// SingleBalancerWithClientLoadReportingAndDropTest.Vanilla below.
// Fixture for tests that exercise drop handling together with client-side
// load reporting. Arguments are forwarded to the XdsEnd2endTest base ctor;
// presumably (num_backends=4, num_balancers=1, load-report interval=20) —
// NOTE(review): verify against the XdsEnd2endTest constructor signature.
class SingleBalancerWithClientLoadReportingAndDropTest : public XdsEnd2endTest {
 public:
  SingleBalancerWithClientLoadReportingAndDropTest()
      : XdsEnd2endTest(4, 1, 20) {}
};
// Sends a batch of RPCs against a two-category drop config and verifies that
// the client load report delivered to the LRS service matches the drops
// observed by the client, both in total and per drop category.
TEST_F(SingleBalancerWithClientLoadReportingAndDropTest, Vanilla) {
  SetNextResolution({}, kDefaultServiceConfig_.c_str());
  SetNextResolutionForLbChannelAllBalancers();
  const size_t kNumRpcs = 3000;
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // An RPC survives only if it passes both drop categories.
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
  const double kErrorTolerance = 0.2;
  // Builds a matcher for "within kErrorTolerance of the expected value".
  auto approximately = [kErrorTolerance](double expected) {
    return ::testing::AllOf(::testing::Ge(expected * (1 - kErrorTolerance)),
                            ::testing::Le(expected * (1 + kErrorTolerance)));
  };
  // The EDS response contains two drop categories.
  ScheduleResponseForBalancer(
      0,
      EdsServiceImpl::BuildResponse(
          GetBackendPortsInGroups(),
          {{kLbDropType, kDropPerMillionForLb},
           {kThrottleDropType, kDropPerMillionForThrottle}}),
      0);
  int num_ok = 0;
  int num_failure = 0;
  int num_drops = 0;
  std::tie(num_ok, num_failure, num_drops) = WaitForAllBackends();
  // RPCs issued during warmup also count toward the totals the balancer sees.
  const size_t num_warmup = num_ok + num_failure + num_drops;
  // Send kNumRpcs RPCs and count the drops.
  for (size_t rpc_idx = 0; rpc_idx < kNumRpcs; ++rpc_idx) {
    EchoResponse reply;
    const Status rpc_status = SendRpc(&reply);
    const bool dropped =
        !rpc_status.ok() &&
        rpc_status.error_message() == "Call dropped by load balancing policy";
    if (dropped) {
      ++num_drops;
      continue;
    }
    EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                                 << " message=" << rpc_status.error_message();
    EXPECT_EQ(reply.message(), kRequestMessage_);
  }
  // The observed drop rate should be close to the configured combined rate.
  const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
  EXPECT_THAT(seen_drop_rate, approximately(kDropRateForLbAndThrottle));
  // Check client stats: the load report covers the warmup RPCs as well as
  // the counted batch.
  ClientStats* client_stats = balancers_[0]->lrs_service()->WaitForLoadReport();
  EXPECT_EQ(num_drops, client_stats->total_dropped_requests());
  const size_t total_rpc = num_warmup + kNumRpcs;
  EXPECT_THAT(client_stats->dropped_requests(kLbDropType),
              approximately(total_rpc * kDropRateForLb));
  EXPECT_THAT(
      client_stats->dropped_requests(kThrottleDropType),
      approximately(total_rpc * (1 - kDropRateForLb) * kDropRateForThrottle));
  // The EDS service got a single request, and sent a single response.
  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
} // namespace
} // namespace testing

Loading…
Cancel
Save