ring hash: fix picker propagation bug in xds_cluster_manager policy (#29959)

* ring hash: fix picker propagation bug in xds_cluster_manager policy

* fix build

* fix build
pull/29735/head
Mark D. Roth 3 years ago committed by GitHub
parent e6c6840db3
commit 470a3066c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  2. 6
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  3. 19
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  4. 13
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  5. 13
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  6. 82
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -219,7 +219,6 @@ class WeightedTargetLb : public LoadBalancingPolicy {
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
bool seen_failure_since_ready_ = false;
OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
};
@ -610,19 +609,12 @@ void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
// If the child reports IDLE, immediately tell it to exit idle.
if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
// changes until we go back into state READY.
if (!seen_failure_since_ready_) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
seen_failure_since_ready_ = true;
}
} else {
if (state != GRPC_CHANNEL_READY) return;
seen_failure_since_ready_ = false;
// If the last recorded state was TRANSIENT_FAILURE and the new state
// is something other than READY, don't change the state.
if (connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_READY) {
connectivity_state_ = state;
}
connectivity_state_ = state;
// Notify the LB policy.
weighted_target_policy_->UpdateStateLocked();
}

@ -530,8 +530,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity (drop all): "
"state=READY "
"picker=%p",
"state=READY picker=%p",
this, drop_picker.get());
}
channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
@ -544,8 +543,7 @@ void XdsClusterImplLb::MaybeUpdatePickerLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity: state=%s "
"status=(%s) "
"picker=%p",
"status=(%s) picker=%p",
this, ConnectivityStateName(state_), status_.ToString().c_str(),
drop_picker.get());
}

@ -197,7 +197,6 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
bool seen_failure_since_ready_ = false;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
@ -577,19 +576,13 @@ void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
std::move(picker));
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
// changes until we go back into state READY.
if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
xds_cluster_manager_child_->seen_failure_since_ready_ = true;
}
} else {
if (state != GRPC_CHANNEL_READY) return;
xds_cluster_manager_child_->seen_failure_since_ready_ = false;
// If the last recorded state was TRANSIENT_FAILURE and the new state
// is something other than READY, don't change the state.
if (xds_cluster_manager_child_->connectivity_state_ !=
GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_READY) {
xds_cluster_manager_child_->connectivity_state_ = state;
}
xds_cluster_manager_child_->connectivity_state_ = state;
// Notify the LB policy.
xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
}

@ -848,6 +848,19 @@ void XdsEnd2endTest::CheckRpcSendOk(
}
}
void XdsEnd2endTest::CheckRpcSendFailure(
const grpc_core::DebugLocation& debug_location, StatusCode expected_status,
absl::string_view expected_message_regex, const RpcOptions& rpc_options) {
const Status status = SendRpc(rpc_options);
EXPECT_FALSE(status.ok())
<< debug_location.file() << ":" << debug_location.line();
EXPECT_EQ(expected_status, status.error_code())
<< debug_location.file() << ":" << debug_location.line();
EXPECT_THAT(status.error_message(),
::testing::ContainsRegex(expected_message_regex))
<< debug_location.file() << ":" << debug_location.line();
}
void XdsEnd2endTest::CheckRpcSendFailure(
const grpc_core::DebugLocation& debug_location,
const CheckRpcSendFailureOptions& options) {

@ -806,7 +806,16 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
const size_t times = 1,
const RpcOptions& rpc_options = RpcOptions());
// Options to use with CheckRpcSendFailure().
// Sends one RPC, which must fail with the specified status code and
// a message matching the specified regex.
void CheckRpcSendFailure(const grpc_core::DebugLocation& debug_location,
StatusCode expected_status,
absl::string_view expected_message_regex,
const RpcOptions& rpc_options = RpcOptions());
// DEPRECATED -- USE THE ABOVE VARIANT INSTEAD.
// TODO(roth): Change all existing callers to use the above variant
// instead and then remove this.
struct CheckRpcSendFailureOptions {
std::function<bool(size_t)> continue_predicate = [](size_t i) {
return i < 1;
@ -837,8 +846,6 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
return *this;
}
};
// Sends RPCs and expects them to fail.
void CheckRpcSendFailure(
const grpc_core::DebugLocation& debug_location,
const CheckRpcSendFailureOptions& options = CheckRpcSendFailureOptions());

