xds: don't report TRANSIENT_FAILURE when we're told to drop all

pull/22502/head
Mark D. Roth 5 years ago
parent 348c5ec813
commit 0e1c63a7a1
  1. 6
      src/core/ext/filters/client_channel/client_channel.cc
  2. 21
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 9
      src/core/ext/filters/client_channel/xds/xds_api.cc
  4. 5
      src/core/ext/filters/client_channel/xds/xds_api.h
  5. 2
      src/core/ext/filters/client_channel/xds/xds_client.cc
  6. 8
      test/cpp/end2end/xds_end2end_test.cc

@ -3975,8 +3975,10 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem); if (pick_queued_) RemoveCallFromQueuedPicksLocked(elem);
// Handle drops. // Handle drops.
if (GPR_UNLIKELY(result.subchannel == nullptr)) { if (GPR_UNLIKELY(result.subchannel == nullptr)) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( result.error = grpc_error_set_int(
"Call dropped by load balancing policy"); GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
} else { } else {
// Grab a ref to the connected subchannel while we're still // Grab a ref to the connected subchannel while we're still
// holding the data plane mutex. // holding the data plane mutex.

@ -496,6 +496,16 @@ XdsLb::PickResult XdsLb::LocalityPicker::Pick(PickArgs args) {
result.type = PickResult::PICK_COMPLETE; result.type = PickResult::PICK_COMPLETE;
return result; return result;
} }
// If we didn't drop, we better have some localities to pick from.
if (pickers_.empty()) { // Should never happen.
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds picker not given any localities"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
// Generate a random number in [0, total weight). // Generate a random number in [0, total weight).
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
// Forward pick to whichever locality maps to the range in which the // Forward pick to whichever locality maps to the range in which the
@ -570,7 +580,7 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
} }
// If the balancer tells us to drop all the calls, we should exit fallback // If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately. // mode immediately.
if (update.drop_all) xds_policy_->MaybeExitFallbackMode(); if (update.drop_config->drop_all()) xds_policy_->MaybeExitFallbackMode();
// Update the drop config. // Update the drop config.
const bool drop_config_changed = const bool drop_config_changed =
xds_policy_->drop_config_ == nullptr || xds_policy_->drop_config_ == nullptr ||
@ -919,6 +929,15 @@ void XdsLb::UpdatePrioritiesLocked(bool update_locality_stats) {
void XdsLb::UpdateXdsPickerLocked() { void XdsLb::UpdateXdsPickerLocked() {
// If we are in fallback mode, don't generate an xds picker from localities. // If we are in fallback mode, don't generate an xds picker from localities.
if (fallback_policy_ != nullptr) return; if (fallback_policy_ != nullptr) return;
// If we're dropping all calls, report READY, even though we won't
// have a selected priority.
if (drop_config_ != nullptr && drop_config_->drop_all()) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
absl::make_unique<LocalityPicker>(this, LocalityPicker::PickerList{}));
return;
}
// If we don't have a selected priority, report TRANSIENT_FAILURE.
if (current_priority_ == UINT32_MAX) { if (current_priority_ == UINT32_MAX) {
if (fallback_policy_ == nullptr) { if (fallback_policy_ == nullptr) {
grpc_error* error = grpc_error_set_int( grpc_error* error = grpc_error_set_int(

@ -1310,7 +1310,7 @@ grpc_error* LocalityParse(
grpc_error* DropParseAndAppend( grpc_error* DropParseAndAppend(
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload, const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* drop_overload,
XdsApi::DropConfig* drop_config, bool* drop_all) { XdsApi::DropConfig* drop_config) {
// Get the category. // Get the category.
upb_strview category = upb_strview category =
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category( envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
@ -1341,7 +1341,6 @@ grpc_error* DropParseAndAppend(
} }
// Cap numerator to 1000000. // Cap numerator to 1000000.
numerator = GPR_MIN(numerator, 1000000); numerator = GPR_MIN(numerator, 1000000);
if (numerator == 1000000) *drop_all = true;
drop_config->AddCategory(std::string(category.data, category.size), drop_config->AddCategory(std::string(category.data, category.size),
numerator); numerator);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
@ -1417,13 +1416,13 @@ grpc_error* EdsResponseParse(
policy, &drop_size); policy, &drop_size);
for (size_t j = 0; j < drop_size; ++j) { for (size_t j = 0; j < drop_size; ++j) {
grpc_error* error = grpc_error* error =
DropParseAndAppend(drop_overload[j], eds_update.drop_config.get(), DropParseAndAppend(drop_overload[j], eds_update.drop_config.get());
&eds_update.drop_all);
if (error != GRPC_ERROR_NONE) return error; if (error != GRPC_ERROR_NONE) return error;
} }
} }
// Validate the update content. // Validate the update content.
if (eds_update.priority_list_update.empty() && !eds_update.drop_all) { if (eds_update.priority_list_update.empty() &&
!eds_update.drop_config->drop_all()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS response doesn't contain any valid " "EDS response doesn't contain any valid "
"locality but doesn't require to drop all calls."); "locality but doesn't require to drop all calls.");

@ -164,6 +164,7 @@ class XdsApi {
void AddCategory(std::string name, uint32_t parts_per_million) { void AddCategory(std::string name, uint32_t parts_per_million) {
drop_category_list_.emplace_back( drop_category_list_.emplace_back(
DropCategory{std::move(name), parts_per_million}); DropCategory{std::move(name), parts_per_million});
if (parts_per_million == 1000000) drop_all_ = true;
} }
// The only method invoked from the data plane combiner. // The only method invoked from the data plane combiner.
@ -173,6 +174,8 @@ class XdsApi {
return drop_category_list_; return drop_category_list_;
} }
bool drop_all() const { return drop_all_; }
bool operator==(const DropConfig& other) const { bool operator==(const DropConfig& other) const {
return drop_category_list_ == other.drop_category_list_; return drop_category_list_ == other.drop_category_list_;
} }
@ -180,12 +183,12 @@ class XdsApi {
private: private:
DropCategoryList drop_category_list_; DropCategoryList drop_category_list_;
bool drop_all_ = false;
}; };
struct EdsUpdate { struct EdsUpdate {
PriorityListUpdate priority_list_update; PriorityListUpdate priority_list_update;
RefCountedPtr<DropConfig> drop_config; RefCountedPtr<DropConfig> drop_config;
bool drop_all = false;
}; };
using EdsUpdateMap = std::map<std::string /*eds_service_name*/, EdsUpdate>; using EdsUpdateMap = std::map<std::string /*eds_service_name*/, EdsUpdate>;

@ -1074,7 +1074,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
" drop categories received (drop_all=%d)", " drop categories received (drop_all=%d)",
xds_client(), eds_update.priority_list_update.size(), xds_client(), eds_update.priority_list_update.size(),
eds_update.drop_config->drop_category_list().size(), eds_update.drop_config->drop_category_list().size(),
eds_update.drop_all); eds_update.drop_config->drop_all());
for (size_t priority = 0; for (size_t priority = 0;
priority < eds_update.priority_list_update.size(); ++priority) { priority < eds_update.priority_list_update.size(); ++priority) {
const auto* locality_map_update = eds_update.priority_list_update.Find( const auto* locality_map_update = eds_update.priority_list_update.Find(

@ -3024,9 +3024,7 @@ TEST_P(DropTest, DropAll) {
const uint32_t kDropPerMillionForLb = 100000; const uint32_t kDropPerMillionForLb = 100000;
const uint32_t kDropPerMillionForThrottle = 1000000; const uint32_t kDropPerMillionForThrottle = 1000000;
// The ADS response contains two drop categories. // The ADS response contains two drop categories.
AdsServiceImpl::EdsResourceArgs args({ AdsServiceImpl::EdsResourceArgs args;
{"locality0", GetBackendPorts()},
});
args.drop_categories = {{kLbDropType, kDropPerMillionForLb}, args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}; {kThrottleDropType, kDropPerMillionForThrottle}};
balancers_[0]->ads_service()->SetEdsResource( balancers_[0]->ads_service()->SetEdsResource(
@ -3035,8 +3033,8 @@ TEST_P(DropTest, DropAll) {
for (size_t i = 0; i < kNumRpcs; ++i) { for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response; EchoResponse response;
const Status status = SendRpc(&response); const Status status = SendRpc(&response);
EXPECT_TRUE(!status.ok() && status.error_message() == EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
"Call dropped by load balancing policy"); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
} }
} }

Loading…
Cancel
Save