[xds_fault_injection_e2e_test] attempt to fix crash on mac (#37670)

Attempt to work around the following test crash seen on mac:

https://btx.cloud.google.com/invocations/28cb507f-251f-404f-ba08-97722d668922/targets/%2F%2Ftest%2Fcpp%2Fend2end%2Fxds:xds_fault_injection_end2end_test;config=fecbec564c2f515f7347784d11564eda03ae4ef5a45558b64c9fed3461966ce8/log

It's not clear what's actually causing this crash, but given that `ClientContext` is not movable, we shouldn't be putting it directly in `std::vector<>` in the first place.  This changes the test framework's `SendConcurrentRpcs()` method to use `std::unique_ptr<>` in the vector instead.

Closes #37670

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37670 from markdroth:xds_fault_injection_e2e_test_flake 0f09707d77
PiperOrigin-RevId: 673173003
pull/37689/head
Mark D. Roth 3 months ago committed by Copybara-Service
parent c831e1de79
commit 9ae873d56d
  1. 34
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  2. 2
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  3. 38
      test/cpp/end2end/xds/xds_fault_injection_end2end_test.cc

@ -714,33 +714,37 @@ Status XdsEnd2endTest::LongRunningRpc::GetStatus() {
return status_; return status_;
} }
std::vector<XdsEnd2endTest::ConcurrentRpc> XdsEnd2endTest::SendConcurrentRpcs( std::vector<std::unique_ptr<XdsEnd2endTest::ConcurrentRpc>>
XdsEnd2endTest::SendConcurrentRpcs(
const grpc_core::DebugLocation& debug_location, const grpc_core::DebugLocation& debug_location,
grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs, grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
const RpcOptions& rpc_options) { const RpcOptions& rpc_options) {
// Variables for RPCs. // Variables for RPCs.
std::vector<ConcurrentRpc> rpcs(num_rpcs); std::vector<std::unique_ptr<ConcurrentRpc>> rpcs;
rpcs.reserve(num_rpcs);
EchoRequest request; EchoRequest request;
// Variables for synchronization // Variables for synchronization
grpc_core::Mutex mu; grpc_core::Mutex mu;
grpc_core::CondVar cv; grpc_core::CondVar cv;
size_t completed = 0; size_t completed = 0;
// Set-off callback RPCs // Set-off callback RPCs
for (size_t i = 0; i < num_rpcs; i++) { for (size_t i = 0; i < num_rpcs; ++i) {
ConcurrentRpc* rpc = &rpcs[i]; auto rpc = std::make_unique<ConcurrentRpc>();
rpc_options.SetupRpc(&rpc->context, &request); rpc_options.SetupRpc(&rpc->context, &request);
grpc_core::Timestamp t0 = NowFromCycleCounter(); grpc_core::Timestamp t0 = NowFromCycleCounter();
stub->async()->Echo(&rpc->context, &request, &rpc->response, stub->async()->Echo(
[rpc, &mu, &completed, &cv, num_rpcs, t0](Status s) { &rpc->context, &request, &rpc->response,
rpc->status = s; [rpc = rpc.get(), &mu, &completed, &cv, num_rpcs, t0](Status s) {
rpc->elapsed_time = NowFromCycleCounter() - t0; rpc->status = s;
bool done; rpc->elapsed_time = NowFromCycleCounter() - t0;
{ bool done;
grpc_core::MutexLock lock(&mu); {
done = (++completed) == num_rpcs; grpc_core::MutexLock lock(&mu);
} done = (++completed) == num_rpcs;
if (done) cv.Signal(); }
}); if (done) cv.Signal();
});
rpcs.push_back(std::move(rpc));
} }
{ {
grpc_core::MutexLock lock(&mu); grpc_core::MutexLock lock(&mu);

@ -785,7 +785,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>,
grpc_core::Duration elapsed_time; grpc_core::Duration elapsed_time;
EchoResponse response; EchoResponse response;
}; };
std::vector<ConcurrentRpc> SendConcurrentRpcs( std::vector<std::unique_ptr<ConcurrentRpc>> SendConcurrentRpcs(
const grpc_core::DebugLocation& debug_location, const grpc_core::DebugLocation& debug_location,
grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs, grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
const RpcOptions& rpc_options); const RpcOptions& rpc_options);

@ -240,12 +240,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionPercentageDelay) {
// Send kNumRpcs RPCs and count the delays. // Send kNumRpcs RPCs and count the delays.
RpcOptions rpc_options = RpcOptions rpc_options =
RpcOptions().set_timeout(kRpcTimeout).set_skip_cancelled_check(true); RpcOptions().set_timeout(kRpcTimeout).set_skip_cancelled_check(true);
std::vector<ConcurrentRpc> rpcs = std::vector<std::unique_ptr<ConcurrentRpc>> rpcs =
SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options);
size_t num_delayed = 0; size_t num_delayed = 0;
for (auto& rpc : rpcs) { for (auto& rpc : rpcs) {
if (rpc.status.error_code() == StatusCode::OK) continue; if (rpc->status.error_code() == StatusCode::OK) continue;
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code());
++num_delayed; ++num_delayed;
} }
// The delay rate should be roughly equal to the expectation. // The delay rate should be roughly equal to the expectation.
@ -295,12 +295,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionPercentageDelayViaHeaders) {
.set_metadata(metadata) .set_metadata(metadata)
.set_timeout(kRpcTimeout) .set_timeout(kRpcTimeout)
.set_skip_cancelled_check(true); .set_skip_cancelled_check(true);
std::vector<ConcurrentRpc> rpcs = std::vector<std::unique_ptr<ConcurrentRpc>> rpcs =
SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options);
size_t num_delayed = 0; size_t num_delayed = 0;
for (auto& rpc : rpcs) { for (auto& rpc : rpcs) {
if (rpc.status.error_code() == StatusCode::OK) continue; if (rpc->status.error_code() == StatusCode::OK) continue;
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code());
++num_delayed; ++num_delayed;
} }
// The delay rate should be roughly equal to the expectation. // The delay rate should be roughly equal to the expectation.
@ -381,12 +381,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionAlwaysDelayPercentageAbort) {
// Send kNumRpcs RPCs and count the aborts. // Send kNumRpcs RPCs and count the aborts.
int num_aborted = 0; int num_aborted = 0;
RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout); RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout);
std::vector<ConcurrentRpc> rpcs = std::vector<std::unique_ptr<ConcurrentRpc>> rpcs =
SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options);
for (auto& rpc : rpcs) { for (auto& rpc : rpcs) {
EXPECT_GE(rpc.elapsed_time, kFixedDelay * grpc_test_slowdown_factor()); EXPECT_GE(rpc->elapsed_time, kFixedDelay * grpc_test_slowdown_factor());
if (rpc.status.error_code() == StatusCode::OK) continue; if (rpc->status.error_code() == StatusCode::OK) continue;
EXPECT_EQ("Fault injected", rpc.status.error_message()); EXPECT_EQ("Fault injected", rpc->status.error_message());
++num_aborted; ++num_aborted;
} }
// The abort rate should be roughly equal to the expectation. // The abort rate should be roughly equal to the expectation.
@ -438,12 +438,12 @@ TEST_P(FaultInjectionTest,
// Send kNumRpcs RPCs and count the aborts. // Send kNumRpcs RPCs and count the aborts.
int num_aborted = 0; int num_aborted = 0;
RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout); RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout);
std::vector<ConcurrentRpc> rpcs = std::vector<std::unique_ptr<ConcurrentRpc>> rpcs =
SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options);
for (auto& rpc : rpcs) { for (auto& rpc : rpcs) {
EXPECT_GE(rpc.elapsed_time, kFixedDelay * grpc_test_slowdown_factor()); EXPECT_GE(rpc->elapsed_time, kFixedDelay * grpc_test_slowdown_factor());
if (rpc.status.error_code() == StatusCode::OK) continue; if (rpc->status.error_code() == StatusCode::OK) continue;
EXPECT_EQ("Fault injected", rpc.status.error_message()); EXPECT_EQ("Fault injected", rpc->status.error_message());
++num_aborted; ++num_aborted;
} }
// The abort rate should be roughly equal to the expectation. // The abort rate should be roughly equal to the expectation.
@ -481,11 +481,11 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) {
// active faults quota. // active faults quota.
int num_delayed = 0; int num_delayed = 0;
RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout); RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout);
std::vector<ConcurrentRpc> rpcs = std::vector<std::unique_ptr<ConcurrentRpc>> rpcs =
SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options);
for (auto& rpc : rpcs) { for (auto& rpc : rpcs) {
if (rpc.status.error_code() == StatusCode::OK) continue; if (rpc->status.error_code() == StatusCode::OK) continue;
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code());
++num_delayed; ++num_delayed;
} }
// Only kMaxFault number of RPC should be fault injected. // Only kMaxFault number of RPC should be fault injected.
@ -495,8 +495,8 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) {
num_delayed = 0; num_delayed = 0;
rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options);
for (auto& rpc : rpcs) { for (auto& rpc : rpcs) {
if (rpc.status.error_code() == StatusCode::OK) continue; if (rpc->status.error_code() == StatusCode::OK) continue;
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code());
++num_delayed; ++num_delayed;
} }
// Only kMaxFault number of RPC should be fault injected. If the max fault // Only kMaxFault number of RPC should be fault injected. If the max fault

Loading…
Cancel
Save