diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 98036fc6ada..9780b85479c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -202,7 +202,10 @@ class RingHash : public LoadBalancingPolicy { // The index parameter indicates the index into the list of the subchannel // whose status report triggered the call to // UpdateRingHashConnectivityStateLocked(). - void UpdateRingHashConnectivityStateLocked(size_t index); + // connection_attempt_complete is true if the subchannel just + // finished a connection attempt. + void UpdateRingHashConnectivityStateLocked( + size_t index, bool connection_attempt_complete); // Create a new ring from this subchannel list. RefCountedPtr MakeRing(); @@ -212,6 +215,10 @@ class RingHash : public LoadBalancingPolicy { size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; + + // The index of the subchannel currently doing an internally + // triggered connection attempt, if any. + absl::optional internally_triggered_connection_index_; }; class Ring : public RefCounted { @@ -532,7 +539,8 @@ void RingHash::RingHashSubchannelList::StartWatchingLocked() { // Pretend we're getting this update from the last subchannel, so that // if we need to proactively start connecting, we'll start from the // first subchannel. - UpdateRingHashConnectivityStateLocked(num_subchannels() - 1); + UpdateRingHashConnectivityStateLocked(num_subchannels() - 1, + /*connection_attempt_complete=*/false); } void RingHash::RingHashSubchannelList::UpdateStateCountersLocked( @@ -563,7 +571,7 @@ void RingHash::RingHashSubchannelList::UpdateStateCountersLocked( } void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( - size_t index) { + size_t index, bool connection_attempt_complete) { RingHash* p = static_cast(policy()); // Only set connectivity state if this is the current subchannel list. if (p->subchannel_list_.get() != this) return; @@ -624,8 +632,22 @@ void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( // Note that we do the same thing when the policy is in state // CONNECTING, just to ensure that we don't remain in CONNECTING state // indefinitely if there are no new picks coming in. - if (start_connection_attempt) { + if (internally_triggered_connection_index_.has_value() && + *internally_triggered_connection_index_ == index && + connection_attempt_complete) { + internally_triggered_connection_index_.reset(); + } + if (start_connection_attempt && + !internally_triggered_connection_index_.has_value()) { size_t next_index = (index + 1) % num_subchannels(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p] triggering internal connection attempt for subchannel " + "%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")", + p, subchannel(next_index)->subchannel(), this, next_index, + num_subchannels()); + } + internally_triggered_connection_index_ = next_index; subchannel(next_index)->subchannel()->AttemptToConnect(); } } @@ -705,7 +727,10 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( UpdateConnectivityStateLocked(connectivity_state); // Update the RH policy's connectivity state, creating new picker and new // ring. - subchannel_list()->UpdateRingHashConnectivityStateLocked(Index()); + bool connection_attempt_complete = + connectivity_state != GRPC_CHANNEL_CONNECTING; + subchannel_list()->UpdateRingHashConnectivityStateLocked( + Index(), connection_attempt_complete); } // diff --git a/test/cpp/end2end/connection_delay_injector.cc b/test/cpp/end2end/connection_delay_injector.cc index b9ad06a7363..30ce66eb8f7 100644 --- a/test/cpp/end2end/connection_delay_injector.cc +++ b/test/cpp/end2end/connection_delay_injector.cc @@ -76,35 +76,31 @@ void ConnectionAttemptInjector::AttemptConnection( } // -// ConnectionDelayInjector +// ConnectionAttemptInjector::InjectedDelay // -class ConnectionDelayInjector::InjectedDelay { - public: - InjectedDelay(grpc_core::Duration duration, grpc_closure* closure, - grpc_endpoint** ep, grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* addr, - grpc_core::Timestamp deadline) - : attempt_(closure, ep, interested_parties, channel_args, addr, - deadline) { - GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr); - grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); - duration = std::min(duration, deadline - now); - grpc_timer_init(&timer_, now + duration, &timer_callback_); - } +ConnectionAttemptInjector::InjectedDelay::InjectedDelay( + grpc_core::Duration duration, grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline) + : attempt_(closure, ep, interested_parties, channel_args, addr, deadline) { + GRPC_CLOSURE_INIT(&timer_callback_, TimerCallback, this, nullptr); + grpc_core::Timestamp now = grpc_core::ExecCtx::Get()->Now(); + duration = std::min(duration, deadline - now); + grpc_timer_init(&timer_, now + duration, &timer_callback_); +} - private: - static void TimerCallback(void* arg, grpc_error_handle /*error*/) { - auto* self = static_cast(arg); - self->attempt_.Resume(); - delete self; - } +void ConnectionAttemptInjector::InjectedDelay::TimerCallback( + void* arg, grpc_error_handle /*error*/) { + auto* self = static_cast(arg); + self->BeforeResumingAction(); + self->attempt_.Resume(); + delete self; +} - QueuedAttempt attempt_; - grpc_timer timer_; - grpc_closure timer_callback_; -}; +// +// ConnectionDelayInjector +// void ConnectionDelayInjector::HandleConnection( grpc_closure* closure, grpc_endpoint** ep, diff --git a/test/cpp/end2end/connection_delay_injector.h b/test/cpp/end2end/connection_delay_injector.h index 42c41fd2ea0..6de089ce52d 100644 --- a/test/cpp/end2end/connection_delay_injector.h +++ b/test/cpp/end2end/connection_delay_injector.h @@ -103,6 +103,28 @@ class ConnectionAttemptInjector { grpc_core::Timestamp deadline_; }; + // Injects a delay before continuing a connection attempt. + class InjectedDelay { + public: + virtual ~InjectedDelay() = default; + + InjectedDelay(grpc_core::Duration duration, grpc_closure* closure, + grpc_endpoint** ep, grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline); + + private: + // Subclasses can override to perform an action when the attempt resumes. + virtual void BeforeResumingAction() {} + + static void TimerCallback(void* arg, grpc_error_handle /*error*/); + + QueuedAttempt attempt_; + grpc_timer timer_; + grpc_closure timer_callback_; + }; + static void AttemptConnection(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, @@ -123,8 +145,6 @@ class ConnectionDelayInjector : public ConnectionAttemptInjector { grpc_core::Timestamp deadline) override; private: - class InjectedDelay; - grpc_core::Duration duration_; }; diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index 562e9d66a9c..1c0d3183800 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -688,6 +688,191 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) { EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count()); } +// Tests that when we trigger internal connection attempts without +// picks, we do so for only one subchannel at a time. +TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) { + // Create EDS resource. + CreateAndStartBackends(1); + auto non_existant_endpoint0 = MakeNonExistantEndpoint(); + auto non_existant_endpoint1 = MakeNonExistantEndpoint(); + auto non_existant_endpoint2 = MakeNonExistantEndpoint(); + EdsResourceArgs args({{"locality0", + {non_existant_endpoint0, non_existant_endpoint1, + non_existant_endpoint2, CreateEndpoint(0)}}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Change CDS resource to use RING_HASH. + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancer_->ads_service()->SetCdsResource(cluster); + // Add hash policy to RDS resource. + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // A connection injector that ensures that only one subchannel is + // connecting at a time. + class ConnectionInjector : public ConnectionAttemptInjector { + public: + ConnectionInjector(int port0, int port1, int port2, int good_port) + : port0_(port0), port1_(port1), port2_(port2), good_port_(good_port) {} + + void HandleConnection(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) override { + { + grpc_core::MutexLock lock(&mu_); + const int port = grpc_sockaddr_get_port(addr); + gpr_log(GPR_INFO, "==> HandleConnection(): state_=%d, port=%d", state_, + port); + switch (state_) { + case kInit: + EXPECT_NE(port, port1_); + EXPECT_NE(port, port2_); + EXPECT_NE(port, good_port_); + if (port == port0_) { + gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 0"); + new DelayedAttempt(this, closure, ep, interested_parties, + channel_args, addr, deadline); + state_ = kDelayedEndpoint0; + cond_.Signal(); + return; + } + break; + case kResumedEndpoint0: + EXPECT_NE(port, port0_); + EXPECT_NE(port, port2_); + EXPECT_NE(port, good_port_); + if (port == port1_) { + gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 1"); + new DelayedAttempt(this, closure, ep, interested_parties, + channel_args, addr, deadline); + state_ = kDelayedEndpoint1; + return; + } else { + gpr_log(GPR_INFO, "*** UNEXPECTED PORT"); + } + break; + case kResumedEndpoint1: + EXPECT_NE(port, port0_); + EXPECT_NE(port, port1_); + EXPECT_NE(port, good_port_); + if (port == port2_) { + gpr_log(GPR_INFO, "*** DELAYING ENDPOINT 2"); + new DelayedAttempt(this, closure, ep, interested_parties, + channel_args, addr, deadline); + state_ = kDelayedEndpoint2; + return; + } else { + gpr_log(GPR_INFO, "*** UNEXPECTED PORT"); + } + break; + case kResumedEndpoint2: + EXPECT_NE(port, port0_); + EXPECT_NE(port, port1_); + EXPECT_NE(port, port2_); + if (port == good_port_) { + gpr_log(GPR_INFO, "*** DONE WITH ALL UNREACHABLE ENDPOINTS"); + state_ = kDone; + } + break; + case kDelayedEndpoint0: + case kDelayedEndpoint1: + case kDelayedEndpoint2: + ASSERT_THAT(port, ::testing::AllOf(::testing::Ne(port0_), + ::testing::Ne(port1_), + ::testing::Ne(port2_), + ::testing::Ne(good_port_))) + << "started second connection attempt in parallel"; + break; + case kDone: + break; + } + } + AttemptConnection(closure, ep, interested_parties, channel_args, addr, + deadline); + } + + void WaitForFirstPortSeen() { + grpc_core::MutexLock lock(&mu_); + while (state_ == kInit) { + cond_.Wait(&mu_); + } + } + + private: + class DelayedAttempt : public InjectedDelay { + public: + DelayedAttempt(ConnectionInjector* parent, grpc_closure* closure, + grpc_endpoint** ep, grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline) + : InjectedDelay( + grpc_core::Duration::Seconds(1 * grpc_test_slowdown_factor()), + closure, ep, interested_parties, channel_args, addr, deadline), + parent_(parent) {} + + private: + void BeforeResumingAction() override { + grpc_core::MutexLock lock(&parent_->mu_); + if (parent_->state_ == kDelayedEndpoint0) { + gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 0"); + parent_->state_ = kResumedEndpoint0; + } else if (parent_->state_ == kDelayedEndpoint1) { + gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 1"); + parent_->state_ = kResumedEndpoint1; + } else if (parent_->state_ == kDelayedEndpoint2) { + gpr_log(GPR_INFO, "*** RESUMING ENDPOINT 2"); + parent_->state_ = kResumedEndpoint2; + } + } + + ConnectionInjector* parent_; + }; + + const int port0_; + const int port1_; + const int port2_; + const int good_port_; + + grpc_core::Mutex mu_; + grpc_core::CondVar cond_; + enum { + kInit, + kDelayedEndpoint0, + kResumedEndpoint0, + kDelayedEndpoint1, + kResumedEndpoint1, + kDelayedEndpoint2, + kResumedEndpoint2, + kDone, + } state_ ABSL_GUARDED_BY(mu_) = kInit; + }; + ConnectionInjector connection_injector( + non_existant_endpoint0.port, non_existant_endpoint1.port, + non_existant_endpoint2.port, backends_[0]->port()); + // A long-running RPC, just used to send the RPC in another thread. + LongRunningRpc rpc; + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackendPort( + non_existant_endpoint0.port)}}; + rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata( + std::move(metadata))); + // Wait for the RPC to trigger the first connection attempt, then cancel it. + connection_injector.WaitForFirstPortSeen(); + rpc.CancelRpc(); + // Wait for channel to become connected without any pending RPC. + EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10))); + // RPC should have been cancelled. + EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code()); + // Make sure the backend did not get any requests. + EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count()); +} + // Test that when the first pick is down leading to a transient failure, we // will move on to the next ring hash entry. TEST_P(RingHashTest, TransientFailureCheckNextOne) {