Merge pull request #22502 from markdroth/xds_drop_all_fix

xds: don't report TRANSIENT_FAILURE when we're told to drop all
pull/22518/head
Mark D. Roth 5 years ago committed by GitHub
commit 0c1414f1c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/ext/filters/client_channel/client_channel.cc
  2. 21
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 9
      src/core/ext/filters/client_channel/xds/xds_api.cc
  4. 5
      src/core/ext/filters/client_channel/xds/xds_api.h
  5. 2
      src/core/ext/filters/client_channel/xds/xds_client.cc
  6. 8
      test/cpp/end2end/xds_end2end_test.cc

@ -3975,8 +3975,10 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
// Handle drops.
if (GPR_UNLIKELY(result.subchannel == nullptr)) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
} else {
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.

@ -496,6 +496,16 @@ XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) {
result.type = PickResult::PICK_COMPLETE;
return result;
}
// If we didn't drop, we better have some localities to pick from.
if (pickers_.empty()) { // Should never happen.
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds picker not given any localities"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
// Generate a random number in [0, total weight).
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
// Forward pick to whichever locality maps to the range in which the
@ -570,7 +580,7 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
}
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
if (update.drop_all) xds_policy_->MaybeExitFallbackMode();
if (update.drop_config->drop_all()) xds_policy_->MaybeExitFallbackMode();
// Update the drop config.
const bool drop_config_changed =
xds_policy_->drop_config_ == nullptr ||
@ -919,6 +929,15 @@ void XdsLb::UpdatePrioritiesLocked(bool update_locality_stats) {
void XdsLb::UpdateXdsPickerLocked() {
// If we are in fallback mode, don't generate an xds picker from localities.
if (fallback_policy_ != nullptr) return;
// If we're dropping all calls, report READY, even though we won't
// have a selected priority.
if (drop_config_ != nullptr && drop_config_->drop_all()) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
absl::make_unique<LocalityPicker>(this, LocalityPicker::PickerList{}));
return;
}
// If we don't have a selected priority, report TRANSIENT_FAILURE.
if (current_priority_ == UINT32_MAX) {
if (fallback_policy_ == nullptr) {
grpc_error* error = grpc_error_set_int(

@ -1310,7 +1310,7 @@ grpc_error* LocalityParse(
grpc_error* DropParseAndAppend(
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
XdsApi::DropConfig* drop_config, bool* drop_all) {
XdsApi::DropConfig* drop_config) {
// Get the category.
upb_strview category =
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
@ -1341,7 +1341,6 @@ grpc_error* DropParseAndAppend(
}
// Cap numerator to 1000000.
numerator = GPR_MIN(numerator, 1000000);
if (numerator == 1000000) *drop_all = true;
drop_config->AddCategory(std::string(category.data, category.size),
numerator);
return GRPC_ERROR_NONE;
@ -1417,13 +1416,13 @@ grpc_error* EdsResponseParse(
policy, &drop_size);
for (size_t j = 0; j < drop_size; ++j) {
grpc_error* error =
DropParseAndAppend(drop_overload[j], eds_update.drop_config.get(),
&eds_update.drop_all);
DropParseAndAppend(drop_overload[j], eds_update.drop_config.get());
if (error != GRPC_ERROR_NONE) return error;
}
}
// Validate the update content.
if (eds_update.priority_list_update.empty() && !eds_update.drop_all) {
if (eds_update.priority_list_update.empty() &&
!eds_update.drop_config->drop_all()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS response doesn't contain any valid "
"locality but doesn't require to drop all calls.");

@ -164,6 +164,7 @@ class XdsApi {
void AddCategory(std::string name, uint32_t parts_per_million) {
drop_category_list_.emplace_back(
DropCategory{std::move(name), parts_per_million});
if (parts_per_million == 1000000) drop_all_ = true;
}
// The only method invoked from the data plane combiner.
@ -173,6 +174,8 @@ class XdsApi {
return drop_category_list_;
}
bool drop_all() const { return drop_all_; }
bool operator==(const DropConfig& other) const {
return drop_category_list_ == other.drop_category_list_;
}
@ -180,12 +183,12 @@ class XdsApi {
private:
DropCategoryList drop_category_list_;
bool drop_all_ = false;
};
struct EdsUpdate {
PriorityListUpdate priority_list_update;
RefCountedPtr<DropConfig> drop_config;
bool drop_all = false;
};
using EdsUpdateMap = std::map<std::string /*eds_service_name*/, EdsUpdate>;

@ -1074,7 +1074,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
" drop categories received (drop_all=%d)",
xds_client(), eds_update.priority_list_update.size(),
eds_update.drop_config->drop_category_list().size(),
eds_update.drop_all);
eds_update.drop_config->drop_all());
for (size_t priority = 0;
priority < eds_update.priority_list_update.size(); ++priority) {
const auto* locality_map_update = eds_update.priority_list_update.Find(

@ -3024,9 +3024,7 @@ TEST_P(DropTest, DropAll) {
const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 1000000;
// The ADS response contains two drop categories.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts()},
});
AdsServiceImpl::EdsResourceArgs args;
args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}};
balancers_[0]->ads_service()->SetEdsResource(
@ -3035,8 +3033,8 @@ TEST_P(DropTest, DropAll) {
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
EXPECT_TRUE(!status.ok() && status.error_message() ==
"Call dropped by load balancing policy");
EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
}

Loading…
Cancel
Save