weighted_target and RLS: delegate to child picker on error (#29870)

* weighted_target and RLS: delegate to child picker on error

* fix RLS bug that caused us to always use the last target in the list

* add comment
pull/30046/head^2
Mark D. Roth 3 years ago committed by GitHub
parent 7976501534
commit 86e282ba97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  2. 44
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  3. 1
      test/core/util/test_lb_policies.cc
  4. 38
      test/cpp/end2end/rls_end2end_test.cc
  5. 22
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  6. 37
      test/cpp/end2end/xds/xds_core_end2end_test.cc
  7. 11
      test/cpp/end2end/xds/xds_csds_end2end_test.cc
  8. 38
      test/cpp/end2end/xds/xds_end2end_test.cc
  9. 14
      test/cpp/end2end/xds/xds_routing_end2end_test.cc

@ -1192,46 +1192,48 @@ size_t RlsLb::Cache::Entry::Size() const {
}
LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) {
for (const auto& child_policy_wrapper : child_policy_wrappers_) {
size_t i = 0;
ChildPolicyWrapper* child_policy_wrapper = nullptr;
// Skip targets before the last one that are in state TRANSIENT_FAILURE.
for (; i < child_policy_wrappers_.size(); ++i) {
child_policy_wrapper = child_policy_wrappers_[i].get();
if (child_policy_wrapper->connectivity_state() ==
GRPC_CHANNEL_TRANSIENT_FAILURE) {
GRPC_CHANNEL_TRANSIENT_FAILURE &&
i < child_policy_wrappers_.size() - 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO,
"[rlslb %p] cache entry=%p %s: target %s in state "
"TRANSIENT_FAILURE; skipping",
"[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR
" of %" PRIuPTR ") in state TRANSIENT_FAILURE; skipping",
lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
child_policy_wrapper->target().c_str());
child_policy_wrapper->target().c_str(), i,
child_policy_wrappers_.size());
}
continue;
}
// Child policy not in TRANSIENT_FAILURE, so delegate.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(
GPR_INFO,
"[rlslb %p] cache entry=%p %s: target %s in state %s; "
"delegating",
lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
child_policy_wrapper->target().c_str(),
ConnectivityStateName(child_policy_wrapper->connectivity_state()));
}
// Add header data.
if (!header_data_.empty()) {
char* copied_header_data =
static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
strcpy(copied_header_data, header_data_.c_str());
args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
}
return child_policy_wrapper->Pick(args);
break;
}
// No child policy found.
// Child policy not in TRANSIENT_FAILURE or is the last target in
// the list, so delegate.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO,
"[rlslb %p] cache entry=%p %s: no healthy target found; "
"failing pick",
lb_policy_.get(), this, lru_iterator_->ToString().c_str());
}
return PickResult::Fail(
absl::UnavailableError("all RLS targets unreachable"));
"[rlslb %p] cache entry=%p %s: target %s (%" PRIuPTR " of %" PRIuPTR
") in state %s; delegating",
lb_policy_.get(), this, lru_iterator_->ToString().c_str(),
child_policy_wrapper->target().c_str(), i,
child_policy_wrappers_.size(),
ConnectivityStateName(child_policy_wrapper->connectivity_state()));
}
// Add header data.
// Note that even if the target we're using is in TRANSIENT_FAILURE,
// the pick might still succeed (e.g., if the child is ring_hash), so
// we need to pass the right header info down in all cases.
if (!header_data_.empty()) {
char* copied_header_data =
static_cast<char*>(args.call_state->Alloc(header_data_.length() + 1));
strcpy(copied_header_data, header_data_.c_str());
args.initial_metadata->Add(kRlsHeaderKey, copied_header_data);
}
return child_policy_wrapper->Pick(args);
}
void RlsLb::Cache::Entry::ResetBackoff() {

@ -338,6 +338,14 @@ void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
target->UpdateLocked(config, std::move(addresses), args.args);
}
update_in_progress_ = false;
if (config_->target_map().empty()) {
absl::Status status = absl::UnavailableError(absl::StrCat(
"no children in weighted_target policy: ", args.resolution_note));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
return;
}
UpdateStateLocked();
}
@ -354,17 +362,19 @@ void WeightedTargetLb::UpdateStateLocked() {
"connectivity state",
this);
}
// Construct a new picker which maintains a map of all child pickers
// that are ready. Each child is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
// weights of all children.
WeightedPicker::PickerList picker_list;
uint32_t end = 0;
// Also count the number of children in each state, to determine the
// overall state.
// Construct lists of child pickers with associated weights, one for
// children that are in state READY and another for children that are
// in state TRANSIENT_FAILURE. Each child is represented by a portion of
// the range proportional to its weight, such that the total range is the
// sum of the weights of all children.
WeightedPicker::PickerList ready_picker_list;
uint32_t ready_end = 0;
WeightedPicker::PickerList tf_picker_list;
uint32_t tf_end = 0;
// Also count the number of children in CONNECTING and IDLE, to determine
// the aggregated state.
size_t num_connecting = 0;
size_t num_idle = 0;
size_t num_transient_failures = 0;
for (const auto& p : targets_) {
const std::string& child_name = p.first;
const WeightedChild* child = p.second.get();
@ -382,8 +392,8 @@ void WeightedTargetLb::UpdateStateLocked() {
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
GPR_ASSERT(child->weight() > 0);
end += child->weight();
picker_list.push_back(std::make_pair(end, child->picker_wrapper()));
ready_end += child->weight();
ready_picker_list.emplace_back(ready_end, child->picker_wrapper());
break;
}
case GRPC_CHANNEL_CONNECTING: {
@ -395,7 +405,9 @@ void WeightedTargetLb::UpdateStateLocked() {
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
++num_transient_failures;
GPR_ASSERT(child->weight() > 0);
tf_end += child->weight();
tf_picker_list.emplace_back(tf_end, child->picker_wrapper());
break;
}
default:
@ -404,7 +416,7 @@ void WeightedTargetLb::UpdateStateLocked() {
}
// Determine aggregated connectivity state.
grpc_connectivity_state connectivity_state;
if (!picker_list.empty()) {
if (!ready_picker_list.empty()) {
connectivity_state = GRPC_CHANNEL_READY;
} else if (num_connecting > 0) {
connectivity_state = GRPC_CHANNEL_CONNECTING;
@ -421,7 +433,7 @@ void WeightedTargetLb::UpdateStateLocked() {
absl::Status status;
switch (connectivity_state) {
case GRPC_CHANNEL_READY:
picker = absl::make_unique<WeightedPicker>(std::move(picker_list));
picker = absl::make_unique<WeightedPicker>(std::move(ready_picker_list));
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
@ -429,9 +441,7 @@ void WeightedTargetLb::UpdateStateLocked() {
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
status = absl::UnavailableError(
"weighted_target: all children report state TRANSIENT_FAILURE");
picker = absl::make_unique<TransientFailurePicker>(status);
picker = absl::make_unique<WeightedPicker>(std::move(tf_picker_list));
}
channel_control_helper()->UpdateState(connectivity_state, status,
std::move(picker));

@ -458,6 +458,7 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
gpr_log(GPR_ERROR,
"%s: could not parse URI (%s), using empty address list",
kFixedAddressLbPolicyName, uri.status().ToString().c_str());
args.resolution_note = "no address in fixed_address_lb policy";
}
ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
}

@ -1236,6 +1236,37 @@ TEST_F(RlsEnd2endTest, CacheSizeLimit) {
}
TEST_F(RlsEnd2endTest, MultipleTargets) {
StartBackends(1);
SetNextResolution(
MakeServiceConfigBuilder()
.AddKeyBuilder(absl::StrFormat("\"names\":[{"
" \"service\":\"%s\","
" \"method\":\"%s\""
"}],"
"\"headers\":["
" {"
" \"key\":\"%s\","
" \"names\":["
" \"key1\""
" ]"
" }"
"]",
kServiceValue, kMethodValue, kTestKey))
.Build());
rls_server_->service_.SetResponse(
BuildRlsRequest({{kTestKey, kTestValue}}),
BuildRlsResponse(
// Second target will report TRANSIENT_FAILURE, but should
// never be used.
{TargetStringForPort(backends_[0]->port_), "invalid_target"}));
CheckRpcSendOk(DEBUG_LOCATION,
RpcOptions().set_metadata({{"key1", kTestValue}}));
EXPECT_EQ(rls_server_->service_.request_count(), 1);
EXPECT_EQ(rls_server_->service_.response_count(), 1);
EXPECT_EQ(backends_[0]->service_.request_count(), 1);
}
TEST_F(RlsEnd2endTest, MultipleTargetsFirstInTransientFailure) {
StartBackends(1);
SetNextResolution(
MakeServiceConfigBuilder()
@ -1342,9 +1373,10 @@ TEST_F(RlsEnd2endTest, ConnectivityStateTransientFailure) {
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(/*try_to_connect=*/false));
rls_server_->service_.SetResponse(BuildRlsRequest({{kTestKey, kTestValue}}),
BuildRlsResponse({"invalid_target"}));
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"all RLS targets unreachable",
RpcOptions().set_metadata({{"key1", kTestValue}}));
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"empty address list: no address in fixed_address_lb policy",
RpcOptions().set_metadata({{"key1", kTestValue}}));
EXPECT_EQ(rls_server_->service_.request_count(), 1);
EXPECT_EQ(rls_server_->service_.response_count(), 1);
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,

@ -495,7 +495,7 @@ TEST_P(EdsTest, InitiallyEmptyServerlist) {
constexpr char kErrorMessage[] =
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE";
"empty address list: ";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
// Send non-empty serverlist.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
@ -545,9 +545,9 @@ TEST_P(EdsTest, BackendsRestart) {
// RPCs should fail.
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE");
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)");
// Restart all backends. RPCs should start succeeding again.
StartAllBackends();
CheckRpcSendOk(DEBUG_LOCATION, 1,
@ -1156,11 +1156,12 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) {
{"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
constexpr char kErrorMessage[] =
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
constexpr char kErrorMessageRegex[] =
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
kErrorMessageRegex);
args = EdsResourceArgs({
{"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
0},
@ -1171,7 +1172,8 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) {
WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(result.status.error_message(), kErrorMessage);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessageRegex));
}
});
}

@ -541,23 +541,21 @@ TEST_P(TimeoutTest, CdsSecondResourceNotPresentInRequest) {
TEST_P(TimeoutTest, EdsServerIgnoresRequest) {
balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl);
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE",
RpcOptions().set_timeout_ms(4000));
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"no children in weighted_target policy: ",
RpcOptions().set_timeout_ms(4000));
}
TEST_P(TimeoutTest, EdsResourceNotPresentInRequest) {
// No need to remove EDS resource, since the test suite does not add it
// by default.
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE",
RpcOptions().set_timeout_ms(4000));
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"no children in weighted_target policy: ",
RpcOptions().set_timeout_ms(4000));
}
TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) {
@ -586,11 +584,10 @@ TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) {
[](const RpcResult& result) {
if (result.status.ok()) return true; // Keep going.
EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
EXPECT_EQ(
result.status.error_message(),
"weighted_target: all children report state TRANSIENT_FAILURE");
EXPECT_EQ(result.status.error_message(),
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"no children in weighted_target policy: ");
return false;
},
/*timeout_ms=*/30000,
@ -1051,10 +1048,10 @@ TEST_P(XdsFederationTest, EdsResourceNameAuthorityUnknown) {
EchoResponse response;
grpc::Status status = stub2->Echo(&context, request, &response);
EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
EXPECT_EQ(status.error_message(),
"weighted_target: all children report state TRANSIENT_FAILURE");
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"no children in weighted_target policy: ");
ASSERT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel2->GetState(false));
}

