diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index 88dd6af93c6..72905d99d70 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -714,33 +714,37 @@ Status XdsEnd2endTest::LongRunningRpc::GetStatus() { return status_; } -std::vector XdsEnd2endTest::SendConcurrentRpcs( +std::vector> +XdsEnd2endTest::SendConcurrentRpcs( const grpc_core::DebugLocation& debug_location, grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs, const RpcOptions& rpc_options) { // Variables for RPCs. - std::vector rpcs(num_rpcs); + std::vector> rpcs; + rpcs.reserve(num_rpcs); EchoRequest request; // Variables for synchronization grpc_core::Mutex mu; grpc_core::CondVar cv; size_t completed = 0; // Set-off callback RPCs - for (size_t i = 0; i < num_rpcs; i++) { - ConcurrentRpc* rpc = &rpcs[i]; + for (size_t i = 0; i < num_rpcs; ++i) { + auto rpc = std::make_unique(); rpc_options.SetupRpc(&rpc->context, &request); grpc_core::Timestamp t0 = NowFromCycleCounter(); - stub->async()->Echo(&rpc->context, &request, &rpc->response, - [rpc, &mu, &completed, &cv, num_rpcs, t0](Status s) { - rpc->status = s; - rpc->elapsed_time = NowFromCycleCounter() - t0; - bool done; - { - grpc_core::MutexLock lock(&mu); - done = (++completed) == num_rpcs; - } - if (done) cv.Signal(); - }); + stub->async()->Echo( + &rpc->context, &request, &rpc->response, + [rpc = rpc.get(), &mu, &completed, &cv, num_rpcs, t0](Status s) { + rpc->status = s; + rpc->elapsed_time = NowFromCycleCounter() - t0; + bool done; + { + grpc_core::MutexLock lock(&mu); + done = (++completed) == num_rpcs; + } + if (done) cv.Signal(); + }); + rpcs.push_back(std::move(rpc)); } { grpc_core::MutexLock lock(&mu); diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h index 68991fc881c..214d59de75a 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.h +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -785,7 +785,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam, grpc_core::Duration elapsed_time; EchoResponse response; }; - std::vector SendConcurrentRpcs( + std::vector> SendConcurrentRpcs( const grpc_core::DebugLocation& debug_location, grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs, const RpcOptions& rpc_options); diff --git a/test/cpp/end2end/xds/xds_fault_injection_end2end_test.cc b/test/cpp/end2end/xds/xds_fault_injection_end2end_test.cc index 35653fd8748..7334afd0eda 100644 --- a/test/cpp/end2end/xds/xds_fault_injection_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_fault_injection_end2end_test.cc @@ -240,12 +240,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionPercentageDelay) { // Send kNumRpcs RPCs and count the delays. RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout).set_skip_cancelled_check(true); - std::vector rpcs = + std::vector> rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); size_t num_delayed = 0; for (auto& rpc : rpcs) { - if (rpc.status.error_code() == StatusCode::OK) continue; - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); + if (rpc->status.error_code() == StatusCode::OK) continue; + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code()); ++num_delayed; } // The delay rate should be roughly equal to the expectation. @@ -295,12 +295,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionPercentageDelayViaHeaders) { .set_metadata(metadata) .set_timeout(kRpcTimeout) .set_skip_cancelled_check(true); - std::vector rpcs = + std::vector> rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); size_t num_delayed = 0; for (auto& rpc : rpcs) { - if (rpc.status.error_code() == StatusCode::OK) continue; - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); + if (rpc->status.error_code() == StatusCode::OK) continue; + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code()); ++num_delayed; } // The delay rate should be roughly equal to the expectation. @@ -381,12 +381,12 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionAlwaysDelayPercentageAbort) { // Send kNumRpcs RPCs and count the aborts. int num_aborted = 0; RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout); - std::vector rpcs = + std::vector> rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); for (auto& rpc : rpcs) { - EXPECT_GE(rpc.elapsed_time, kFixedDelay * grpc_test_slowdown_factor()); - if (rpc.status.error_code() == StatusCode::OK) continue; - EXPECT_EQ("Fault injected", rpc.status.error_message()); + EXPECT_GE(rpc->elapsed_time, kFixedDelay * grpc_test_slowdown_factor()); + if (rpc->status.error_code() == StatusCode::OK) continue; + EXPECT_EQ("Fault injected", rpc->status.error_message()); ++num_aborted; } // The abort rate should be roughly equal to the expectation. @@ -438,12 +438,12 @@ TEST_P(FaultInjectionTest, // Send kNumRpcs RPCs and count the aborts. int num_aborted = 0; RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout); - std::vector rpcs = + std::vector> rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); for (auto& rpc : rpcs) { - EXPECT_GE(rpc.elapsed_time, kFixedDelay * grpc_test_slowdown_factor()); - if (rpc.status.error_code() == StatusCode::OK) continue; - EXPECT_EQ("Fault injected", rpc.status.error_message()); + EXPECT_GE(rpc->elapsed_time, kFixedDelay * grpc_test_slowdown_factor()); + if (rpc->status.error_code() == StatusCode::OK) continue; + EXPECT_EQ("Fault injected", rpc->status.error_message()); ++num_aborted; } // The abort rate should be roughly equal to the expectation. @@ -481,11 +481,11 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) { // active faults quota. int num_delayed = 0; RpcOptions rpc_options = RpcOptions().set_timeout(kRpcTimeout); - std::vector rpcs = + std::vector> rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); for (auto& rpc : rpcs) { - if (rpc.status.error_code() == StatusCode::OK) continue; - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); + if (rpc->status.error_code() == StatusCode::OK) continue; + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code()); ++num_delayed; } // Only kMaxFault number of RPC should be fault injected. @@ -495,8 +495,8 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) { num_delayed = 0; rpcs = SendConcurrentRpcs(DEBUG_LOCATION, stub_.get(), kNumRpcs, rpc_options); for (auto& rpc : rpcs) { - if (rpc.status.error_code() == StatusCode::OK) continue; - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc.status.error_code()); + if (rpc->status.error_code() == StatusCode::OK) continue; + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, rpc->status.error_code()); ++num_delayed; } // Only kMaxFault number of RPC should be fault injected. If the max fault