C++ize TracedBuffer Interface and eliminate lock contention during getsockopt operation (#31565)

* C++ize TracedBuffer Interface and fix bug to eliminate lock contention during getsockopt operation

* fix

* fix sanity

* fix sanity

* review comments
pull/31575/head
Vignesh Babu 2 years ago committed by GitHub
parent 1b00515f90
commit 40ec6850d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  2. 3
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  3. 71
      src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
  4. 22
      src/core/lib/event_engine/posix_engine/traced_buffer_list.h
  5. 220
      src/core/lib/iomgr/buffer_list.cc
  6. 102
      src/core/lib/iomgr/buffer_list.h
  7. 35
      src/core/lib/iomgr/tcp_posix.cc
  8. 30
      test/core/iomgr/buffer_list_test.cc

@ -759,13 +759,7 @@ struct cmsghdr* PosixEndpointImpl::ProcessTimestamp(msghdr* msg,
gpr_log(GPR_ERROR, "Unexpected control message");
return cmsg;
}
// The error handling can potentially be done on another thread so we need to
// protect the traced buffer list. A lock free list might be better. Using a
// simple mutex for now.
{
grpc_core::MutexLock lock(&traced_buffer_mu_);
traced_buffers_.ProcessTimestamp(serr, opt_stats, tss);
}
traced_buffers_.ProcessTimestamp(serr, opt_stats, tss);
return next_cmsg;
}
@ -820,10 +814,8 @@ bool PosixEndpointImpl::WriteWithTimestamps(struct msghdr* msg,
*sent_length = length;
// Only save timestamps if all the bytes were taken by sendmsg.
if (sending_length == static_cast<size_t>(length)) {
traced_buffer_mu_.Lock();
traced_buffers_.AddNewEntry(static_cast<uint32_t>(bytes_counter_ + length),
fd_, outgoing_buffer_arg_);
traced_buffer_mu_.Unlock();
outgoing_buffer_arg_ = nullptr;
}
return true;
@ -861,10 +853,8 @@ void PosixEndpointImpl::UnrefMaybePutZerocopySendRecord(
// release operations needed can be performed on the arg.
void PosixEndpointImpl::TcpShutdownTracedBufferList() {
if (outgoing_buffer_arg_ != nullptr) {
traced_buffer_mu_.Lock();
traced_buffers_.Shutdown(outgoing_buffer_arg_,
absl::InternalError("TracedBuffer list shutdown"));
traced_buffer_mu_.Unlock();
outgoing_buffer_arg_ = nullptr;
}
}

@ -521,7 +521,6 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg);
#endif // GRPC_LINUX_ERRQUEUE
grpc_core::Mutex read_mu_;
grpc_core::Mutex traced_buffer_mu_;
PosixSocketWrapper sock_;
int fd_;
bool is_first_read_ = true;
@ -579,7 +578,7 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
// A hint from upper layers specifying the minimum number of bytes that need
// to be read to make meaningful progress.
int min_progress_size_ = 1;
TracedBufferList traced_buffers_ ABSL_GUARDED_BY(traced_buffer_mu_);
TracedBufferList traced_buffers_;
// The handle is owned by the PosixEndpointImpl object.
EventHandle* handle_;
PosixEventPoller* poller_;

@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_ERRQUEUE
@ -198,49 +199,59 @@ void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics,
} // namespace.
void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
buffer_list_.emplace_back(seq_no, arg);
TracedBuffer& new_elem = buffer_list_.back();
TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
// Store the current time as the sendmsg time.
new_elem.ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem.ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem.ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem.ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (GetSocketTcpInfo(&new_elem.ts_.info, fd) == 0) {
ExtractOptStatsFromTcpInfo(&new_elem.ts_.sendmsg_time.metrics,
&new_elem.ts_.info);
new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (GetSocketTcpInfo(&(new_elem->ts_.info), fd) == 0) {
ExtractOptStatsFromTcpInfo(&(new_elem->ts_.sendmsg_time.metrics),
&(new_elem->ts_.info));
}
grpc_core::MutexLock lock(&mu_);
if (!head_) {
head_ = tail_ = new_elem;
} else {
tail_->next_ = new_elem;
tail_ = new_elem;
}
}
void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss) {
auto it = buffer_list_.begin();
while (it != buffer_list_.end()) {
TracedBuffer& elem = (*it);
grpc_core::MutexLock lock(&mu_);
TracedBuffer* elem = head_;
while (elem != nullptr) {
// The byte number refers to the sequence number of the last byte which this
// timestamp relates to.
if (serr->ee_data >= elem.seq_no_) {
if (serr->ee_data >= elem->seq_no_) {
switch (serr->ee_info) {
case SCM_TSTAMP_SCHED:
FillGprFromTimestamp(&(elem.ts_.scheduled_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem.ts_.scheduled_time.metrics),
FillGprFromTimestamp(&(elem->ts_.scheduled_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
++it;
elem = elem->next_;
break;
case SCM_TSTAMP_SND:
FillGprFromTimestamp(&(elem.ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem.ts_.sent_time.metrics), opt_stats);
++it;
FillGprFromTimestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.sent_time.metrics), opt_stats);
elem = elem->next_;
break;
case SCM_TSTAMP_ACK:
FillGprFromTimestamp(&(elem.ts_.acked_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem.ts_.acked_time.metrics), opt_stats);
FillGprFromTimestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.acked_time.metrics), opt_stats);
// Got all timestamps. Do the callback and free this TracedBuffer. The
// thing below can be passed by value if we don't want the restriction
// on the lifetime.
g_timestamps_callback(elem.arg_, &(elem.ts_), absl::OkStatus());
it = buffer_list_.erase(it);
g_timestamps_callback(elem->arg_, &(elem->ts_), absl::OkStatus());
// Safe to update head_ to elem->next_ because the list is ordered by
// seq_no. Thus if elem is to be deleted, it has to be the first
// element in the list.
head_ = elem->next_;
delete elem;
elem = head_;
break;
default:
abort();
@ -249,17 +260,21 @@ void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
break;
}
}
tail_ = !head_ ? head_ : tail_;
}
void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {
while (!buffer_list_.empty()) {
TracedBuffer& elem = buffer_list_.front();
g_timestamps_callback(elem.arg_, &(elem.ts_), shutdown_err);
buffer_list_.pop_front();
grpc_core::MutexLock lock(&mu_);
while (head_) {
TracedBuffer* elem = head_;
g_timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
head_ = head_->next_;
delete elem;
}
if (remaining != nullptr) {
g_timestamps_callback(remaining, nullptr, shutdown_err);
}
tail_ = head_;
}
void TcpSetWriteTimestampsCallback(

@ -28,6 +28,7 @@
#include <grpc/impl/codegen/gpr_types.h>
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
namespace grpc_event_engine {
@ -120,7 +121,17 @@ class TracedBufferList {
void ProcessTimestamp(struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss);
int Size() { return buffer_list_.size(); }
// The Size() operation is slow and is used only in tests.
int Size() {
grpc_core::MutexLock lock(&mu_);
int size = 0;
TracedBuffer* curr = head_;
while (curr) {
++size;
curr = curr->next_;
}
return size;
}
// Cleans the list by calling the callback for each traced buffer in the list
// with timestamps that it has.
void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/);
@ -132,15 +143,18 @@ class TracedBufferList {
private:
friend class TracedBufferList;
TracedBuffer* next_ = nullptr;
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */
Timestamps ts_; /* The timestamps corresponding to this buffer */
};
grpc_core::Mutex mu_;
// TracedBuffers are ordered by sequence number and would need to be processed
// in a FIFO order starting with the smallest sequence number. To enable this,
// they are stored in a std::list which allows easy appends and forward
// iteration operations.
std::list<TracedBuffer> buffer_list_;
// they are stored in a singly linked list with head and tail pointers, which
// allows easy appends and forward iteration operations.
TracedBuffer* head_ = nullptr;
TracedBuffer* tail_ = nullptr;
};
#else /* GRPC_LINUX_ERRQUEUE */

@ -22,6 +22,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_ERRQUEUE
@ -29,72 +30,70 @@
#include <string.h>
#include <time.h>
#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
namespace {
/** Fills gpr_timespec gts based on values from timespec ts */
void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
// Fills gpr_timespec gts based on values from timespec ts.
void FillGprFromTimestamp(gpr_timespec* gts, const struct timespec* ts) {
gts->tv_sec = ts->tv_sec;
gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
gts->clock_type = GPR_CLOCK_REALTIME;
}
void default_timestamps_callback(void* /*arg*/, Timestamps* /*ts*/,
grpc_error_handle /*shudown_err*/) {
void DefaultTimestampsCallback(void* /*arg*/, Timestamps* /*ts*/,
absl::Status /*shudown_err*/) {
gpr_log(GPR_DEBUG, "Timestamps callback has not been registered");
}
/** The saved callback function that will be invoked when we get all the
* timestamps that we are going to get for a TracedBuffer. */
void (*timestamps_callback)(void*, Timestamps*,
grpc_error_handle shutdown_err) =
default_timestamps_callback;
// The saved callback function that will be invoked when we get all the
// timestamps that we are going to get for a TracedBuffer.
void (*g_timestamps_callback)(void*, Timestamps*,
grpc_error_handle shutdown_err) =
DefaultTimestampsCallback;
/* Used to extract individual opt stats from cmsg, so as to avoid troubles with
* unaligned reads */
// Used to extract individual opt stats from cmsg, so as to avoid troubles with
// unaligned reads.
template <typename T>
T read_unaligned(const void* ptr) {
T ReadUnaligned(const void* ptr) {
T val;
memcpy(&val, ptr, sizeof(val));
return val;
}
/* Extracts opt stats from the tcp_info struct \a info to \a metrics */
void extract_opt_stats_from_tcp_info(ConnectionMetrics* metrics,
const tcp_info* info) {
// Extracts opt stats from the tcp_info struct \a info to \a metrics
void ExtractOptStatsFromTcpInfo(ConnectionMetrics* metrics,
const tcp_info* info) {
if (info == nullptr) {
return;
}
if (info->length > offsetof(tcp_info, tcpi_sndbuf_limited)) {
metrics->recurring_retrans.emplace(info->tcpi_retransmits);
metrics->is_delivery_rate_app_limited.emplace(
info->tcpi_delivery_rate_app_limited);
metrics->congestion_window.emplace(info->tcpi_snd_cwnd);
metrics->reordering.emplace(info->tcpi_reordering);
metrics->packet_retx.emplace(info->tcpi_total_retrans);
metrics->pacing_rate.emplace(info->tcpi_pacing_rate);
metrics->data_notsent.emplace(info->tcpi_notsent_bytes);
metrics->recurring_retrans = info->tcpi_retransmits;
metrics->is_delivery_rate_app_limited =
info->tcpi_delivery_rate_app_limited;
metrics->congestion_window = info->tcpi_snd_cwnd;
metrics->reordering = info->tcpi_reordering;
metrics->packet_retx = info->tcpi_total_retrans;
metrics->pacing_rate = info->tcpi_pacing_rate;
metrics->data_notsent = info->tcpi_notsent_bytes;
if (info->tcpi_min_rtt != UINT32_MAX) {
metrics->min_rtt.emplace(info->tcpi_min_rtt);
metrics->min_rtt = info->tcpi_min_rtt;
}
metrics->packet_sent.emplace(info->tcpi_data_segs_out);
metrics->delivery_rate.emplace(info->tcpi_delivery_rate);
metrics->busy_usec.emplace(info->tcpi_busy_time);
metrics->rwnd_limited_usec.emplace(info->tcpi_rwnd_limited);
metrics->sndbuf_limited_usec.emplace(info->tcpi_sndbuf_limited);
metrics->packet_sent = info->tcpi_data_segs_out;
metrics->delivery_rate = info->tcpi_delivery_rate;
metrics->busy_usec = info->tcpi_busy_time;
metrics->rwnd_limited_usec = info->tcpi_rwnd_limited;
metrics->sndbuf_limited_usec = info->tcpi_sndbuf_limited;
}
if (info->length > offsetof(tcp_info, tcpi_dsack_dups)) {
metrics->data_sent.emplace(info->tcpi_bytes_sent);
metrics->data_retx.emplace(info->tcpi_bytes_retrans);
metrics->packet_spurious_retx.emplace(info->tcpi_dsack_dups);
metrics->data_sent = info->tcpi_bytes_sent;
metrics->data_retx = info->tcpi_bytes_retrans;
metrics->packet_spurious_retx = info->tcpi_dsack_dups;
}
}
/** Extracts opt stats from the given control message \a opt_stats to the
* connection metrics \a metrics */
void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics,
const cmsghdr* opt_stats) {
// Extracts opt stats from the given control message \a opt_stats to the
// connection metrics \a metrics.
void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics,
const cmsghdr* opt_stats) {
if (opt_stats == nullptr) {
return;
}
@ -108,80 +107,79 @@ void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics,
const void* val = data + offset + NLA_HDRLEN;
switch (attr->nla_type) {
case TCP_NLA_BUSY: {
metrics->busy_usec.emplace(read_unaligned<uint64_t>(val));
metrics->busy_usec = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_RWND_LIMITED: {
metrics->rwnd_limited_usec.emplace(read_unaligned<uint64_t>(val));
metrics->rwnd_limited_usec = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_SNDBUF_LIMITED: {
metrics->sndbuf_limited_usec.emplace(read_unaligned<uint64_t>(val));
metrics->sndbuf_limited_usec = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_PACING_RATE: {
metrics->pacing_rate.emplace(read_unaligned<uint64_t>(val));
metrics->pacing_rate = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DELIVERY_RATE: {
metrics->delivery_rate.emplace(read_unaligned<uint64_t>(val));
metrics->delivery_rate = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DELIVERY_RATE_APP_LMT: {
metrics->is_delivery_rate_app_limited.emplace(
read_unaligned<uint8_t>(val));
metrics->is_delivery_rate_app_limited = ReadUnaligned<uint8_t>(val);
break;
}
case TCP_NLA_SND_CWND: {
metrics->congestion_window.emplace(read_unaligned<uint32_t>(val));
metrics->congestion_window = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_MIN_RTT: {
metrics->min_rtt.emplace(read_unaligned<uint32_t>(val));
metrics->min_rtt = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_SRTT: {
metrics->srtt.emplace(read_unaligned<uint32_t>(val));
metrics->srtt = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_RECUR_RETRANS: {
metrics->recurring_retrans.emplace(read_unaligned<uint8_t>(val));
metrics->recurring_retrans = ReadUnaligned<uint8_t>(val);
break;
}
case TCP_NLA_BYTES_SENT: {
metrics->data_sent.emplace(read_unaligned<uint64_t>(val));
metrics->data_sent = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DATA_SEGS_OUT: {
metrics->packet_sent.emplace(read_unaligned<uint64_t>(val));
metrics->packet_sent = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_TOTAL_RETRANS: {
metrics->packet_retx.emplace(read_unaligned<uint64_t>(val));
metrics->packet_retx = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DELIVERED: {
metrics->packet_delivered.emplace(read_unaligned<uint32_t>(val));
metrics->packet_delivered = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_DELIVERED_CE: {
metrics->packet_delivered_ce.emplace(read_unaligned<uint32_t>(val));
metrics->packet_delivered_ce = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_BYTES_RETRANS: {
metrics->data_retx.emplace(read_unaligned<uint64_t>(val));
metrics->data_retx = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DSACK_DUPS: {
metrics->packet_spurious_retx.emplace(read_unaligned<uint32_t>(val));
metrics->packet_spurious_retx = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_REORDERING: {
metrics->reordering.emplace(read_unaligned<uint32_t>(val));
metrics->reordering = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_SND_SSTHRESH: {
metrics->snd_ssthresh.emplace(read_unaligned<uint32_t>(val));
metrics->snd_ssthresh = ReadUnaligned<uint32_t>(val);
break;
}
}
@ -189,75 +187,68 @@ void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics,
}
}
int get_socket_tcp_info(tcp_info* info, int fd) {
int GetSocketTcpInfo(struct tcp_info* info, int fd) {
memset(info, 0, sizeof(*info));
info->length = offsetof(tcp_info, length);
return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length));
}
} /* namespace */
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, int fd,
void* arg) {
GPR_DEBUG_ASSERT(head != nullptr);
} // namespace.
void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
TracedBuffer* new_elem = new TracedBuffer(seq_no, arg);
/* Store the current time as the sendmsg time. */
// Store the current time as the sendmsg time.
new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (get_socket_tcp_info(&new_elem->ts_.info, fd) == 0) {
extract_opt_stats_from_tcp_info(&new_elem->ts_.sendmsg_time.metrics,
&new_elem->ts_.info);
if (GetSocketTcpInfo(&(new_elem->ts_.info), fd) == 0) {
ExtractOptStatsFromTcpInfo(&(new_elem->ts_.sendmsg_time.metrics),
&(new_elem->ts_.info));
}
if (*head == nullptr) {
*head = new_elem;
return;
MutexLock lock(&mu_);
if (!head_) {
head_ = tail_ = new_elem;
} else {
tail_->next_ = new_elem;
tail_ = new_elem;
}
/* Append at the end. */
TracedBuffer* ptr = *head;
while (ptr->next_ != nullptr) {
ptr = ptr->next_;
}
ptr->next_ = new_elem;
}
void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
TracedBuffer* next = nullptr;
void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss) {
MutexLock lock(&mu_);
TracedBuffer* elem = head_;
while (elem != nullptr) {
/* The byte number refers to the sequence number of the last byte which this
* timestamp relates to. */
// The byte number refers to the sequence number of the last byte which this
// timestamp relates to.
if (serr->ee_data >= elem->seq_no_) {
switch (serr->ee_info) {
case SCM_TSTAMP_SCHED:
fill_gpr_from_timestamp(&(elem->ts_.scheduled_time.time),
&(tss->ts[0]));
extract_opt_stats_from_cmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
FillGprFromTimestamp(&(elem->ts_.scheduled_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
elem = elem->next_;
break;
case SCM_TSTAMP_SND:
fill_gpr_from_timestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
extract_opt_stats_from_cmsg(&(elem->ts_.sent_time.metrics),
opt_stats);
FillGprFromTimestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.sent_time.metrics), opt_stats);
elem = elem->next_;
break;
case SCM_TSTAMP_ACK:
fill_gpr_from_timestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
extract_opt_stats_from_cmsg(&(elem->ts_.acked_time.metrics),
opt_stats);
/* Got all timestamps. Do the callback and free this TracedBuffer.
* The thing below can be passed by value if we don't want the
* restriction on the lifetime. */
timestamps_callback(elem->arg_, &(elem->ts_), absl::OkStatus());
next = elem->next_;
delete static_cast<TracedBuffer*>(elem);
*head = elem = next;
FillGprFromTimestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem->ts_.acked_time.metrics), opt_stats);
// Got all timestamps. Do the callback and free this TracedBuffer. The
// thing below can be passed by value if we don't want the restriction
// on the lifetime.
g_timestamps_callback(elem->arg_, &(elem->ts_), absl::OkStatus());
// Safe to update head_ to elem->next_ because the list is ordered by
// seq_no. Thus if elem is to be deleted, it has to be the first
// element in the list.
head_ = elem->next_;
delete elem;
elem = head_;
break;
default:
abort();
@ -266,27 +257,26 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
break;
}
}
tail_ = !head_ ? head_ : tail_;
}
void TracedBuffer::Shutdown(TracedBuffer** head, void* remaining,
grpc_error_handle shutdown_err) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
while (elem != nullptr) {
timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
auto* next = elem->next_;
void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {
MutexLock lock(&mu_);
while (head_) {
TracedBuffer* elem = head_;
g_timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
head_ = head_->next_;
delete elem;
elem = next;
}
*head = nullptr;
if (remaining != nullptr) {
timestamps_callback(remaining, nullptr, shutdown_err);
g_timestamps_callback(remaining, nullptr, shutdown_err);
}
tail_ = head_;
}
void grpc_tcp_set_write_timestamps_callback(
void (*fn)(void*, Timestamps*, grpc_error_handle error)) {
timestamps_callback = fn;
g_timestamps_callback = fn;
}
} /* namespace grpc_core */
@ -301,6 +291,6 @@ void grpc_tcp_set_write_timestamps_callback(
(void)fn;
gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform");
}
} /* namespace grpc_core */
} // namespace grpc_core
#endif /* GRPC_LINUX_ERRQUEUE */

@ -25,7 +25,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/internal_errqueue.h"
#include "src/core/lib/iomgr/port.h"
@ -100,54 +100,72 @@ struct Timestamps {
#endif /* GRPC_LINUX_ERRQUEUE */
};
/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
* the TCP layer. We are only tracking timestamps for Linux kernels and hence
* this class would only be used by Linux platforms. For all other platforms,
* TracedBuffer would be an empty class.
*
* The timestamps collected are according to Timestamps declared
* above.
*
* A TracedBuffer list is kept track of using the head element of the list. If
* the head element of the list is nullptr, then the list is empty.
*/
// TracedBuffer is a class to keep track of timestamps for a specific buffer in
// the TCP layer. We are only tracking timestamps for Linux kernels and hence
// this class would only be used by Linux platforms. For all other platforms,
// TracedBuffer would be an empty class.
// The timestamps collected are according to Timestamps declared above. A
// TracedBuffer list is kept track of using the head element of the list. If
// the head element of the list is nullptr, then the list is empty.
#ifdef GRPC_LINUX_ERRQUEUE
class TracedBuffer {
class TracedBufferList {
public:
/** Use AddNewEntry function instead of using this directly. */
TracedBuffer(uint32_t seq_no, void* arg)
: seq_no_(seq_no), arg_(arg), next_(nullptr) {}
/** Add a new entry in the TracedBuffer list pointed to by head. Also saves
* sendmsg_time with the current timestamp. */
static void AddNewEntry(TracedBuffer** head, uint32_t seq_no, int fd,
void* arg);
/** Processes a received timestamp based on sock_extended_err and
* scm_timestamping structures. It will invoke the timestamps callback if the
* timestamp type is SCM_TSTAMP_ACK. */
static void ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss);
/** Cleans the list by calling the callback for each traced buffer in the list
* with timestamps that it has. */
static void Shutdown(TracedBuffer** head, void* remaining,
grpc_error_handle shutdown_err);
// Add a new entry in the TracedBuffer list pointed to by head. Also saves
// sendmsg_time with the current timestamp.
void AddNewEntry(int32_t seq_no, int fd, void* arg);
// Processes a received timestamp based on sock_extended_err and
// scm_timestamping structures. It will invoke the timestamps callback if the
// timestamp type is SCM_TSTAMP_ACK.
void ProcessTimestamp(struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss);
// The Size() operation is slow and is used only in tests.
int Size() {
MutexLock lock(&mu_);
int size = 0;
TracedBuffer* curr = head_;
while (curr) {
++size;
curr = curr->next_;
}
return size;
}
// Cleans the list by calling the callback for each traced buffer in the list
// with timestamps that it has.
void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/);
private:
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */
Timestamps ts_; /* The timestamps corresponding to this buffer */
TracedBuffer* next_; /* The next TracedBuffer in the list */
class TracedBuffer {
public:
TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg) {}
private:
friend class TracedBufferList;
TracedBuffer* next_ = nullptr;
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */
Timestamps ts_; /* The timestamps corresponding to this buffer */
};
Mutex mu_;
// TracedBuffers are ordered by sequence number and would need to be processed
// in a FIFO order starting with the smallest sequence number. To enable this,
// they are stored in a singly linked list with head and tail pointers which allows
// easy appends and forward iteration operations.
TracedBuffer* head_ = nullptr;
TracedBuffer* tail_ = nullptr;
};
#else /* GRPC_LINUX_ERRQUEUE */
class TracedBuffer {
// TracedBufferList implementation is a no-op for this platform.
class TracedBufferList {
public:
/* Phony shutdown function */
static void Shutdown(TracedBuffer** /*head*/, void* /*remaining*/,
grpc_error_handle /*shutdown_err*/) {}
void AddNewEntry(int32_t /*seq_no*/, int /*fd*/, void* /*arg*/) {}
void ProcessTimestamp(struct sock_extended_err* /*serr*/,
struct cmsghdr* /*opt_stats*/,
struct scm_timestamping* /*tss*/) {}
int Size() { return 0; }
void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/) {}
};
#endif /* GRPC_LINUX_ERRQUEUE */

@ -518,8 +518,7 @@ struct grpc_tcp {
grpc_core::MemoryOwner memory_owner;
grpc_core::MemoryAllocator::Reservation self_reservation;
grpc_core::TracedBuffer* tb_head; /* List of traced buffers */
gpr_mu tb_mu; /* Lock for access to list of traced buffers */
grpc_core::TracedBufferList tb_list; /* List of traced buffers */
/* grpc_endpoint_write takes an argument which if non-null means that the
* transport layer wants the TCP layer to collect timestamps for this write.
@ -735,13 +734,9 @@ static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
grpc_slice_buffer_destroy(&tcp->last_read_buffer);
/* The lock is not really necessary here, since all refs have been released */
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE("endpoint destroyed"));
gpr_mu_unlock(&tcp->tb_mu);
tcp->tb_list.Shutdown(tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE("endpoint destroyed"));
tcp->outgoing_buffer_arg = nullptr;
gpr_mu_destroy(&tcp->tb_mu);
delete tcp;
}
@ -1278,11 +1273,8 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
*sent_length = length;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if (sending_length == static_cast<size_t>(length)) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::AddNewEntry(
&tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
tcp->fd, tcp->outgoing_buffer_arg);
gpr_mu_unlock(&tcp->tb_mu);
tcp->tb_list.AddNewEntry(static_cast<uint32_t>(tcp->bytes_counter + length),
tcp->fd, tcp->outgoing_buffer_arg);
tcp->outgoing_buffer_arg = nullptr;
}
return true;
@ -1376,13 +1368,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
gpr_log(GPR_ERROR, "Unexpected control message");
return cmsg;
}
/* The error handling can potentially be done on another thread so we need
* to protect the traced buffer list. A lock free list might be better. Using
* a simple mutex for now. */
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, opt_stats,
tss);
gpr_mu_unlock(&tcp->tb_mu);
tcp->tb_list.ProcessTimestamp(serr, opt_stats, tss);
return next_cmsg;
}
@ -1520,11 +1506,8 @@ static void tcp_handle_error(void* /*arg*/ /* grpc_tcp */,
* release operations needed can be performed on the arg */
void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
if (tcp->outgoing_buffer_arg) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::Shutdown(
&tcp->tb_head, tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE("TracedBuffer list shutdown"));
gpr_mu_unlock(&tcp->tb_mu);
tcp->tb_list.Shutdown(tcp->outgoing_buffer_arg,
GRPC_ERROR_CREATE("TracedBuffer list shutdown"));
tcp->outgoing_buffer_arg = nullptr;
}
}
@ -2004,8 +1987,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
grpc_slice_buffer_init(&tcp->last_read_buffer);
gpr_mu_init(&tcp->tb_mu);
tcp->tb_head = nullptr;
GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
grpc_schedule_on_exec_ctx);
if (grpc_event_engine_run_in_background()) {

@ -43,16 +43,14 @@ static void TestShutdownFlushesListVerifier(void* arg,
TEST(BufferListTest, Testshutdownflusheslist) {
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestShutdownFlushesListVerifier);
grpc_core::TracedBuffer* list = nullptr;
grpc_core::TracedBufferList tb_list;
#define NUM_ELEM 5
gpr_atm verifier_called[NUM_ELEM];
for (auto i = 0; i < NUM_ELEM; i++) {
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(
&list, i, 0, static_cast<void*>(&verifier_called[i]));
tb_list.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
}
grpc_core::TracedBuffer::Shutdown(&list, nullptr, absl::OkStatus());
ASSERT_EQ(list, nullptr);
tb_list.Shutdown(nullptr, absl::OkStatus());
for (auto i = 0; i < NUM_ELEM; i++) {
ASSERT_EQ(gpr_atm_acq_load(&verifier_called[i]), static_cast<gpr_atm>(1));
}
@ -82,14 +80,13 @@ TEST(BufferListTest, Testverifiercalledonack) {
tss.ts[0].tv_nsec = 456;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnAckVerifier);
grpc_core::TracedBuffer* list = nullptr;
grpc_core::TracedBufferList tb_list;
gpr_atm verifier_called;
gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(&list, 213, 0, &verifier_called);
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, nullptr, &tss);
tb_list.AddNewEntry(213, 0, &verifier_called);
tb_list.ProcessTimestamp(&serr, nullptr, &tss);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called), static_cast<gpr_atm>(1));
ASSERT_EQ(list, nullptr);
grpc_core::TracedBuffer::Shutdown(&list, nullptr, absl::OkStatus());
tb_list.Shutdown(nullptr, absl::OkStatus());
}
/** Tests that shutdown can be called repeatedly.
@ -103,16 +100,15 @@ TEST(BufferListTest, Testrepeatedshutdown) {
tss.ts[0].tv_nsec = 456;
grpc_core::grpc_tcp_set_write_timestamps_callback(
TestVerifierCalledOnAckVerifier);
grpc_core::TracedBuffer* list = nullptr;
grpc_core::TracedBufferList tb_list;
gpr_atm verifier_called;
gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(&list, 213, 0, &verifier_called);
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, nullptr, &tss);
tb_list.AddNewEntry(213, 0, &verifier_called);
tb_list.ProcessTimestamp(&serr, nullptr, &tss);
ASSERT_EQ(gpr_atm_acq_load(&verifier_called), static_cast<gpr_atm>(1));
ASSERT_EQ(list, nullptr);
grpc_core::TracedBuffer::Shutdown(&list, nullptr, absl::OkStatus());
grpc_core::TracedBuffer::Shutdown(&list, nullptr, absl::OkStatus());
grpc_core::TracedBuffer::Shutdown(&list, nullptr, absl::OkStatus());
tb_list.Shutdown(nullptr, absl::OkStatus());
tb_list.Shutdown(nullptr, absl::OkStatus());
tb_list.Shutdown(nullptr, absl::OkStatus());
}
int main(int argc, char** argv) {

Loading…
Cancel
Save