Fork iomgr traced buffer list implementation for posix event engine (#30539)

* Forking iomgr internal_errqueue defines for posix event engine

* fix BUILD file

* start

* update

* sanity

* regenerate_projects

* Automated change: Fix sanity tests

* review comments

* review comments

* Automated change: Fix sanity tests

* fix BUILD

* add no_windows tag

* minor typo

* typo

* review comments

* regenerate projects

* fix BUILD issue

* iwyu fixes

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
pull/30573/head
Vignesh Babu 3 years ago committed by GitHub
parent 468fe2d835
commit 009dadbb74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      BUILD
  2. 42
      CMakeLists.txt
  3. 18
      build_autogenerated.yaml
  4. 73
      src/core/lib/event_engine/posix_engine/internal_errqueue.cc
  5. 179
      src/core/lib/event_engine/posix_engine/internal_errqueue.h
  6. 286
      src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
  7. 169
      src/core/lib/event_engine/posix_engine/traced_buffer_list.h
  8. 16
      test/core/event_engine/posix/BUILD
  9. 133
      test/core/event_engine/posix/traced_buffer_list_test.cc
  10. 22
      tools/run_tests/generated/tests.json

35
BUILD

@ -2564,6 +2564,41 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "posix_event_engine_internal_errqueue",
srcs = [
"src/core/lib/event_engine/posix_engine/internal_errqueue.cc",
],
hdrs = [
"src/core/lib/event_engine/posix_engine/internal_errqueue.h",
],
deps = [
"gpr",
"iomgr_port",
],
)
grpc_cc_library(
name = "posix_event_engine_traced_buffer_list",
srcs = [
"src/core/lib/event_engine/posix_engine/traced_buffer_list.cc",
],
hdrs = [
"src/core/lib/event_engine/posix_engine/traced_buffer_list.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/status",
"absl/types:optional",
],
deps = [
"gpr",
"gpr_codegen",
"iomgr_port",
"posix_event_engine_internal_errqueue",
],
)
grpc_cc_library(
name = "event_engine_utils",
srcs = ["src/core/lib/event_engine/utils.cc"],

42
CMakeLists.txt generated

@ -1210,6 +1210,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx tls_security_connector_test)
add_dependencies(buildtests_cxx tls_test)
add_dependencies(buildtests_cxx too_many_pings_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx traced_buffer_list_test)
endif()
add_dependencies(buildtests_cxx transport_security_common_api_test)
add_dependencies(buildtests_cxx transport_security_test)
add_dependencies(buildtests_cxx transport_stream_receiver_test)
@ -18792,6 +18795,45 @@ target_link_libraries(too_many_pings_test
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(traced_buffer_list_test
src/core/lib/event_engine/posix_engine/internal_errqueue.cc
src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
test/core/event_engine/posix/traced_buffer_list_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(traced_buffer_list_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(traced_buffer_list_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -10342,6 +10342,24 @@ targets:
deps:
- grpc++_test_config
- grpc++_test_util
- name: traced_buffer_list_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/posix_engine/internal_errqueue.h
- src/core/lib/event_engine/posix_engine/traced_buffer_list.h
src:
- src/core/lib/event_engine/posix_engine/internal_errqueue.cc
- src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- test/core/event_engine/posix/traced_buffer_list_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
uses_polling: false
- name: transport_security_common_api_test
gtest: true
build: test

@ -0,0 +1,73 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <errno.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/utsname.h>
#include <cstddef>
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_LINUX_ERRQUEUE
int GetSocketTcpInfo(struct tcp_info* info, int fd) {
memset(info, 0, sizeof(*info));
info->length = offsetof(tcp_info, length);
return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length));
}
#endif
bool KernelSupportsErrqueue() {
static const bool errqueue_supported = []() {
#ifdef GRPC_LINUX_ERRQUEUE
// Both-compile time and run-time linux kernel versions should be at
// least 4.0.0
struct utsname buffer;
if (uname(&buffer) != 0) {
gpr_log(GPR_ERROR, "uname: %s", strerror(errno));
return false;
}
char* release = buffer.release;
if (release == nullptr) {
return false;
}
if (strtol(release, nullptr, 10) >= 4) {
return true;
} else {
gpr_log(GPR_DEBUG, "ERRQUEUE support not enabled");
}
#endif // GRPC_LINUX_ERRQUEUE
return false;
}();
return errqueue_supported;
}
} // namespace posix_engine
} // namespace grpc_event_engine
#endif // GRPC_POSIX_SOCKET_TCP

