ring_hash: trigger internal connection attempts on only one subchannel at a time (#29422)

* ring_hash: trigger internal connection attempts on only one subchannel at a time

* simplify logic

* add test

* clang-format
pull/28928/head^2
Mark D. Roth 3 years ago committed by GitHub
parent f28695351e
commit 50df29bdec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  2. 46
      test/cpp/end2end/connection_delay_injector.cc
  3. 24
      test/cpp/end2end/connection_delay_injector.h
  4. 185
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -202,7 +202,10 @@ class RingHash : public LoadBalancingPolicy {
// The index parameter indicates the index into the list of the subchannel
// whose status report triggered the call to
// UpdateRingHashConnectivityStateLocked().
void UpdateRingHashConnectivityStateLocked(size_t index);
// connection_attempt_complete is true if the subchannel just
// finished a connection attempt.
void UpdateRingHashConnectivityStateLocked(
size_t index, bool connection_attempt_complete);
// Create a new ring from this subchannel list.
RefCountedPtr<Ring> MakeRing();
@ -212,6 +215,10 @@ class RingHash : public LoadBalancingPolicy {
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
// The index of the subchannel currently doing an internally
// triggered connection attempt, if any.
absl::optional<size_t> internally_triggered_connection_index_;
};
class Ring : public RefCounted<Ring> {
@ -532,7 +539,8 @@ void RingHash::RingHashSubchannelList::StartWatchingLocked() {
// Pretend we're getting this update from the last subchannel, so that
// if we need to proactively start connecting, we'll start from the
// first subchannel.
UpdateRingHashConnectivityStateLocked(num_subchannels() - 1);
UpdateRingHashConnectivityStateLocked(num_subchannels() - 1,
/*connection_attempt_complete=*/false);
}
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
@ -563,7 +571,7 @@ void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
}
void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
size_t index) {
size_t index, bool connection_attempt_complete) {
RingHash* p = static_cast<RingHash*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return;
@ -624,8 +632,22 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt) {
if (internally_triggered_connection_index_.has_value() &&
*internally_triggered_connection_index_ == index &&
connection_attempt_complete) {
internally_triggered_connection_index_.reset();
}
if (start_connection_attempt &&
!internally_triggered_connection_index_.has_value()) {
size_t next_index = (index + 1) % num_subchannels();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] triggering internal connection attempt for subchannel "
"%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")",
p, subchannel(next_index)->subchannel(), this, next_index,
num_subchannels());
}
internally_triggered_connection_index_ = next_index;
subchannel(next_index)->subchannel()->AttemptToConnect();
}
}
@ -705,7 +727,10 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
UpdateConnectivityStateLocked(connectivity_state);
// Update the RH policy's connectivity state, creating new picker and new
// ring.
subchannel_list()->UpdateRingHashConnectivityStateLocked(Index());
bool connection_attempt_complete =
connectivity_state != GRPC_CHANNEL_CONNECTING;
subchannel_list()->UpdateRingHashConnectivityStateLocked(
Index(), connection_attempt_complete);
}
//

