From 48aa2ff1e53516d620829ac08300ffcd91549ece Mon Sep 17 00:00:00 2001 From: Yousuk Seung Date: Tue, 15 Aug 2023 19:04:05 -0700 Subject: [PATCH] [fix] don't leak when a range times out unacked (#34064) --- .../posix_engine/traced_buffer_list.cc | 2 + src/core/lib/iomgr/buffer_list.cc | 2 + .../posix/traced_buffer_list_test.cc | 87 +++++++++++-------- test/core/iomgr/buffer_list_test.cc | 83 +++++++++++------- 4 files changed, 107 insertions(+), 67 deletions(-) diff --git a/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc b/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc index 947299ce1b7..b68f2559f4b 100644 --- a/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc +++ b/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc @@ -279,6 +279,8 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr, elem = elem->next_; continue; } + g_timestamps_callback(elem->arg_, &(elem->ts_), + absl::DeadlineExceededError("Ack timed out")); if (prev != nullptr) { prev->next_ = elem->next_; delete elem; diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc index 11ebc9cc773..b5b13c7c434 100644 --- a/src/core/lib/iomgr/buffer_list.cc +++ b/src/core/lib/iomgr/buffer_list.cc @@ -278,6 +278,8 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr, elem = elem->next_; continue; } + g_timestamps_callback(elem->arg_, &(elem->ts_), + absl::DeadlineExceededError("Ack timed out")); if (prev != nullptr) { prev->next_ = elem->next_; delete elem; diff --git a/test/core/event_engine/posix/traced_buffer_list_test.cc b/test/core/event_engine/posix/traced_buffer_list_test.cc index bd816d4d239..afb3a9e7cb4 100644 --- a/test/core/event_engine/posix/traced_buffer_list_test.cc +++ b/test/core/event_engine/posix/traced_buffer_list_test.cc @@ -73,30 +73,6 @@ void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/, *done = 1; } -void TestVerifierCalledOnAckVerifier(void* arg, Timestamps* ts, - absl::Status status) { - ASSERT_TRUE(status.ok()); - ASSERT_NE(arg, nullptr); - ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); - ASSERT_EQ(ts->acked_time.time.tv_sec, 123); - ASSERT_EQ(ts->acked_time.time.tv_nsec, 456); - ASSERT_GT(ts->info.length, 0); - int* done = reinterpret_cast(arg); - *done = 1; -} - -void TestVerifierCalledOnDelayedAckVerifier(void* arg, Timestamps* ts, - absl::Status error) { - ASSERT_TRUE(error.ok()); - ASSERT_NE(arg, nullptr); - ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); - ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec); - ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec); - ASSERT_GT(ts->info.length, 0); - gpr_atm* done = reinterpret_cast(arg); - gpr_atm_rel_store(done, static_cast(1)); -} - } // namespace // Tests that all TracedBuffer elements in the list are flushed out on shutdown. @@ -124,7 +100,17 @@ TEST(BufferListTest, TestVerifierCalledOnAck) { struct scm_timestamping tss; tss.ts[0].tv_sec = 123; tss.ts[0].tv_nsec = 456; - TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier); + TcpSetWriteTimestampsCallback( + [](void* arg, Timestamps* ts, absl::Status status) { + ASSERT_TRUE(status.ok()); + ASSERT_NE(arg, nullptr); + ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); + ASSERT_EQ(ts->acked_time.time.tv_sec, 123); + ASSERT_EQ(ts->acked_time.time.tv_nsec, 456); + ASSERT_GT(ts->info.length, 0); + int* done = reinterpret_cast(arg); + *done = 1; + }); TracedBufferList traced_buffers; int verifier_called = 0; traced_buffers.AddNewEntry(213, 0, &verifier_called); @@ -168,7 +154,6 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) { struct sock_extended_err serr[3]; gpr_atm verifier_called[3]; struct scm_timestamping tss; - TcpSetWriteTimestampsCallback(TestVerifierCalledOnDelayedAckVerifier); TracedBufferList tb_list; serr[0].ee_data = 1; serr[0].ee_info = SCM_TSTAMP_SCHED; @@ -190,6 +175,9 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) { tss.ts[0].tv_nsec = g_now.tv_nsec; // Process SCHED Timestamp for 1st traced buffer. + // Nothing should be flushed. + TcpSetWriteTimestampsCallback( + [](void*, Timestamps*, absl::Status) { ASSERT_TRUE(false); }); tb_list.ProcessTimestamp(&serr[0], nullptr, &tss); ASSERT_EQ(tb_list.Size(), 3); ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(0)); @@ -201,24 +189,41 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) { tss.ts[0].tv_nsec = g_now.tv_nsec; // Process SND Timestamp for 1st traced buffer. The second and third traced - // buffers must have been flushed because the max pending ack time would have + // buffers must be flushed because the max pending ack time would have // elapsed for them. + TcpSetWriteTimestampsCallback([](void* arg, Timestamps*, absl::Status error) { + ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out")); + ASSERT_NE(arg, nullptr); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, static_cast(1)); + }); tb_list.ProcessTimestamp(&serr[1], nullptr, &tss); ASSERT_EQ(tb_list.Size(), 1); ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(0)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(0)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(0)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(1)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(1)); AdvanceClockMillis(kMaxPendingAckMillis); tss.ts[0].tv_sec = g_now.tv_sec; tss.ts[0].tv_nsec = g_now.tv_nsec; // Process ACK Timestamp for 1st traced buffer. + TcpSetWriteTimestampsCallback( + [](void* arg, Timestamps* ts, absl::Status error) { + ASSERT_TRUE(error.ok()); + ASSERT_NE(arg, nullptr); + ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); + ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec); + ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec); + ASSERT_GT(ts->info.length, 0); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, static_cast(2)); + }); tb_list.ProcessTimestamp(&serr[2], nullptr, &tss); ASSERT_EQ(tb_list.Size(), 0); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(1)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(0)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(0)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(2)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(1)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(1)); tb_list.Shutdown(nullptr, absl::OkStatus()); } @@ -231,7 +236,21 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) { struct scm_timestamping tss; tss.ts[0].tv_sec = 123; tss.ts[0].tv_nsec = 456; - TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier); + TcpSetWriteTimestampsCallback( + [](void* arg, Timestamps* ts, absl::Status status) { + ASSERT_NE(arg, nullptr); + if (status.ok()) { + ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); + ASSERT_EQ(ts->acked_time.time.tv_sec, 123); + ASSERT_EQ(ts->acked_time.time.tv_nsec, 456); + ASSERT_GT(ts->info.length, 0); + *(reinterpret_cast(arg)) = 1; + } else if (status == absl::DeadlineExceededError("Ack timed out")) { + *(reinterpret_cast(arg)) = 2; + } else { + ASSERT_TRUE(false); + } + }); TracedBufferList tb_list; for (int i = 0; i < kNumTracedBuffers; i++) { serr[i].ee_data = i + 1; @@ -259,7 +278,7 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) { static_cast(1)); } else { ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), - static_cast(0)); + static_cast(2)); } } else { ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1)); diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc index 2396ec5ac15..df46aba58d5 100644 --- a/test/core/iomgr/buffer_list_test.cc +++ b/test/core/iomgr/buffer_list_test.cc @@ -59,35 +59,18 @@ void AdvanceClockMillis(uint64_t millis) { exec_ctx.InvalidateNow(); } -static void TestShutdownFlushesListVerifier(void* arg, - grpc_core::Timestamps* /*ts*/, - grpc_error_handle error) { - ASSERT_TRUE(error.ok()); - ASSERT_NE(arg, nullptr); - gpr_atm* done = reinterpret_cast(arg); - gpr_atm_rel_store(done, gpr_atm{1}); -} - -static void TestVerifierCalledOnDelayedAckVerifier(void* arg, - grpc_core::Timestamps* ts, - grpc_error_handle error) { - ASSERT_TRUE(error.ok()); - ASSERT_NE(arg, nullptr); - ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); - ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec); - ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec); - ASSERT_GT(ts->info.length, 0); - gpr_atm* done = reinterpret_cast(arg); - gpr_atm_rel_store(done, static_cast(1)); -} - /// Tests that all TracedBuffer elements in the list are flushed out on /// shutdown. /// Also tests that arg is passed correctly. /// TEST(BufferListTest, Testshutdownflusheslist) { grpc_core::grpc_tcp_set_write_timestamps_callback( - TestShutdownFlushesListVerifier); + [](void* arg, grpc_core::Timestamps* /*ts*/, grpc_error_handle error) { + ASSERT_TRUE(error.ok()); + ASSERT_NE(arg, nullptr); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, gpr_atm{1}); + }); grpc_core::TracedBufferList tb_list; #define NUM_ELEM 5 gpr_atm verifier_called[NUM_ELEM]; @@ -161,8 +144,6 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) { struct sock_extended_err serr[3]; gpr_atm verifier_called[3]; struct grpc_core::scm_timestamping tss; - grpc_core::grpc_tcp_set_write_timestamps_callback( - TestVerifierCalledOnDelayedAckVerifier); grpc_core::TracedBufferList tb_list; serr[0].ee_data = 1; serr[0].ee_info = grpc_core::SCM_TSTAMP_SCHED; @@ -184,6 +165,11 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) { tss.ts[0].tv_nsec = g_now.tv_nsec; // Process SCHED Timestamp for 1st traced buffer. + // Nothing should be flushed. + grpc_core::grpc_tcp_set_write_timestamps_callback( + [](void*, grpc_core::Timestamps*, grpc_error_handle) { + ASSERT_TRUE(false); + }); tb_list.ProcessTimestamp(&serr[0], nullptr, &tss); ASSERT_EQ(tb_list.Size(), 3); ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(0)); @@ -195,24 +181,42 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) { tss.ts[0].tv_nsec = g_now.tv_nsec; // Process SND Timestamp for 1st traced buffer. The second and third traced - // buffers must have been flushed because the max pending ack time would have + // buffers must be flushed because the max pending ack time would have // elapsed for them. + grpc_core::grpc_tcp_set_write_timestamps_callback( + [](void* arg, grpc_core::Timestamps*, grpc_error_handle error) { + ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out")); + ASSERT_NE(arg, nullptr); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, static_cast(1)); + }); tb_list.ProcessTimestamp(&serr[1], nullptr, &tss); ASSERT_EQ(tb_list.Size(), 1); ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(0)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(0)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(0)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(1)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(1)); AdvanceClockMillis(kMaxPendingAckMillis); tss.ts[0].tv_sec = g_now.tv_sec; tss.ts[0].tv_nsec = g_now.tv_nsec; // Process ACK Timestamp for 1st traced buffer. + grpc_core::grpc_tcp_set_write_timestamps_callback( + [](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) { + ASSERT_TRUE(error.ok()); + ASSERT_NE(arg, nullptr); + ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); + ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec); + ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec); + ASSERT_GT(ts->info.length, 0); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, static_cast(2)); + }); tb_list.ProcessTimestamp(&serr[2], nullptr, &tss); ASSERT_EQ(tb_list.Size(), 0); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(1)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(0)); - ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(0)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast(2)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast(1)); + ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast(1)); tb_list.Shutdown(nullptr, absl::OkStatus()); } @@ -226,7 +230,20 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) { tss.ts[0].tv_sec = 123; tss.ts[0].tv_nsec = 456; grpc_core::grpc_tcp_set_write_timestamps_callback( - TestVerifierCalledOnAckVerifier); + [](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) { + ASSERT_NE(arg, nullptr); + if (error.ok()) { + ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); + ASSERT_EQ(ts->acked_time.time.tv_sec, 123); + ASSERT_EQ(ts->acked_time.time.tv_nsec, 456); + ASSERT_GT(ts->info.length, 0); + *(reinterpret_cast(arg)) = 1; + } else if (error == absl::DeadlineExceededError("Ack timed out")) { + *(reinterpret_cast(arg)) = 2; + } else { + ASSERT_TRUE(false); + } + }); grpc_core::TracedBufferList tb_list; for (int i = 0; i < kNumTracedBuffers; i++) { serr[i].ee_data = i + 1; @@ -254,7 +271,7 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) { static_cast(1)); } else { ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), - static_cast(0)); + static_cast(2)); } } else { ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));