@ -680,12 +680,11 @@ TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpClusterDoesNotExist) {
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpEndpointDoesNotExist) {
int kTimeoutMillisecond = 1000000; // 1000s wait for the transient failure.
balancer_->ads_service()->UnsetResource(kEdsTypeUrl, kDefaultEdsServiceName);
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE",
RpcOptions().set_timeout_ms(kTimeoutMillisecond));
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"no children in weighted_target policy: ",
RpcOptions().set_timeout_ms(kTimeoutMillisecond));
auto csds_response = FetchCsdsResponse();
EXPECT_THAT(
csds_response.config(0).generic_xds_configs(),

@ -372,11 +372,19 @@ class XdsSecurityTest : public XdsEnd2endTest {
WaitForBackend(DEBUG_LOCATION, 0, [](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(result.status.error_message(),
// TODO(roth): Improve this message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state "
"TRANSIENT_FAILURE");
// TODO(yashkt): Rework this test suite such that the caller
// explicitly indicates which failure they're allowed to see here,
// rather than blindly allowing every possibility in every test.
// TODO(roth): Plumb a better error out of the handshakers
// as part of https://github.com/grpc/grpc/issues/22883.
EXPECT_THAT(
result.status.error_message(),
::testing::MatchesRegex(
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection "
"(refused|reset by peer)|UNAVAILABLE: Failed to connect "
"to remote host: FD shutdown|UNKNOWN: Handshake failed|"
"UNAVAILABLE: Socket closed)"));
}
});
Status status = SendRpc();
@ -776,15 +784,7 @@ TEST_P(XdsSecurityTest, TestTlsConfigurationInCombinedValidationContext) {
->set_instance_name("fake_plugin1");
transport_socket->mutable_typed_config()->PackFrom(upstream_tls_context);
balancer_->ads_service()->SetCdsResource(cluster);
WaitForBackend(DEBUG_LOCATION, 0, [](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(result.status.error_message(),
// TODO(roth): Improve this message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE");
}
});
CheckRpcSendOk(DEBUG_LOCATION);
}
// TODO(yashykt): Remove this test once we stop supporting old fields
@ -801,15 +801,7 @@ TEST_P(XdsSecurityTest,
->set_instance_name("fake_plugin1");
transport_socket->mutable_typed_config()->PackFrom(upstream_tls_context);
balancer_->ads_service()->SetCdsResource(cluster);
WaitForBackend(DEBUG_LOCATION, 0, [](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(result.status.error_message(),
// TODO(roth): Improve this message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE");
}
});
CheckRpcSendOk(DEBUG_LOCATION);
}
TEST_P(XdsSecurityTest, TestMtlsConfigurationWithNoSanMatchers) {

@ -1824,11 +1824,12 @@ TEST_P(LdsRdsTest, XdsRoutingClusterUpdateClustersWithPickingDelays) {
RpcOptions().set_wait_for_ready(true).set_timeout_ms(0));
// Send a non-wait_for_ready RPC, which should fail. This tells us
// that the client has received the update and attempted to connect.
constexpr char kErrorMessage[] =
// TODO(roth): Improve this error message as part of
// https://github.com/grpc/grpc/issues/22883.
"weighted_target: all children report state TRANSIENT_FAILURE";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
constexpr char kErrorMessageRegex[] =
"connections to all backends failing; last error: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)";
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
kErrorMessageRegex);
// Now create a new cluster, pointing to backend 1.
const char* kNewClusterName = "new_cluster";
const char* kNewEdsServiceName = "new_eds_service_name";
@ -1854,7 +1855,8 @@ TEST_P(LdsRdsTest, XdsRoutingClusterUpdateClustersWithPickingDelays) {
[&](const RpcResult& result) {
if (!result.status.ok()) {
EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(result.status.error_message(), kErrorMessage);
EXPECT_THAT(result.status.error_message(),
::testing::MatchesRegex(kErrorMessageRegex));
}
},
WaitForBackendOptions().set_reset_counters(false));

Loading…
Cancel
Save