@ -971,9 +971,12 @@ TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
ShutdownBackend(0);
CheckRpcSendFailure(DEBUG_LOCATION,
CheckRpcSendFailureOptions().set_rpc_options(
RpcOptions().set_metadata(std::move(metadata))));
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"ring hash cannot find a connected subchannel; first failure: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
RpcOptions().set_metadata(std::move(metadata)));
StartBackend(0);
// Ensure we are actively connecting without any traffic.
EXPECT_TRUE(channel_->WaitForConnected(
@ -1003,20 +1006,31 @@ TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
const auto rpc_options = RpcOptions()
.set_metadata(std::move(metadata))
.set_timeout_ms(kConnectionTimeoutMilliseconds);
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
gpr_log(GPR_INFO, "=== SENDING FIRST RPC ===");
CheckRpcSendFailure(
DEBUG_LOCATION,
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"ring hash cannot find a connected subchannel; first failure: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
rpc_options);
gpr_log(GPR_INFO, "=== DONE WITH FIRST RPC ===");
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
// Bring up backend 0. The channel should become connected without
// any picks, because in TF, we are always trying to connect to at
// least one backend at all times.
gpr_log(GPR_INFO, "=== STARTING BACKEND 0 ===");
StartBackend(0);
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
// RPCs should go to backend 0.
gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 0 ===");
WaitForBackend(DEBUG_LOCATION, 0, WaitForBackendOptions(), rpc_options);
EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
// Bring down backend 0 and bring up backend 1.
// Note the RPC contains a header value that will always be hashed to
// backend 0. So by purposely bringing down backend 0 and bringing up another
@ -1025,14 +1039,66 @@ TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
// Since the the entries in the ring are pretty distributed and we have
// unused ports to fill the ring, it is almost guaranteed that the Picker
// will go through some non-READY entries and skip them as per design.
gpr_log(GPR_INFO, "=== SHUTTING DOWN BACKEND 0 ===");
ShutdownBackend(0);
gpr_log(GPR_INFO, "=== WAITING FOR STATE CHANGE ===");
EXPECT_TRUE(channel_->WaitForStateChange(
GRPC_CHANNEL_READY,
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
gpr_log(GPR_INFO, "=== SENDING SECOND RPC ===");
CheckRpcSendFailure(
DEBUG_LOCATION,
CheckRpcSendFailureOptions().set_rpc_options(rpc_options));
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
"ring hash cannot find a connected subchannel; first failure: "
"(UNKNOWN: Failed to connect to remote host: Connection refused|"
"UNAVAILABLE: Failed to connect to remote host: FD shutdown)",
rpc_options);
gpr_log(GPR_INFO, "=== STARTING BACKEND 1 ===");
StartBackend(1);
gpr_log(GPR_INFO, "=== WAITING FOR CHANNEL TO BECOME READY ===");
EXPECT_TRUE(channel_->WaitForConnected(
grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
gpr_log(GPR_INFO, "=== WAITING FOR BACKEND 1 ===");
WaitForBackend(DEBUG_LOCATION, 1, WaitForBackendOptions(), rpc_options);
gpr_log(GPR_INFO, "=== DONE ===");
}
// This tests a bug seen in the wild where ring_hash started with no
// endpoints and reported TRANSIENT_FAILURE, then got an update with
// endpoints and reported IDLE, but the picker update was squelched, so
// it failed to ever get reconnected.
TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
CreateAndStartBackends(1);
const uint32_t kConnectionTimeoutMilliseconds = 5000;
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
auto new_route_config = default_route_config_;
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// Send empty EDS update.
EdsResourceArgs args(
{{"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
// Channel should fail RPCs and go into TRANSIENT_FAILURE.
CheckRpcSendFailure(
DEBUG_LOCATION, StatusCode::UNAVAILABLE,
// TODO(roth): As part of https://github.com/grpc/grpc/issues/22883,
// figure out how to get a useful resolution note plumbed down to
// improve this message.
"empty address list: ",
RpcOptions().set_timeout_ms(kConnectionTimeoutMilliseconds));
EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
// Send EDS update with 1 backend.
args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// A wait_for_ready RPC should succeed, and the channel should report READY.
CheckRpcSendOk(DEBUG_LOCATION, 1,
RpcOptions()
.set_timeout_ms(kConnectionTimeoutMilliseconds)
.set_wait_for_ready(true));
EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
// Test unspported hash policy types are all ignored before a supported

Loading…
Cancel
Save