@ -0,0 +1,179 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_INTERNAL_ERRQUEUE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_INTERNAL_ERRQUEUE_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <time.h>
#ifdef GRPC_LINUX_ERRQUEUE
#include <linux/errqueue.h> // IWYU pragma: keep
#include <sys/socket.h>
#endif /* GRPC_LINUX_ERRQUEUE */
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_LINUX_ERRQUEUE
// Redefining scm_timestamping in the same way that <linux/errqueue.h> defines
// it, so that code compiles on systems that don't have it.
struct scm_timestamping {
struct timespec ts[3];
};
// Also redefine timestamp types
// The timestamp type for when the driver passed skb to NIC, or HW.
constexpr int SCM_TSTAMP_SND = 0;
// The timestamp type for when data entered the packet scheduler.
constexpr int SCM_TSTAMP_SCHED = 1;
// The timestamp type for when data acknowledged by peer.
constexpr int SCM_TSTAMP_ACK = 2;
// Control message type containing OPT_STATS
#ifndef SCM_TIMESTAMPING_OPT_STATS
#define SCM_TIMESTAMPING_OPT_STATS 54
#endif
// Redefine required constants from <linux/net_tstamp.h>
constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1;
constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4;
constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7;
constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8;
constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9;
constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11;
constexpr uint32_t SOF_TIMESTAMPING_OPT_STATS = 1u << 12;
constexpr uint32_t kTimestampingSocketOptions =
SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_ID |
SOF_TIMESTAMPING_OPT_TSONLY | SOF_TIMESTAMPING_OPT_STATS;
constexpr uint32_t kTimestampingRecordingOptions =
SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
SOF_TIMESTAMPING_TX_ACK;
// Netlink attribute types used for TCP opt stats.
enum TCPOptStats {
TCP_NLA_PAD,
TCP_NLA_BUSY, /* Time (usec) busy sending data. */
TCP_NLA_RWND_LIMITED, /* Time (usec) limited by receive window. */
TCP_NLA_SNDBUF_LIMITED, /* Time (usec) limited by send buffer. */
TCP_NLA_DATA_SEGS_OUT, /* Data pkts sent including retransmission. */
TCP_NLA_TOTAL_RETRANS, /* Data pkts retransmitted. */
TCP_NLA_PACING_RATE, /* Pacing rate in Bps. */
TCP_NLA_DELIVERY_RATE, /* Delivery rate in Bps. */
TCP_NLA_SND_CWND, /* Sending congestion window. */
TCP_NLA_REORDERING, /* Reordering metric. */
TCP_NLA_MIN_RTT, /* minimum RTT. */
TCP_NLA_RECUR_RETRANS, /* Recurring retransmits for the current pkt. */
TCP_NLA_DELIVERY_RATE_APP_LMT, /* Delivery rate application limited? */
TCP_NLA_SNDQ_SIZE, /* Data (bytes) pending in send queue */
TCP_NLA_CA_STATE, /* ca_state of socket */
TCP_NLA_SND_SSTHRESH, /* Slow start size threshold */
TCP_NLA_DELIVERED, /* Data pkts delivered incl. out-of-order */
TCP_NLA_DELIVERED_CE, /* Like above but only ones w/ CE marks */
TCP_NLA_BYTES_SENT, /* Data bytes sent including retransmission */
TCP_NLA_BYTES_RETRANS, /* Data bytes retransmitted */
TCP_NLA_DSACK_DUPS, /* DSACK blocks received */
TCP_NLA_REORD_SEEN, /* reordering events seen */
TCP_NLA_SRTT, /* smoothed RTT in usecs */
};
/* tcp_info from from linux/tcp.h */
struct tcp_info {
uint8_t tcpi_state;
uint8_t tcpi_ca_state;
uint8_t tcpi_retransmits;
uint8_t tcpi_probes;
uint8_t tcpi_backoff;
uint8_t tcpi_options;
uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
uint8_t tcpi_delivery_rate_app_limited : 1;
uint32_t tcpi_rto;
uint32_t tcpi_ato;
uint32_t tcpi_snd_mss;
uint32_t tcpi_rcv_mss;
uint32_t tcpi_unacked;
uint32_t tcpi_sacked;
uint32_t tcpi_lost;
uint32_t tcpi_retrans;
uint32_t tcpi_fackets;
/* Times. */
uint32_t tcpi_last_data_sent;
uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */
uint32_t tcpi_last_data_recv;
uint32_t tcpi_last_ack_recv;
/* Metrics. */
uint32_t tcpi_pmtu;
uint32_t tcpi_rcv_ssthresh;
uint32_t tcpi_rtt;
uint32_t tcpi_rttvar;
uint32_t tcpi_snd_ssthresh;
uint32_t tcpi_snd_cwnd;
uint32_t tcpi_advmss;
uint32_t tcpi_reordering;
uint32_t tcpi_rcv_rtt;
uint32_t tcpi_rcv_space;
uint32_t tcpi_total_retrans;
uint64_t tcpi_pacing_rate;
uint64_t tcpi_max_pacing_rate;
uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */
uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */
uint32_t tcpi_notsent_bytes;
uint32_t tcpi_min_rtt;
uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */
uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */
uint64_t tcpi_delivery_rate;
uint64_t tcpi_busy_time; /* Time (usec) busy sending data */
uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */
uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
uint32_t tcpi_delivered;
uint32_t tcpi_delivered_ce;
uint64_t tcpi_bytes_sent; /* RFC4898 tcpEStatsPerfHCDataOctetsOut */
uint64_t tcpi_bytes_retrans; /* RFC4898 tcpEStatsPerfOctetsRetrans */
uint32_t tcpi_dsack_dups; /* RFC4898 tcpEStatsStackDSACKDups */
uint32_t tcpi_reord_seen; /* reordering events seen */
socklen_t length; /* Length of struct returned by kernel */
};
#ifndef TCP_INFO
#define TCP_INFO 11
#endif
int GetSocketTcpInfo(tcp_info* info, int fd);
#endif /* GRPC_LINUX_ERRQUEUE */
// Returns true if kernel is capable of supporting errqueue and timestamping.
// Currently allowing only linux kernels above 4.0.0
bool KernelSupportsErrqueue();
} // namespace posix_engine
} // namespace grpc_event_engine
#endif /* GRPC_POSIX_SOCKET_TCP */
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_INTERNAL_ERRQUEUE_H */