@ -76,35 +76,31 @@ void ConnectionAttemptInjector::AttemptConnection(
}
//
// ConnectionDelayInjector
// ConnectionAttemptInjector::InjectedDelay
//
class ConnectionDelayInjector::InjectedDelay {
public:
InjectedDelay(grpc_core::Duration duration, grpc_closure* closure,
grpc_endpoint** ep, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline)
: attempt_(closure, ep, interested_parties, channel_args, addr,
deadline) {
GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr);
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
duration = std::min(duration, deadline - now);
grpc_timer_init(&timer_, now + duration, &timer_callback_);
}
// Wraps an in-flight connection attempt and resumes it after |duration|
// elapses.  The instance must be heap-allocated: it deletes itself from
// TimerCallback() after resuming the attempt.
ConnectionAttemptInjector::InjectedDelay::InjectedDelay(
grpc_core::Duration duration, grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline)
: attempt_(closure, ep, interested_parties, channel_args, addr, deadline) {
GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr);
grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now();
// Clamp the injected delay so the attempt is never resumed after its
// own deadline has already passed.
duration = std::min(duration, deadline - now);
grpc_timer_init(&timer_, now + duration, &timer_callback_);
}
private:
static void TimerCallback(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<InjectedDelay*>(arg);
self->attempt_.Resume();
delete self;
}
// Timer expiry callback: gives subclasses a hook (BeforeResumingAction)
// before the queued connection attempt is resumed, then destroys this
// self-owned InjectedDelay instance.
void ConnectionAttemptInjector::InjectedDelay::TimerCallback(
void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<InjectedDelay*>(arg);
self->BeforeResumingAction();
self->attempt_.Resume();
// Self-deleting: ownership was transferred to the timer when the
// object was constructed.
delete self;
}
QueuedAttempt attempt_;
grpc_timer timer_;
grpc_closure timer_callback_;
};
//
// ConnectionDelayInjector
//
void ConnectionDelayInjector::HandleConnection(
grpc_closure* closure, grpc_endpoint** ep,

@ -103,6 +103,28 @@ class ConnectionAttemptInjector {
grpc_core::Timestamp deadline_;
};
// Injects a delay before continuing a connection attempt.
// Instances must be heap-allocated; the constructor starts a timer and
// the object deletes itself after resuming the queued attempt.
class InjectedDelay {
public:
virtual ~InjectedDelay() = default;
// Queues the attempt described by the remaining arguments and arms a
// timer for |duration| (clamped so it does not exceed |deadline|).
InjectedDelay(grpc_core::Duration duration, grpc_closure* closure,
grpc_endpoint** ep, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
private:
// Subclasses can override to perform an action when the attempt resumes.
virtual void BeforeResumingAction() {}
// Timer expiry: calls BeforeResumingAction(), resumes the attempt,
// then deletes this object.
static void TimerCallback(void* arg, grpc_error_handle /*error*/);
QueuedAttempt attempt_;
grpc_timer timer_;
grpc_closure timer_callback_;
};
static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
@ -123,8 +145,6 @@ class ConnectionDelayInjector : public ConnectionAttemptInjector {
grpc_core::Timestamp deadline) override;
private:
class InjectedDelay;
grpc_core::Duration duration_;
};

@ -688,6 +688,191 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
// Tests that when we trigger internal connection attempts without
// picks, we do so for only one subchannel at a time.
//
// Setup: three unreachable ("non-existant") endpoints followed by one
// working backend.  Each unreachable endpoint's connection attempt is
// artificially delayed; while one is delayed, any parallel attempt to a
// different endpoint fails the test.  The expected attempt order is
// endpoint0 -> endpoint1 -> endpoint2 -> good backend.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
// Create EDS resource.
CreateAndStartBackends(1);
auto non_existant_endpoint0 = MakeNonExistantEndpoint();
auto non_existant_endpoint1 = MakeNonExistantEndpoint();
auto non_existant_endpoint2 = MakeNonExistantEndpoint();
EdsResourceArgs args({{"locality0",
{non_existant_endpoint0, non_existant_endpoint1,
non_existant_endpoint2, CreateEndpoint(0)}}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Change CDS resource to use RING_HASH.
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::RING_HASH);
balancer_->ads_service()->SetCdsResource(cluster);
// Add hash policy to RDS resource.
auto new_route_config = default_route_config_;
auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* hash_policy = route->mutable_route()->add_hash_policy();
hash_policy->mutable_header()->set_header_name("address_hash");
SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
new_route_config);
// A connection injector that ensures that only one subchannel is
// connecting at a time.
//
// Implements a linear state machine guarded by mu_:
//   kInit -> kDelayedEndpoint0 -> kResumedEndpoint0
//         -> kDelayedEndpoint1 -> kResumedEndpoint1
//         -> kDelayedEndpoint2 -> kResumedEndpoint2 -> kDone
// While in any kDelayedEndpointN state, seeing an attempt to any other
// port means two subchannels were connecting in parallel -> failure.
class ConnectionInjector : public ConnectionAttemptInjector {
public:
ConnectionInjector(int port0, int port1, int port2, int good_port)
: port0_(port0), port1_(port1), port2_(port2), good_port_(good_port) {}
void HandleConnection(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) override {
{
grpc_core::MutexLock lock(&mu_);
const int port = grpc_sockaddr_get_port(addr);
gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_,
port);
switch (state_) {
case kInit:
// First attempt must target endpoint 0 and nothing else.
EXPECT_NE(port, port1_);
EXPECT_NE(port, port2_);
EXPECT_NE(port, good_port_);
if (port == port0_) {
gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 0");
new DelayedAttempt(this, closure, ep, interested_parties,
channel_args, addr, deadline);
state_ = kDelayedEndpoint0;
// Wake WaitForFirstPortSeen() so the test can cancel the RPC.
cond_.Signal();
// DelayedAttempt took ownership of the attempt; do not fall
// through to AttemptConnection() below.
return;
}
break;
case kResumedEndpoint0:
// After endpoint 0 finished, only endpoint 1 may be attempted.
EXPECT_NE(port, port0_);
EXPECT_NE(port, port2_);
EXPECT_NE(port, good_port_);
if (port == port1_) {
gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 1");
new DelayedAttempt(this, closure, ep, interested_parties,
channel_args, addr, deadline);
state_ = kDelayedEndpoint1;
return;
} else {
gpr_log(GPR_INFO, "*** UNEXPECTED PORT");
}
break;
case kResumedEndpoint1:
// After endpoint 1 finished, only endpoint 2 may be attempted.
EXPECT_NE(port, port0_);
EXPECT_NE(port, port1_);
EXPECT_NE(port, good_port_);
if (port == port2_) {
gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 2");
new DelayedAttempt(this, closure, ep, interested_parties,
channel_args, addr, deadline);
state_ = kDelayedEndpoint2;
return;
} else {
gpr_log(GPR_INFO, "*** UNEXPECTED PORT");
}
break;
case kResumedEndpoint2:
// Finally only the working backend's port is acceptable; that
// attempt is passed through undelayed.
EXPECT_NE(port, port0_);
EXPECT_NE(port, port1_);
EXPECT_NE(port, port2_);
if (port == good_port_) {
gpr_log(GPR_INFO, "*** DONE WITH ALL UNREACHABLE ENDPOINTS");
state_ = kDone;
}
break;
case kDelayedEndpoint0:
case kDelayedEndpoint1:
case kDelayedEndpoint2:
// Core assertion of the test: while one endpoint's attempt is
// being delayed, no attempt to any other port may start.
ASSERT_THAT(port, ::testing::AllOf(::testing::Ne(port0_),
::testing::Ne(port1_),
::testing::Ne(port2_),
::testing::Ne(good_port_)))
<< "started second connection attempt in parallel";
break;
case kDone:
break;
}
}
// Not delayed (or retry of the same delayed port): proceed with the
// real connection attempt.
AttemptConnection(closure, ep, interested_parties, channel_args, addr,
deadline);
}
// Blocks until the first (endpoint 0) connection attempt has been seen
// and delayed, so the test can cancel the triggering RPC.
void WaitForFirstPortSeen() {
grpc_core::MutexLock lock(&mu_);
while (state_ == kInit) {
cond_.Wait(&mu_);
}
}
private:
// Delays one attempt; when the delay expires, advances the parent's
// state machine from kDelayedEndpointN to kResumedEndpointN before the
// attempt is resumed.
class DelayedAttempt : public InjectedDelay {
public:
DelayedAttempt(ConnectionInjector* parent, grpc_closure* closure,
grpc_endpoint** ep, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline)
: InjectedDelay(
grpc_core::Duration::Seconds(1 * grpc_test_slowdown_factor()),
closure, ep, interested_parties, channel_args, addr, deadline),
parent_(parent) {}
private:
void BeforeResumingAction() override {
grpc_core::MutexLock lock(&parent_->mu_);
if (parent_->state_ == kDelayedEndpoint0) {
gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 0");
parent_->state_ = kResumedEndpoint0;
} else if (parent_->state_ == kDelayedEndpoint1) {
gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 1");
parent_->state_ = kResumedEndpoint1;
} else if (parent_->state_ == kDelayedEndpoint2) {
gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 2");
parent_->state_ = kResumedEndpoint2;
}
}
ConnectionInjector* parent_;  // not owned; outlives this attempt
};
const int port0_;
const int port1_;
const int port2_;
const int good_port_;
grpc_core::Mutex mu_;
grpc_core::CondVar cond_;  // signaled when leaving kInit
enum {
kInit,
kDelayedEndpoint0,
kResumedEndpoint0,
kDelayedEndpoint1,
kResumedEndpoint1,
kDelayedEndpoint2,
kResumedEndpoint2,
kDone,
} state_ ABSL_GUARDED_BY(mu_) = kInit;
};
ConnectionInjector connection_injector(
non_existant_endpoint0.port, non_existant_endpoint1.port,
non_existant_endpoint2.port, backends_[0]->port());
// A long-running RPC, just used to send the RPC in another thread.
LongRunningRpc rpc;
std::vector<std::pair<std::string, std::string>> metadata = {
{"address_hash", CreateMetadataValueThatHashesToBackendPort(
non_existant_endpoint0.port)}};
rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
std::move(metadata)));
// Wait for the RPC to trigger the first connection attempt, then cancel it.
// From here on, all further attempts are internally triggered by the
// ring_hash policy (no picks are pending), which is what is under test.
connection_injector.WaitForFirstPortSeen();
rpc.CancelRpc();
// Wait for channel to become connected without any pending RPC.
EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
// RPC should have been cancelled.
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
// Make sure the backend did not get any requests.
EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
// Test that when the first pick is down leading to a transient failure, we
// will move on to the next ring hash entry.
TEST_P(RingHashTest, TransientFailureCheckNextOne) {

Loading…
Cancel
Save