Add a check to flush stale TracedBuffer entries to prevent them growing too long. (#31588)

* Add a check to flush stale TracedBuffer entries to prevent them growing too long

* update

* fix tsan and sanity

* sanity

* review comments
pull/31666/head
Vignesh Babu 2 years ago committed by GitHub
parent 0b60025ba8
commit 1e2af690f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
  2. 6
      src/core/lib/event_engine/posix_engine/traced_buffer_list.h
  3. 32
      src/core/lib/iomgr/buffer_list.cc
  4. 6
      src/core/lib/iomgr/buffer_list.h
  5. 150
      test/core/event_engine/posix/traced_buffer_list_test.cc
  6. 158
      test/core/iomgr/buffer_list_test.cc

@ -198,6 +198,12 @@ void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics,
}
} // namespace.
bool TracedBufferList::TracedBuffer::Finished(gpr_timespec ts) {
constexpr int kGrpcMaxPendingAckTimeMillis = 10000;
return gpr_time_to_millis(gpr_time_sub(ts, last_timestamp_)) >
kGrpcMaxPendingAckTimeMillis;
}
void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
// Store the current time as the sendmsg time.
@ -209,6 +215,7 @@ void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
ExtractOptStatsFromTcpInfo(&(new_elem->ts_.sendmsg_time.metrics),
&(new_elem->ts_.info));
}
new_elem->last_timestamp_ = new_elem->ts_.sendmsg_time.time;
grpc_core::MutexLock lock(&mu_);
if (!head_) {
head_ = tail_ = new_elem;
@ -223,6 +230,7 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
struct scm_timestamping* tss) {
grpc_core::MutexLock lock(&mu_);
TracedBuffer* elem = head_;
TracedBuffer* prev = nullptr;
while (elem != nullptr) {
// The byte number refers to the sequence number of the last byte which this
// timestamp relates to.
@ -232,11 +240,13 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
FillGprFromTimestamp(&(elem->ts_.scheduled_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
elem->last_timestamp_ = elem->ts_.scheduled_time.time;
elem = elem->next_;
break;
case SCM_TSTAMP_SND:
FillGprFromTimestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.sent_time.metrics), opt_stats);
elem->last_timestamp_ = elem->ts_.sent_time.time;
elem = elem->next_;
break;
case SCM_TSTAMP_ACK:
@ -260,7 +270,26 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
break;
}
}
tail_ = !head_ ? head_ : tail_;
elem = head_;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
while (elem != nullptr) {
if (!elem->Finished(now)) {
prev = elem;
elem = elem->next_;
continue;
}
if (prev != nullptr) {
prev->next_ = elem->next_;
delete elem;
elem = prev->next_;
} else {
head_ = elem->next_;
delete elem;
elem = head_;
}
}
tail_ = (head_ == nullptr) ? head_ : prev;
}
void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {

@ -110,6 +110,8 @@ struct Timestamps {
class TracedBufferList {
public:
TracedBufferList() = default;
~TracedBufferList() = default;
// Add a new entry in the TracedBuffer list pointed to by head. Also saves
// sendmsg_time with the current timestamp.
void AddNewEntry(int32_t seq_no, int fd, void* arg);
@ -138,9 +140,13 @@ class TracedBufferList {
class TracedBuffer {
public:
TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg) {}
// Returns true if the TracedBuffer is considered stale at the given
// timestamp.
bool Finished(gpr_timespec ts);
private:
friend class TracedBufferList;
gpr_timespec last_timestamp_;
TracedBuffer* next_ = nullptr;
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */

@ -21,6 +21,7 @@
#include "src/core/lib/iomgr/buffer_list.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
@ -195,6 +196,12 @@ int GetSocketTcpInfo(struct tcp_info* info, int fd) {
} // namespace.
bool TracedBufferList::TracedBuffer::Finished(gpr_timespec ts) {
constexpr int kGrpcMaxPendingAckTimeMillis = 10000;
return gpr_time_to_millis(gpr_time_sub(ts, last_timestamp_)) >
kGrpcMaxPendingAckTimeMillis;
}
void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
// Store the current time as the sendmsg time.
@ -206,6 +213,7 @@ void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
ExtractOptStatsFromTcpInfo(&(new_elem->ts_.sendmsg_time.metrics),
&(new_elem->ts_.info));
}
new_elem->last_timestamp_ = new_elem->ts_.sendmsg_time.time;
MutexLock lock(&mu_);
if (!head_) {
head_ = tail_ = new_elem;
@ -220,6 +228,7 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
struct scm_timestamping* tss) {
MutexLock lock(&mu_);
TracedBuffer* elem = head_;
TracedBuffer* prev = nullptr;
while (elem != nullptr) {
// The byte number refers to the sequence number of the last byte which this
// timestamp relates to.
@ -229,11 +238,13 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
FillGprFromTimestamp(&(elem->ts_.scheduled_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
elem->last_timestamp_ = elem->ts_.scheduled_time.time;
elem = elem->next_;
break;
case SCM_TSTAMP_SND:
FillGprFromTimestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.sent_time.metrics), opt_stats);
elem->last_timestamp_ = elem->ts_.sent_time.time;
elem = elem->next_;
break;
case SCM_TSTAMP_ACK:
@ -257,7 +268,26 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
break;
}
}
tail_ = !head_ ? head_ : tail_;
elem = head_;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
while (elem != nullptr) {
if (!elem->Finished(now)) {
prev = elem;
elem = elem->next_;
continue;
}
if (prev != nullptr) {
prev->next_ = elem->next_;
delete elem;
elem = prev->next_;
} else {
head_ = elem->next_;
delete elem;
elem = head_;
}
}
tail_ = (head_ == nullptr) ? head_ : prev;
}
void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {

@ -23,6 +23,7 @@
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/sync.h"
@ -111,6 +112,8 @@ struct Timestamps {
class TracedBufferList {
public:
TracedBufferList() = default;
~TracedBufferList() = default;
// Add a new entry in the TracedBuffer list pointed to by head. Also saves
// sendmsg_time with the current timestamp.
void AddNewEntry(int32_t seq_no, int fd, void* arg);
@ -140,8 +143,11 @@ class TracedBufferList {
public:
TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg) {}
bool Finished(gpr_timespec ts);
private:
friend class TracedBufferList;
gpr_timespec last_timestamp_;
TracedBuffer* next_ = nullptr;
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */

@ -20,8 +20,9 @@
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/util/test_config.h"
#ifdef GRPC_LINUX_ERRQUEUE
@ -29,10 +30,37 @@
#define NUM_ELEM 5
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
namespace grpc_event_engine {
namespace posix_engine {
namespace {
constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
gpr_timespec g_now;
gpr_timespec now_impl(gpr_clock_type clock_type) {
GPR_ASSERT(clock_type != GPR_TIMESPAN);
gpr_timespec ts = g_now;
ts.clock_type = clock_type;
return ts;
}
void InitGlobals() {
g_now = {1, 0, GPR_CLOCK_MONOTONIC};
grpc_core::TestOnlySetProcessEpoch(g_now);
gpr_now_impl = now_impl;
}
void AdvanceClockMillis(uint64_t millis) {
grpc_core::ExecCtx exec_ctx;
g_now = gpr_time_add(
g_now, gpr_time_from_millis(
grpc_core::Clamp(millis, uint64_t(1), kMaxAdvanceTimeMillis),
GPR_TIMESPAN));
exec_ctx.InvalidateNow();
}
void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
absl::Status status) {
ASSERT_TRUE(status.ok());
@ -53,6 +81,18 @@ void TestVerifierCalledOnAckVerifier(void* arg, Timestamps* ts,
*done = 1;
}
void TestVerifierCalledOnDelayedAckVerifier(void* arg, Timestamps* ts,
absl::Status error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
ASSERT_GT(ts->info.length, 0);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
} // namespace
// Tests that all TracedBuffer elements in the list are flushed out on shutdown.
@ -119,12 +159,118 @@ TEST(BufferListTest, TestProcessTimestampAfterShutdown) {
ASSERT_EQ(verifier_called, 0);
}
TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
constexpr int kMaxPendingAckMillis = 10000;
struct sock_extended_err serr[3];
gpr_atm verifier_called[3];
struct scm_timestamping tss;
TcpSetWriteTimestampsCallback(TestVerifierCalledOnDelayedAckVerifier);
TracedBufferList tb_list;
serr[0].ee_data = 1;
serr[0].ee_info = SCM_TSTAMP_SCHED;
serr[1].ee_data = 1;
serr[1].ee_info = SCM_TSTAMP_SND;
serr[2].ee_data = 1;
serr[2].ee_info = SCM_TSTAMP_ACK;
gpr_atm_rel_store(&verifier_called[0], static_cast<gpr_atm>(0));
gpr_atm_rel_store(&verifier_called[1], static_cast<gpr_atm>(0));
gpr_atm_rel_store(&verifier_called[2], static_cast<gpr_atm>(0));
// Add 3 traced buffers
tb_list.AddNewEntry(1, 0, &verifier_called[0]);
tb_list.AddNewEntry(2, 0, &verifier_called[1]);
tb_list.AddNewEntry(3, 0, &verifier_called[2]);
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SCHED Timestamp for 1st traced buffer.
tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 3);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SND Timestamp for 1st traced buffer. The second and third traced
// buffers must have been flushed because the max pending ack time would have
// elapsed for them.
tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 1);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process ACK Timestamp for 1st traced buffer.
tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 0);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
tb_list.Shutdown(nullptr, absl::OkStatus());
}
TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
constexpr int kNumTracedBuffers = 10;
constexpr int kMaxPendingAckMillis = 10000;
struct sock_extended_err serr[kNumTracedBuffers];
gpr_atm verifier_called[kNumTracedBuffers];
struct scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier);
TracedBufferList tb_list;
for (int i = 0; i < kNumTracedBuffers; i++) {
serr[i].ee_data = i + 1;
serr[i].ee_info = SCM_TSTAMP_ACK;
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
}
int elapsed_time_millis = 0;
int increment_millis = (2 * kMaxPendingAckMillis) / 10;
for (int i = 0; i < kNumTracedBuffers; i++) {
AdvanceClockMillis(increment_millis);
elapsed_time_millis += increment_millis;
tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
if (elapsed_time_millis > kMaxPendingAckMillis) {
// MaxPendingAckMillis has elapsed. the rest of tb_list must have been
// flushed now.
ASSERT_EQ(tb_list.Size(), 0);
if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
// The first ProcessTimestamp just after kMaxPendingAckMillis would have
// still successfully processed the head traced buffer entry and then
// discarded all the other remaining traced buffer entries. The first
// traced buffer entry would have been processed because the ACK
// timestamp was received for it.
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
static_cast<gpr_atm>(1));
} else {
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
static_cast<gpr_atm>(0));
}
} else {
ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
}
}
tb_list.Shutdown(nullptr, absl::OkStatus());
}
} // namespace posix_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
grpc_event_engine::posix_engine::InitGlobals();
return RUN_ALL_TESTS();
}

