xds_ring_hash_end2end_test: fix flake in ContinuesConnectingWithoutPicks (#29461)

pull/29470/head
Mark D. Roth 3 years ago committed by GitHub
parent 659a02aba0
commit 0ba3c59672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc

@ -648,8 +648,11 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
seen_port_, port); seen_port_, port);
if (!seen_port_ && port == port_) { if (!seen_port_ && port == port_) {
gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT"); gpr_log(GPR_INFO, "*** SEEN P0 CONNECTION ATTEMPT");
queued_p0_attempt_ = absl::make_unique<QueuedAttempt>(
closure, ep, interested_parties, channel_args, addr, deadline);
seen_port_ = true; seen_port_ = true;
cond_.Signal(); cond_.Signal();
return;
} }
} }
AttemptConnection(closure, ep, interested_parties, channel_args, addr, AttemptConnection(closure, ep, interested_parties, channel_args, addr,
@ -663,12 +666,25 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
} }
} }
// Invoked by the test when the RPC has been cancelled and it's ready
// to allow the connection attempt to proceed.
void CompleteP0ConnectionAttempt() {
grpc_core::ExecCtx exec_ctx;
std::unique_ptr<QueuedAttempt> attempt;
{
grpc_core::MutexLock lock(&mu_);
attempt = std::move(queued_p0_attempt_);
}
attempt->Resume();
}
private: private:
const int port_; const int port_;
grpc_core::Mutex mu_; grpc_core::Mutex mu_;
grpc_core::CondVar cond_; grpc_core::CondVar cond_;
bool seen_port_ ABSL_GUARDED_BY(mu_) = false; bool seen_port_ ABSL_GUARDED_BY(mu_) = false;
std::unique_ptr<QueuedAttempt> queued_p0_attempt_ ABSL_GUARDED_BY(mu_);
}; };
ConnectionInjector connection_injector(non_existant_endpoint.port); ConnectionInjector connection_injector(non_existant_endpoint.port);
connection_injector.Start(); connection_injector.Start();
@ -679,13 +695,14 @@ TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}}; CreateMetadataValueThatHashesToBackendPort(non_existant_endpoint.port)}};
rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata( rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
std::move(metadata))); std::move(metadata)));
// Wait for the RPC to trigger the P0 connection attempt, then cancel it. // Wait for the RPC to trigger the P0 connection attempt, then cancel it,
// and then allow the connection attempt to complete.
connection_injector.WaitForP0ConnectionAttempt(); connection_injector.WaitForP0ConnectionAttempt();
rpc.CancelRpc(); rpc.CancelRpc();
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
connection_injector.CompleteP0ConnectionAttempt();
// Wait for channel to become connected without any pending RPC. // Wait for channel to become connected without any pending RPC.
EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5))); EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
// RPC should have been cancelled.
EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
// Make sure the backend did not get any requests. // Make sure the backend did not get any requests.
EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count()); EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
} }

Loading…
Cancel
Save