Use getsockopt to get the tcp_info struct with sendmsg timestamp

pull/17757/head
Yash Tibrewal 6 years ago
parent 1ec65a2c9b
commit 862faf55ba
  1. 90
      src/core/lib/iomgr/buffer_list.cc
  2. 4
      src/core/lib/iomgr/buffer_list.h
  3. 67
      src/core/lib/iomgr/internal_errqueue.h
  4. 14
      src/core/lib/iomgr/tcp_posix.cc
  5. 4
      test/core/iomgr/buffer_list_test.cc

@ -30,27 +30,6 @@
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
namespace grpc_core { namespace grpc_core {
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
void* arg) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (*head == nullptr) {
*head = new_elem;
return;
}
/* Append at the end. */
TracedBuffer* ptr = *head;
while (ptr->next_ != nullptr) {
ptr = ptr->next_;
}
ptr->next_ = new_elem;
}
namespace { namespace {
/** Fills gpr_timespec gts based on values from timespec ts */ /** Fills gpr_timespec gts based on values from timespec ts */
void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) { void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
@ -79,9 +58,41 @@ T read_unaligned(const void* ptr) {
return val; return val;
} }
/** Adds opt stats statistics from the given control message to the connection /* Extracts opt stats from the tcp_info struct \a info to \a metrics */
* metrics. */ void extract_opt_stats_from_tcp_info(ConnectionMetrics* metrics,
void ExtractOptStats(ConnectionMetrics* metrics, const cmsghdr* opt_stats) { const grpc_core::tcp_info* info) {
if (info == nullptr) {
return;
}
if (info->length > offsetof(grpc_core::tcp_info, tcpi_sndbuf_limited)) {
metrics->recurring_retrans.set(info->tcpi_retransmits);
metrics->is_delivery_rate_app_limited =
info->tcpi_delivery_rate_app_limited;
metrics->congestion_window.set(info->tcpi_snd_cwnd);
metrics->reordering.set(info->tcpi_reordering);
metrics->packet_retx.set(info->tcpi_total_retrans);
metrics->pacing_rate.set(info->tcpi_pacing_rate);
metrics->data_notsent.set(info->tcpi_notsent_bytes);
if (info->tcpi_min_rtt != UINT32_MAX) {
metrics->min_rtt.set(info->tcpi_min_rtt);
}
metrics->packet_sent.set(info->tcpi_data_segs_out);
metrics->delivery_rate.set(info->tcpi_delivery_rate);
metrics->busy_usec.set(info->tcpi_busy_time);
metrics->rwnd_limited_usec.set(info->tcpi_rwnd_limited);
metrics->sndbuf_limited_usec.set(info->tcpi_sndbuf_limited);
}
if (info->length > offsetof(grpc_core::tcp_info, tcpi_dsack_dups)) {
metrics->data_sent.set(info->tcpi_bytes_sent);
metrics->data_retx.set(info->tcpi_bytes_retrans);
metrics->packet_spurious_retx.set(info->tcpi_dsack_dups);
}
}
/** Extracts opt stats from the given control message \a opt_stats to the
* connection metrics \a metrics */
void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics,
const cmsghdr* opt_stats) {
if (opt_stats == nullptr) { if (opt_stats == nullptr) {
return; return;
} }
@ -176,6 +187,28 @@ void ExtractOptStats(ConnectionMetrics* metrics, const cmsghdr* opt_stats) {
} }
} /* namespace */ } /* namespace */
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
const grpc_core::tcp_info* info, void* arg) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
extract_opt_stats_from_tcp_info(&new_elem->ts_.sendmsg_time.metrics, info);
if (*head == nullptr) {
*head = new_elem;
return;
}
/* Append at the end. */
TracedBuffer* ptr = *head;
while (ptr->next_ != nullptr) {
ptr = ptr->next_;
}
ptr->next_ = new_elem;
}
void TracedBuffer::ProcessTimestamp(TracedBuffer** head, void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr, struct sock_extended_err* serr,
struct cmsghdr* opt_stats, struct cmsghdr* opt_stats,
@ -191,17 +224,20 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
case SCM_TSTAMP_SCHED: case SCM_TSTAMP_SCHED:
fill_gpr_from_timestamp(&(elem->ts_.scheduled_time.time), fill_gpr_from_timestamp(&(elem->ts_.scheduled_time.time),
&(tss->ts[0])); &(tss->ts[0]));
ExtractOptStats(&(elem->ts_.scheduled_time.metrics), opt_stats); extract_opt_stats_from_cmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
elem = elem->next_; elem = elem->next_;
break; break;
case SCM_TSTAMP_SND: case SCM_TSTAMP_SND:
fill_gpr_from_timestamp(&(elem->ts_.sent_time.time), &(tss->ts[0])); fill_gpr_from_timestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStats(&(elem->ts_.sent_time.metrics), opt_stats); extract_opt_stats_from_cmsg(&(elem->ts_.sent_time.metrics),
opt_stats);
elem = elem->next_; elem = elem->next_;
break; break;
case SCM_TSTAMP_ACK: case SCM_TSTAMP_ACK:
fill_gpr_from_timestamp(&(elem->ts_.acked_time.time), &(tss->ts[0])); fill_gpr_from_timestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
ExtractOptStats(&(elem->ts_.acked_time.metrics), opt_stats); extract_opt_stats_from_cmsg(&(elem->ts_.acked_time.metrics),
opt_stats);
/* Got all timestamps. Do the callback and free this TracedBuffer. /* Got all timestamps. Do the callback and free this TracedBuffer.
* The thing below can be passed by value if we don't want the * The thing below can be passed by value if we don't want the
* restriction on the lifetime. */ * restriction on the lifetime. */

@ -71,6 +71,8 @@ struct ConnectionMetrics {
Optional<uint64_t> data_retx; Optional<uint64_t> data_retx;
/* Total bytes sent so far. */ /* Total bytes sent so far. */
Optional<uint64_t> data_sent; Optional<uint64_t> data_sent;
/* Total bytes in write queue but not sent. */
Optional<uint64_t> data_notsent;
/* Pacing rate of the connection in Bps */ /* Pacing rate of the connection in Bps */
Optional<uint64_t> pacing_rate; Optional<uint64_t> pacing_rate;
/* Minimum RTT observed in usec. */ /* Minimum RTT observed in usec. */
@ -129,7 +131,7 @@ class TracedBuffer {
/** Add a new entry in the TracedBuffer list pointed to by head. Also saves /** Add a new entry in the TracedBuffer list pointed to by head. Also saves
* sendmsg_time with the current timestamp. */ * sendmsg_time with the current timestamp. */
static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no, static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
void* arg); const grpc_core::tcp_info* info, void* arg);
/** Processes a received timestamp based on sock_extended_err and /** Processes a received timestamp based on sock_extended_err and
* scm_timestamping structures. It will invoke the timestamps callback if the * scm_timestamping structures. It will invoke the timestamps callback if the

@ -105,6 +105,73 @@ enum TCPOptStats {
TCP_NLA_REORD_SEEN, /* reordering events seen */ TCP_NLA_REORD_SEEN, /* reordering events seen */
TCP_NLA_SRTT, /* smoothed RTT in usecs */ TCP_NLA_SRTT, /* smoothed RTT in usecs */
}; };
/* tcp_info from from linux/tcp.h */
struct tcp_info {
uint8_t tcpi_state;
uint8_t tcpi_ca_state;
uint8_t tcpi_retransmits;
uint8_t tcpi_probes;
uint8_t tcpi_backoff;
uint8_t tcpi_options;
uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
uint8_t tcpi_delivery_rate_app_limited : 1;
uint32_t tcpi_rto;
uint32_t tcpi_ato;
uint32_t tcpi_snd_mss;
uint32_t tcpi_rcv_mss;
uint32_t tcpi_unacked;
uint32_t tcpi_sacked;
uint32_t tcpi_lost;
uint32_t tcpi_retrans;
uint32_t tcpi_fackets;
/* Times. */
uint32_t tcpi_last_data_sent;
uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */
uint32_t tcpi_last_data_recv;
uint32_t tcpi_last_ack_recv;
/* Metrics. */
uint32_t tcpi_pmtu;
uint32_t tcpi_rcv_ssthresh;
uint32_t tcpi_rtt;
uint32_t tcpi_rttvar;
uint32_t tcpi_snd_ssthresh;
uint32_t tcpi_snd_cwnd;
uint32_t tcpi_advmss;
uint32_t tcpi_reordering;
uint32_t tcpi_rcv_rtt;
uint32_t tcpi_rcv_space;
uint32_t tcpi_total_retrans;
uint64_t tcpi_pacing_rate;
uint64_t tcpi_max_pacing_rate;
uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */
uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */
uint32_t tcpi_notsent_bytes;
uint32_t tcpi_min_rtt;
uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */
uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */
uint64_t tcpi_delivery_rate;
uint64_t tcpi_busy_time; /* Time (usec) busy sending data */
uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */
uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
uint32_t tcpi_delivered;
uint32_t tcpi_delivered_ce;
uint64_t tcpi_bytes_sent; /* RFC4898 tcpEStatsPerfHCDataOctetsOut */
uint64_t tcpi_bytes_retrans; /* RFC4898 tcpEStatsPerfOctetsRetrans */
uint32_t tcpi_dsack_dups; /* RFC4898 tcpEStatsStackDSACKDups */
uint32_t tcpi_reord_seen; /* reordering events seen */
socklen_t length; /* Length of struct returned by kernel */
};
#ifndef TCP_INFO
#define TCP_INFO 11
#endif
#endif /* GRPC_LINUX_ERRQUEUE */ #endif /* GRPC_LINUX_ERRQUEUE */
/* Returns true if kernel is capable of supporting errqueue and timestamping. /* Returns true if kernel is capable of supporting errqueue and timestamping.

@ -593,6 +593,12 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
#ifdef GRPC_LINUX_ERRQUEUE #ifdef GRPC_LINUX_ERRQUEUE
static int get_socket_tcp_info(grpc_core::tcp_info* info, int fd) {
info->length = sizeof(*info) - sizeof(socklen_t);
memset(info, 0, sizeof(*info));
return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length));
}
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length, size_t sending_length,
ssize_t* sent_length) { ssize_t* sent_length) {
@ -629,9 +635,15 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
/* Only save timestamps if all the bytes were taken by sendmsg. */ /* Only save timestamps if all the bytes were taken by sendmsg. */
if (sending_length == static_cast<size_t>(length)) { if (sending_length == static_cast<size_t>(length)) {
gpr_mu_lock(&tcp->tb_mu); gpr_mu_lock(&tcp->tb_mu);
grpc_core::tcp_info info;
auto* info_ptr = &info;
if (get_socket_tcp_info(info_ptr, tcp->fd) != 0) {
/* Failed to get tcp_info */
info_ptr = nullptr;
}
grpc_core::TracedBuffer::AddNewEntry( grpc_core::TracedBuffer::AddNewEntry(
&tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length), &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
tcp->outgoing_buffer_arg); info_ptr, tcp->outgoing_buffer_arg);
gpr_mu_unlock(&tcp->tb_mu); gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr; tcp->outgoing_buffer_arg = nullptr;
} }

@ -48,7 +48,7 @@ static void TestShutdownFlushesList() {
for (auto i = 0; i < NUM_ELEM; i++) { for (auto i = 0; i < NUM_ELEM; i++) {
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0)); gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry( grpc_core::TracedBuffer::AddNewEntry(
&list, i, static_cast<void*>(&verifier_called[i])); &list, i, nullptr, static_cast<void*>(&verifier_called[i]));
} }
grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE); grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE);
GPR_ASSERT(list == nullptr); GPR_ASSERT(list == nullptr);
@ -84,7 +84,7 @@ static void TestVerifierCalledOnAck() {
grpc_core::TracedBuffer* list = nullptr; grpc_core::TracedBuffer* list = nullptr;
gpr_atm verifier_called; gpr_atm verifier_called;
gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0)); gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called); grpc_core::TracedBuffer::AddNewEntry(&list, 213, nullptr, &verifier_called);
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, nullptr, &tss); grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, nullptr, &tss);
GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1)); GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1));
GPR_ASSERT(list == nullptr); GPR_ASSERT(list == nullptr);

Loading…
Cancel
Save