@ -21,12 +21,44 @@
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/global_config_generic.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/internal_errqueue.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/util/test_config.h"
#ifdef GRPC_LINUX_ERRQUEUE
constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
static gpr_timespec g_now;
gpr_timespec now_impl(gpr_clock_type clock_type) {
GPR_ASSERT(clock_type != GPR_TIMESPAN);
gpr_timespec ts = g_now;
ts.clock_type = clock_type;
return ts;
}
void InitGlobals() {
g_now = {1, 0, GPR_CLOCK_MONOTONIC};
grpc_core::TestOnlySetProcessEpoch(g_now);
gpr_now_impl = now_impl;
}
void AdvanceClockMillis(uint64_t millis) {
grpc_core::ExecCtx exec_ctx;
g_now = gpr_time_add(
g_now, gpr_time_from_millis(
grpc_core::Clamp(millis, uint64_t(1), kMaxAdvanceTimeMillis),
GPR_TIMESPAN));
exec_ctx.InvalidateNow();
}
static void TestShutdownFlushesListVerifier(void* arg,
grpc_core::Timestamps* /*ts*/,
grpc_error_handle error) {
@ -36,6 +68,19 @@ static void TestShutdownFlushesListVerifier(void* arg,
gpr_atm_rel_store(done, gpr_atm{1});
}
static void TestVerifierCalledOnDelayedAckVerifier(void* arg,
grpc_core::Timestamps* ts,
grpc_error_handle error) {
ASSERT_TRUE(error.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, g_now.tv_sec);
ASSERT_EQ(ts->acked_time.time.tv_nsec, g_now.tv_nsec);
ASSERT_GT(ts->info.length, 0);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
/** Tests that all TracedBuffer elements in the list are flushed out on
* shutdown.
* Also tests that arg is passed correctly.
@ -111,10 +156,117 @@ TEST(BufferListTest, Testrepeatedshutdown) {
tb_list.Shutdown(nullptr, absl::OkStatus());
}
TEST(BufferListTest, TestLongPendingAckForOneTracedBuffer) {
constexpr int kMaxPendingAckMillis = 10000;
struct sock_extended_err serr[3];
gpr_atm verifier_called[3];
struct grpc_core::scm_timestamping tss;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnDelayedAckVerifier);
grpc_core::TracedBufferList tb_list;
serr[0].ee_data = 1;
serr[0].ee_info = grpc_core::SCM_TSTAMP_SCHED;
serr[1].ee_data = 1;
serr[1].ee_info = grpc_core::SCM_TSTAMP_SND;
serr[2].ee_data = 1;
serr[2].ee_info = grpc_core::SCM_TSTAMP_ACK;
gpr_atm_rel_store(&verifier_called[0], static_cast<gpr_atm>(0));
gpr_atm_rel_store(&verifier_called[1], static_cast<gpr_atm>(0));
gpr_atm_rel_store(&verifier_called[2], static_cast<gpr_atm>(0));
// Add 3 traced buffers
tb_list.AddNewEntry(1, 0, &verifier_called[0]);
tb_list.AddNewEntry(2, 0, &verifier_called[1]);
tb_list.AddNewEntry(3, 0, &verifier_called[2]);
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SCHED Timestamp for 1st traced buffer.
tb_list.ProcessTimestamp(&serr[0], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 3);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process SND Timestamp for 1st traced buffer. The second and third traced
// buffers must have been flushed because the max pending ack time would have
// elapsed for them.
tb_list.ProcessTimestamp(&serr[1], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 1);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
AdvanceClockMillis(kMaxPendingAckMillis);
tss.ts[0].tv_sec = g_now.tv_sec;
tss.ts[0].tv_nsec = g_now.tv_nsec;
// Process ACK Timestamp for 1st traced buffer.
tb_list.ProcessTimestamp(&serr[2], nullptr, &tss);
ASSERT_EQ(tb_list.Size(), 0);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[0]), static_cast<gpr_atm>(1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[1]), static_cast<gpr_atm>(0));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[2]), static_cast<gpr_atm>(0));
tb_list.Shutdown(nullptr, absl::OkStatus());
}
TEST(BufferListTest, TestLongPendingAckForSomeTracedBuffers) {
constexpr int kNumTracedBuffers = 10;
constexpr int kMaxPendingAckMillis = 10000;
struct sock_extended_err serr[kNumTracedBuffers];
gpr_atm verifier_called[kNumTracedBuffers];
struct grpc_core::scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnAckVerifier);
grpc_core::TracedBufferList tb_list;
for (int i = 0; i < kNumTracedBuffers; i++) {
serr[i].ee_data = i + 1;
serr[i].ee_info = grpc_core::SCM_TSTAMP_ACK;
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
tb_list.AddNewEntry(i + 1, 0, &verifier_called[i]);
}
int elapsed_time_millis = 0;
int increment_millis = (2 * kMaxPendingAckMillis) / 10;
for (int i = 0; i < kNumTracedBuffers; i++) {
AdvanceClockMillis(increment_millis);
elapsed_time_millis += increment_millis;
tb_list.ProcessTimestamp(&serr[i], nullptr, &tss);
if (elapsed_time_millis > kMaxPendingAckMillis) {
// MaxPendingAckMillis has elapsed. the rest of tb_list must have been
// flushed now.
ASSERT_EQ(tb_list.Size(), 0);
if (elapsed_time_millis - kMaxPendingAckMillis == increment_millis) {
// The first ProcessTimestamp just after kMaxPendingAckMillis would have
// still successfully processed the head traced buffer entry and then
// discarded all the other remaining traced buffer entries. The first
// traced buffer entry would have been processed because the ACK
// timestamp was received for it.
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
static_cast<gpr_atm>(1));
} else {
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]),
static_cast<gpr_atm>(0));
}
} else {
ASSERT_EQ(tb_list.Size(), kNumTracedBuffers - (i + 1));
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
}
}
tb_list.Shutdown(nullptr, absl::OkStatus());
}
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestGrpcScope grpc_scope;
InitGlobals();
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save