@ -0,0 +1,286 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <utility>
#include "absl/functional/any_invocable.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_ERRQUEUE
#include <linux/errqueue.h> // IWYU pragma: keep
#include <linux/netlink.h>
#include <sys/socket.h> // IWYU pragma: keep
namespace grpc_event_engine {
namespace posix_engine {
namespace {
// Fills gpr_timespec gts based on values from timespec ts.
void FillGprFromTimestamp(gpr_timespec* gts, const struct timespec* ts) {
gts->tv_sec = ts->tv_sec;
gts->tv_nsec = static_cast<int32_t>(ts->tv_nsec);
gts->clock_type = GPR_CLOCK_REALTIME;
}
void DefaultTimestampsCallback(void* /*arg*/, Timestamps* /*ts*/,
absl::Status /*shudown_err*/) {
gpr_log(GPR_DEBUG, "Timestamps callback has not been registered");
}
// The saved callback function that will be invoked when we get all the
// timestamps that we are going to get for a TracedBuffer.
absl::AnyInvocable<void(void*, Timestamps*, absl::Status)>
g_timestamps_callback =
[]() -> absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> {
return DefaultTimestampsCallback;
}();
// Used to extract individual opt stats from cmsg, so as to avoid troubles with
// unaligned reads.
template <typename T>
T ReadUnaligned(const void* ptr) {
T val;
memcpy(&val, ptr, sizeof(val));
return val;
}
// Extracts opt stats from the tcp_info struct \a info to \a metrics
void ExtractOptStatsFromTcpInfo(ConnectionMetrics* metrics,
const tcp_info* info) {
if (info == nullptr) {
return;
}
if (info->length > offsetof(tcp_info, tcpi_sndbuf_limited)) {
metrics->recurring_retrans = info->tcpi_retransmits;
metrics->is_delivery_rate_app_limited =
info->tcpi_delivery_rate_app_limited;
metrics->congestion_window = info->tcpi_snd_cwnd;
metrics->reordering = info->tcpi_reordering;
metrics->packet_retx = info->tcpi_total_retrans;
metrics->pacing_rate = info->tcpi_pacing_rate;
metrics->data_notsent = info->tcpi_notsent_bytes;
if (info->tcpi_min_rtt != UINT32_MAX) {
metrics->min_rtt = info->tcpi_min_rtt;
}
metrics->packet_sent = info->tcpi_data_segs_out;
metrics->delivery_rate = info->tcpi_delivery_rate;
metrics->busy_usec = info->tcpi_busy_time;
metrics->rwnd_limited_usec = info->tcpi_rwnd_limited;
metrics->sndbuf_limited_usec = info->tcpi_sndbuf_limited;
}
if (info->length > offsetof(tcp_info, tcpi_dsack_dups)) {
metrics->data_sent = info->tcpi_bytes_sent;
metrics->data_retx = info->tcpi_bytes_retrans;
metrics->packet_spurious_retx = info->tcpi_dsack_dups;
}
}
// Extracts opt stats from the given control message \a opt_stats to the
// connection metrics \a metrics.
void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics,
const cmsghdr* opt_stats) {
if (opt_stats == nullptr) {
return;
}
const auto* data = CMSG_DATA(opt_stats);
constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr));
const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len;
int64_t offset = 0;
while (offset < len) {
const auto* attr = reinterpret_cast<const nlattr*>(data + offset);
const void* val = data + offset + NLA_HDRLEN;
switch (attr->nla_type) {
case TCP_NLA_BUSY: {
metrics->busy_usec = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_RWND_LIMITED: {
metrics->rwnd_limited_usec = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_SNDBUF_LIMITED: {
metrics->sndbuf_limited_usec = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_PACING_RATE: {
metrics->pacing_rate = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DELIVERY_RATE: {
metrics->delivery_rate = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DELIVERY_RATE_APP_LMT: {
metrics->is_delivery_rate_app_limited = ReadUnaligned<uint8_t>(val);
break;
}
case TCP_NLA_SND_CWND: {
metrics->congestion_window = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_MIN_RTT: {
metrics->min_rtt = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_SRTT: {
metrics->srtt = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_RECUR_RETRANS: {
metrics->recurring_retrans = ReadUnaligned<uint8_t>(val);
break;
}
case TCP_NLA_BYTES_SENT: {
metrics->data_sent = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DATA_SEGS_OUT: {
metrics->packet_sent = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_TOTAL_RETRANS: {
metrics->packet_retx = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DELIVERED: {
metrics->packet_delivered = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_DELIVERED_CE: {
metrics->packet_delivered_ce = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_BYTES_RETRANS: {
metrics->data_retx = ReadUnaligned<uint64_t>(val);
break;
}
case TCP_NLA_DSACK_DUPS: {
metrics->packet_spurious_retx = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_REORDERING: {
metrics->reordering = ReadUnaligned<uint32_t>(val);
break;
}
case TCP_NLA_SND_SSTHRESH: {
metrics->snd_ssthresh = ReadUnaligned<uint32_t>(val);
break;
}
}
offset += NLA_ALIGN(attr->nla_len);
}
}
} // namespace.
void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) {
buffer_list_.emplace_back(seq_no, arg);
TracedBuffer& new_elem = buffer_list_.back();
// Store the current time as the sendmsg time.
new_elem.ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem.ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem.ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem.ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (GetSocketTcpInfo(&new_elem.ts_.info, fd) == 0) {
ExtractOptStatsFromTcpInfo(&new_elem.ts_.sendmsg_time.metrics,
&new_elem.ts_.info);
}
}
void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss) {
auto it = buffer_list_.begin();
while (it != buffer_list_.end()) {
TracedBuffer& elem = (*it);
// The byte number refers to the sequence number of the last byte which this
// timestamp relates to.
if (serr->ee_data >= elem.seq_no_) {
switch (serr->ee_info) {
case SCM_TSTAMP_SCHED:
FillGprFromTimestamp(&(elem.ts_.scheduled_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem.ts_.scheduled_time.metrics),
opt_stats);
++it;
break;
case SCM_TSTAMP_SND:
FillGprFromTimestamp(&(elem.ts_.sent_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem.ts_.sent_time.metrics), opt_stats);
++it;
break;
case SCM_TSTAMP_ACK:
FillGprFromTimestamp(&(elem.ts_.acked_time.time), &(tss->ts[0]));
ExtractOptStatsFromCmsg(&(elem.ts_.acked_time.metrics), opt_stats);
// Got all timestamps. Do the callback and free this TracedBuffer. The
// thing below can be passed by value if we don't want the restriction
// on the lifetime.
g_timestamps_callback(elem.arg_, &(elem.ts_), absl::OkStatus());
it = buffer_list_.erase(it);
break;
default:
abort();
}
} else {
break;
}
}
}
void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) {
while (!buffer_list_.empty()) {
TracedBuffer& elem = buffer_list_.front();
g_timestamps_callback(elem.arg_, &(elem.ts_), shutdown_err);
buffer_list_.pop_front();
}
if (remaining != nullptr) {
g_timestamps_callback(remaining, nullptr, shutdown_err);
}
}
void TcpSetWriteTimestampsCallback(
absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> fn) {
g_timestamps_callback = std::move(fn);
}
} // namespace posix_engine
} // namespace grpc_event_engine
#else /* GRPC_LINUX_ERRQUEUE */
namespace grpc_event_engine {
namespace posix_engine {
void TcpSetWriteTimestampsCallback(
absl::AnyInvocable<void(void*, Timestamps*, absl::Status)> /*fn*/) {
GPR_ASSERT(false && "Timestamps callback is not enabled for this platform");
}
} // namespace posix_engine
} // namespace grpc_event_engine
#endif /* GRPC_LINUX_ERRQUEUE */

@ -0,0 +1,169 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <list>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/gpr_types.h>
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/iomgr/port.h"
// #undef GRPC_LINUX_ERRQUEUE
namespace grpc_event_engine {
namespace posix_engine {
struct ConnectionMetrics { /* Delivery rate in Bytes/s. */
absl::optional<uint64_t> delivery_rate;
/* If the delivery rate is limited by the application, this is set to true. */
absl::optional<bool> is_delivery_rate_app_limited;
/* Total packets retransmitted. */
absl::optional<uint32_t> packet_retx;
/* Total packets retransmitted spuriously. This metric is smaller than or
equal to packet_retx. */
absl::optional<uint32_t> packet_spurious_retx;
/* Total packets sent. */
absl::optional<uint32_t> packet_sent;
/* Total packets delivered. */
absl::optional<uint32_t> packet_delivered;
/* Total packets delivered with ECE marked. This metric is smaller than or
equal to packet_delivered. */
absl::optional<uint32_t> packet_delivered_ce;
/* Total bytes lost so far. */
absl::optional<uint64_t> data_retx;
/* Total bytes sent so far. */
absl::optional<uint64_t> data_sent;
/* Total bytes in write queue but not sent. */
absl::optional<uint64_t> data_notsent;
/* Pacing rate of the connection in Bps */
absl::optional<uint64_t> pacing_rate;
/* Minimum RTT observed in usec. */
absl::optional<uint32_t> min_rtt;
/* Smoothed RTT in usec */
absl::optional<uint32_t> srtt;
/* Send congestion window. */
absl::optional<uint32_t> congestion_window;
/* Slow start threshold in packets. */
absl::optional<uint32_t> snd_ssthresh;
/* Maximum degree of reordering (i.e., maximum number of packets reodered)
on the connection. */
absl::optional<uint32_t> reordering;
/* Represents the number of recurring retransmissions of the first sequence
that is not acknowledged yet. */
absl::optional<uint8_t> recurring_retrans;
/* The cumulative time (in usec) that the transport protocol was busy
sending data. */
absl::optional<uint64_t> busy_usec;
/* The cumulative time (in usec) that the transport protocol was limited by
the receive window size. */
absl::optional<uint64_t> rwnd_limited_usec;
/* The cumulative time (in usec) that the transport protocol was limited by
the send buffer size. */
absl::optional<uint64_t> sndbuf_limited_usec;
};
struct BufferTimestamp {
gpr_timespec time;
ConnectionMetrics metrics; /* Metrics collected with this timestamp */
};
struct Timestamps {
BufferTimestamp sendmsg_time;
BufferTimestamp scheduled_time;
BufferTimestamp sent_time;
BufferTimestamp acked_time;
uint32_t byte_offset; /* byte offset relative to the start of the RPC */
#ifdef GRPC_LINUX_ERRQUEUE
tcp_info info; /* tcp_info collected on sendmsg */
#endif /* GRPC_LINUX_ERRQUEUE */
};
// TracedBuffer is a class to keep track of timestamps for a specific buffer in
// the TCP layer. We are only tracking timestamps for Linux kernels and hence
// this class would only be used by Linux platforms. For all other platforms,
// TracedBuffer would be an empty class.
// The timestamps collected are according to Timestamps declared above A
// TracedBuffer list is kept track of using the head element of the list. If
// *the head element of the list is nullptr, then the list is empty.
#ifdef GRPC_LINUX_ERRQUEUE
class TracedBufferList {
public:
// Add a new entry in the TracedBuffer list pointed to by head. Also saves
// sendmsg_time with the current timestamp.
void AddNewEntry(int32_t seq_no, int fd, void* arg);
// Processes a received timestamp based on sock_extended_err and
// scm_timestamping structures. It will invoke the timestamps callback if the
// timestamp type is SCM_TSTAMP_ACK.
void ProcessTimestamp(struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss);
int Size() { return buffer_list_.size(); }
// Cleans the list by calling the callback for each traced buffer in the list
// with timestamps that it has.
void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/);
private:
class TracedBuffer {
public:
TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg) {}
private:
friend class TracedBufferList;
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
void* arg_; /* The arg to pass to timestamps_callback */
Timestamps ts_; /* The timestamps corresponding to this buffer */
};
// TracedBuffers are ordered by sequence number and would need to be processed
// in a FIFO order starting with the smallest sequence number. To enable this,
// they are stored in a std::list which allows easy appends and forward
// iteration operations.
std::list<TracedBuffer> buffer_list_;
};
#else /* GRPC_LINUX_ERRQUEUE */
// TracedBufferList implementation is a no-op for this platform.
class TracedBufferList {
public:
void AddNewEntry(int32_t /*seq_no*/, int /*fd*/, void* /*arg*/) {}
void ProcessTimestamp(struct sock_extended_err* /*serr*/,
struct cmsghdr* /*opt_stats*/,
struct scm_timestamping* /*tss*/) {}
int Size() { return 0; }
void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/) {}
};
#endif /* GRPC_LINUX_ERRQUEUE */
// Sets the callback function to call when timestamps for a write are collected.
// This is expected to be called atmost once.
void TcpSetWriteTimestampsCallback(
absl::AnyInvocable<void(void*, Timestamps*, absl::Status)>);
} // namespace posix_engine
} // namespace grpc_event_engine
#endif /* GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H */

@ -97,3 +97,19 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "traced_buffer_list_test",
srcs = ["traced_buffer_list_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:posix_event_engine_traced_buffer_list",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,133 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
#include <atomic>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/port.h"
#include "test/core/util/test_config.h"
#ifdef GRPC_LINUX_ERRQUEUE
#define NUM_ELEM 5
namespace grpc_event_engine {
namespace posix_engine {
namespace {
void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/,
absl::Status status) {
ASSERT_TRUE(status.ok());
ASSERT_NE(arg, nullptr);
int* done = reinterpret_cast<int*>(arg);
*done = 1;
}
void TestVerifierCalledOnAckVerifier(void* arg, Timestamps* ts,
absl::Status status) {
ASSERT_TRUE(status.ok());
ASSERT_NE(arg, nullptr);
ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME);
ASSERT_EQ(ts->acked_time.time.tv_sec, 123);
ASSERT_EQ(ts->acked_time.time.tv_nsec, 456);
ASSERT_GT(ts->info.length, 0);
int* done = reinterpret_cast<int*>(arg);
*done = 1;
}
} // namespace
// Tests that all TracedBuffer elements in the list are flushed out on shutdown.
// Also tests that arg is passed correctly.
TEST(BufferListTest, TestShutdownFlushesList) {
TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
TracedBufferList traced_buffers;
int verifier_called[NUM_ELEM];
for (auto i = 0; i < NUM_ELEM; i++) {
verifier_called[i] = 0;
traced_buffers.AddNewEntry(i, 0, static_cast<void*>(&verifier_called[i]));
}
traced_buffers.Shutdown(nullptr, absl::OkStatus());
for (auto i = 0; i < NUM_ELEM; i++) {
ASSERT_EQ(verifier_called[i], 1);
}
ASSERT_TRUE(traced_buffers.Size() == 0);
}
// Tests that the timestamp verifier is called on an ACK timestamp.
TEST(BufferListTest, TestVerifierCalledOnAck) {
struct sock_extended_err serr;
serr.ee_data = 213;
serr.ee_info = SCM_TSTAMP_ACK;
struct scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier);
TracedBufferList traced_buffers;
int verifier_called = 0;
traced_buffers.AddNewEntry(213, 0, &verifier_called);
traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
ASSERT_EQ(verifier_called, 1);
ASSERT_TRUE(traced_buffers.Size() == 0);
traced_buffers.Shutdown(nullptr, absl::OkStatus());
ASSERT_TRUE(traced_buffers.Size() == 0);
}
// Tests that ProcessTimestamp called after Shutdown does nothing.
TEST(BufferListTest, TestProcessTimestampAfterShutdown) {
struct sock_extended_err serr;
serr.ee_data = 213;
serr.ee_info = SCM_TSTAMP_ACK;
struct scm_timestamping tss;
tss.ts[0].tv_sec = 123;
tss.ts[0].tv_nsec = 456;
TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier);
TracedBufferList traced_buffers;
int verifier_called = 0;
traced_buffers.AddNewEntry(213, 0, &verifier_called);
ASSERT_TRUE(traced_buffers.Size() == 1);
traced_buffers.Shutdown(nullptr, absl::OkStatus());
ASSERT_TRUE(traced_buffers.Size() == 0);
// Check that the callback was executed after first Shutdown.
ASSERT_EQ(verifier_called, 1);
verifier_called = 0;
traced_buffers.Shutdown(nullptr, absl::OkStatus());
ASSERT_TRUE(traced_buffers.Size() == 0);
// Second Shutdown should not execute the callback.
ASSERT_EQ(verifier_called, 0);
traced_buffers.ProcessTimestamp(&serr, nullptr, &tss);
// A ProcessTimestamp after Shutdown should not execute the callback.
ASSERT_EQ(verifier_called, 0);
}
} // namespace posix_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else /* GRPC_LINUX_ERRQUEUE */
int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif /* GRPC_LINUX_ERRQUEUE */

@ -7487,6 +7487,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "traced_buffer_list_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save