[fix] don't leak when a range times out unacked (#34064)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34078/head
Yousuk Seung 2 years ago committed by GitHub
parent 64a318acd4
commit 48aa2ff1e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
  2. 2
      src/core/lib/iomgr/buffer_list.cc
  3. 87
      test/core/event_engine/posix/traced_buffer_list_test.cc
  4. 83
      test/core/iomgr/buffer_list_test.cc

@ -279,6 +279,8 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
elem = elem->next_;
continue;
}
g_timestamps_callback(elem->arg_, &(elem->ts_),
absl::DeadlineExceededError("Ack timed out"));
if (prev != nullptr) {
prev->next_ = elem->next_;
delete elem;

@ -278,6 +278,8 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
elem = elem->next_;
continue;
}
g_timestamps_callback(elem->arg_, &(elem->ts_),
absl::DeadlineExceededError("Ack timed out"));
if (prev != nullptr) {
prev->next_ = elem->next_;
delete elem;

@ -73,30 +73,6 @@ void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
*done = 1;
}
void TestVerifierCalledOnAckVerifier(void* arg, Timestamps* ts,
absl::Status status) {
ASSERT_TRUE(status.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
ASSERT_GT(ts->info.length, 0);
int* done = reinterpret_cast<int*>(arg);
*done = 1;
}
void TestVerifierCalledOnDelayedAckVerifier(void* arg, Timestamps* ts,
absl::Status error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
ASSERT_GT(ts->info.length, 0);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
} // namespace
// Tests that all TracedBuffer elements in the list are flushed out on shutdown.
@ -124,7 +100,17 @@ TEST(BufferListTest, TestVerifierCalledOnAck) {
struct scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier);
TcpSetWriteTimestampsCallback(
[](void* arg, Timestamps* ts, absl::Status status) {
ASSERT_TRUE(status.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
ASSERT_GT(ts->info.length, 0);
int* done = reinterpret_cast<int*>(arg);
*done = 1;
});
TracedBufferList traced_buffers;
int verifier_called = 0;
traced_buffers.AddNewEntry(213, 0, &verifier_called);
@ -168,7 +154,6 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
struct sock_extended_err serr[3];
gpr_atm verifier_called[3];
struct scm_timestamping tss;
TcpSetWriteTimestampsCallback(TestVerifierCalledOnDelayedAckVerifier);
TracedBufferList tb_list;
serr[0].ee_data = 1;
serr[0].ee_info = SCM_TSTAMP_SCHED;
@ -190,6 +175,9 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SCHED Timestamp for 1st traced buffer.
// Nothing should be flushed.
TcpSetWriteTimestampsCallback(
[](void*, Timestamps*, absl::Status) { ASSERT_TRUE(false); });
tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 3);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
@ -201,24 +189,41 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SND Timestamp for 1st traced buffer. The second and third traced
// buffers must have been flushed because the max pending ack time would have
// buffers must be flushed because the max pending ack time would have
// elapsed for them.
TcpSetWriteTimestampsCallback([](void* arg, Timestamps*, absl::Status error) {
ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
ASSERT_NE(arg, nullptr);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
});
tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 1);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process ACK Timestamp for 1st traced buffer.
TcpSetWriteTimestampsCallback(
[](void* arg, Timestamps* ts, absl::Status error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
ASSERT_GT(ts->info.length, 0);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(2));
});
tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 0);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
tb_list.Shutdown(nullptr, absl::OkStatus());
}
@ -231,7 +236,21 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
struct scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier);
TcpSetWriteTimestampsCallback(
[](void* arg, Timestamps* ts, absl::Status status) {
ASSERT_NE(arg, nullptr);
if (status.ok()) {
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
ASSERT_GT(ts->info.length, 0);
*(reinterpret_cast<int*>(arg)) = 1;
} else if (status == absl::DeadlineExceededError("Ack timed out")) {
*(reinterpret_cast<int*>(arg)) = 2;
} else {
ASSERT_TRUE(false);
}
});
TracedBufferList tb_list;
for (int i = 0; i < kNumTracedBuffers; i++) {
serr[i].ee_data = i + 1;
@ -259,7 +278,7 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
static_cast<gpr_atm>(1));
} else {
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
static_cast<gpr_atm>(0));
static_cast<gpr_atm>(2));
}
} else {
ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));

@ -59,35 +59,18 @@ void AdvanceClockMillis(uint64_t millis) {
exec_ctx.InvalidateNow();
}
static void TestShutdownFlushesListVerifier(void* arg,
grpc_core::Timestamps* /*ts*/,
grpc_error_handle error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, gpr_atm{1});
}
static void TestVerifierCalledOnDelayedAckVerifier(void* arg,
grpc_core::Timestamps* ts,
grpc_error_handle error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
ASSERT_GT(ts->info.length, 0);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
/// Tests that all TracedBuffer elements in the list are flushed out on
/// shutdown.
/// Also tests that arg is passed correctly.
///
TEST(BufferListTest, Testshutdownflusheslist) {
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestShutdownFlushesListVerifier);
[](void* arg, grpc_core::Timestamps* /*ts*/, grpc_error_handle error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, gpr_atm{1});
});
grpc_core::TracedBufferList tb_list;
#define NUM_ELEM 5
gpr_atm verifier_called[NUM_ELEM];
@ -161,8 +144,6 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
struct sock_extended_err serr[3];
gpr_atm verifier_called[3];
struct grpc_core::scm_timestamping tss;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnDelayedAckVerifier);
grpc_core::TracedBufferList tb_list;
serr[0].ee_data = 1;
serr[0].ee_info = grpc_core::SCM_TSTAMP_SCHED;
@ -184,6 +165,11 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SCHED Timestamp for 1st traced buffer.
// Nothing should be flushed.
grpc_core::grpc_tcp_set_write_timestamps_callback(
[](void*, grpc_core::Timestamps*, grpc_error_handle) {
ASSERT_TRUE(false);
});
tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 3);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
@ -195,24 +181,42 @@ TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SND Timestamp for 1st traced buffer. The second and third traced
// buffers must have been flushed because the max pending ack time would have
// buffers must be flushed because the max pending ack time would have
// elapsed for them.
grpc_core::grpc_tcp_set_write_timestamps_callback(
[](void* arg, grpc_core::Timestamps*, grpc_error_handle error) {
ASSERT_EQ(error, absl::DeadlineExceededError("Ack timed out"));
ASSERT_NE(arg, nullptr);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
});
tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 1);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process ACK Timestamp for 1st traced buffer.
grpc_core::grpc_tcp_set_write_timestamps_callback(
[](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
ASSERT_GT(ts->info.length, 0);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(2));
});
tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 0);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(2));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(1));
tb_list.Shutdown(nullptr, absl::OkStatus());
}
@ -226,7 +230,20 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnAckVerifier);
[](void* arg, grpc_core::Timestamps* ts, grpc_error_handle error) {
ASSERT_NE(arg, nullptr);
if (error.ok()) {
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
ASSERT_GT(ts->info.length, 0);
*(reinterpret_cast<int*>(arg)) = 1;
} else if (error == absl::DeadlineExceededError("Ack timed out")) {
*(reinterpret_cast<int*>(arg)) = 2;
} else {
ASSERT_TRUE(false);
}
});
grpc_core::TracedBufferList tb_list;
for (int i = 0; i < kNumTracedBuffers; i++) {
serr[i].ee_data = i + 1;
@ -254,7 +271,7 @@ TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
static_cast<gpr_atm>(1));
} else {
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
static_cast<gpr_atm>(0));
static_cast<gpr_atm>(2));
}
} else {
ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));

Loading…
Cancel
Save