From 009dadbb743f4575b60997fb723919eba0bcb00a Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Fri, 12 Aug 2022 11:51:16 -0700 Subject: [PATCH] Fork iomgr traced buffer list implementation for posix event engine (#30539) * Forking iomgr internal_errqueue defines for posix event engine * fix BUILD file * start * update * sanity * regenerate_projects * Automated change: Fix sanity tests * review comments * review comments * Automated change: Fix sanity tests * fix BUILD * add no_windows tag * minor typo * typo * review comments * regenerate projects * fix BUILD issue * iwyu fixes Co-authored-by: Vignesh2208 --- BUILD | 35 +++ CMakeLists.txt | 42 +++ build_autogenerated.yaml | 18 ++ .../posix_engine/internal_errqueue.cc | 73 +++++ .../posix_engine/internal_errqueue.h | 179 +++++++++++ .../posix_engine/traced_buffer_list.cc | 286 ++++++++++++++++++ .../posix_engine/traced_buffer_list.h | 169 +++++++++++ test/core/event_engine/posix/BUILD | 16 + .../posix/traced_buffer_list_test.cc | 133 ++++++++ tools/run_tests/generated/tests.json | 22 ++ 10 files changed, 973 insertions(+) create mode 100644 src/core/lib/event_engine/posix_engine/internal_errqueue.cc create mode 100644 src/core/lib/event_engine/posix_engine/internal_errqueue.h create mode 100644 src/core/lib/event_engine/posix_engine/traced_buffer_list.cc create mode 100644 src/core/lib/event_engine/posix_engine/traced_buffer_list.h create mode 100644 test/core/event_engine/posix/traced_buffer_list_test.cc diff --git a/BUILD b/BUILD index 025cd7eba32..ad6c1ef0ede 100644 --- a/BUILD +++ b/BUILD @@ -2564,6 +2564,41 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "posix_event_engine_internal_errqueue", + srcs = [ + "src/core/lib/event_engine/posix_engine/internal_errqueue.cc", + ], + hdrs = [ + "src/core/lib/event_engine/posix_engine/internal_errqueue.h", + ], + deps = [ + "gpr", + "iomgr_port", + ], +) + +grpc_cc_library( + name = "posix_event_engine_traced_buffer_list", + srcs = [ + 
"src/core/lib/event_engine/posix_engine/traced_buffer_list.cc", + ], + hdrs = [ + "src/core/lib/event_engine/posix_engine/traced_buffer_list.h", + ], + external_deps = [ + "absl/functional:any_invocable", + "absl/status", + "absl/types:optional", + ], + deps = [ + "gpr", + "gpr_codegen", + "iomgr_port", + "posix_event_engine_internal_errqueue", + ], +) + grpc_cc_library( name = "event_engine_utils", srcs = ["src/core/lib/event_engine/utils.cc"], diff --git a/CMakeLists.txt b/CMakeLists.txt index ad3ea34cdd3..420c836caaa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1210,6 +1210,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx tls_security_connector_test) add_dependencies(buildtests_cxx tls_test) add_dependencies(buildtests_cxx too_many_pings_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx traced_buffer_list_test) + endif() add_dependencies(buildtests_cxx transport_security_common_api_test) add_dependencies(buildtests_cxx transport_security_test) add_dependencies(buildtests_cxx transport_stream_receiver_test) @@ -18792,6 +18795,45 @@ target_link_libraries(too_many_pings_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(traced_buffer_list_test + src/core/lib/event_engine/posix_engine/internal_errqueue.cc + src/core/lib/event_engine/posix_engine/traced_buffer_list.cc + test/core/event_engine/posix/traced_buffer_list_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(traced_buffer_list_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + 
third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(traced_buffer_list_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 27260339f51..3c66f031c40 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -10342,6 +10342,24 @@ targets: deps: - grpc++_test_config - grpc++_test_util +- name: traced_buffer_list_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/posix_engine/internal_errqueue.h + - src/core/lib/event_engine/posix_engine/traced_buffer_list.h + src: + - src/core/lib/event_engine/posix_engine/internal_errqueue.cc + - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc + - test/core/event_engine/posix/traced_buffer_list_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac + uses_polling: false - name: transport_security_common_api_test gtest: true build: test diff --git a/src/core/lib/event_engine/posix_engine/internal_errqueue.cc b/src/core/lib/event_engine/posix_engine/internal_errqueue.cc new file mode 100644 index 00000000000..348e682b7e2 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/internal_errqueue.cc @@ -0,0 +1,73 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h" + +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +#include +#include +#include +#include +#include + +#include + +namespace grpc_event_engine { +namespace posix_engine { + +#ifdef GRPC_LINUX_ERRQUEUE +int GetSocketTcpInfo(struct tcp_info* info, int fd) { + memset(info, 0, sizeof(*info)); + info->length = offsetof(tcp_info, length); + return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length)); +} +#endif + +bool KernelSupportsErrqueue() { + static const bool errqueue_supported = []() { +#ifdef GRPC_LINUX_ERRQUEUE + // Both-compile time and run-time linux kernel versions should be at + // least 4.0.0 + struct utsname buffer; + if (uname(&buffer) != 0) { + gpr_log(GPR_ERROR, "uname: %s", strerror(errno)); + return false; + } + char* release = buffer.release; + if (release == nullptr) { + return false; + } + + if (strtol(release, nullptr, 10) >= 4) { + return true; + } else { + gpr_log(GPR_DEBUG, "ERRQUEUE support not enabled"); + } +#endif // GRPC_LINUX_ERRQUEUE + return false; + }(); + return errqueue_supported; +} + +} // namespace posix_engine +} // namespace grpc_event_engine + +#endif // GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/posix_engine/internal_errqueue.h b/src/core/lib/event_engine/posix_engine/internal_errqueue.h new file mode 100644 index 00000000000..732e217e96d --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/internal_errqueue.h @@ -0,0 +1,179 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_INTERNAL_ERRQUEUE_H +#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_INTERNAL_ERRQUEUE_H + +#include + +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +#include + +#ifdef GRPC_LINUX_ERRQUEUE +#include // IWYU pragma: keep +#include +#endif /* GRPC_LINUX_ERRQUEUE */ + +namespace grpc_event_engine { +namespace posix_engine { + +#ifdef GRPC_LINUX_ERRQUEUE + +// Redefining scm_timestamping in the same way that linux/errqueue.h defines +// it, so that code compiles on systems that don't have it. +struct scm_timestamping { + struct timespec ts[3]; +}; +// Also redefine timestamp types +// The timestamp type for when the driver passed skb to NIC, or HW. +constexpr int SCM_TSTAMP_SND = 0; +// The timestamp type for when data entered the packet scheduler. +constexpr int SCM_TSTAMP_SCHED = 1; +// The timestamp type for when data is acknowledged by peer.
+constexpr int SCM_TSTAMP_ACK = 2; + +// Control message type containing OPT_STATS +#ifndef SCM_TIMESTAMPING_OPT_STATS +#define SCM_TIMESTAMPING_OPT_STATS 54 +#endif + +// Redefine required constants from linux/net_tstamp.h +constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1; +constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4; +constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7; +constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8; +constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9; +constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11; +constexpr uint32_t SOF_TIMESTAMPING_OPT_STATS = 1u << 12; + +constexpr uint32_t kTimestampingSocketOptions = + SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_ID | + SOF_TIMESTAMPING_OPT_TSONLY | SOF_TIMESTAMPING_OPT_STATS; +constexpr uint32_t kTimestampingRecordingOptions = + SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | + SOF_TIMESTAMPING_TX_ACK; + +// Netlink attribute types used for TCP opt stats. +enum TCPOptStats { + TCP_NLA_PAD, + TCP_NLA_BUSY, /* Time (usec) busy sending data. */ + TCP_NLA_RWND_LIMITED, /* Time (usec) limited by receive window. */ + TCP_NLA_SNDBUF_LIMITED, /* Time (usec) limited by send buffer. */ + TCP_NLA_DATA_SEGS_OUT, /* Data pkts sent including retransmission. */ + TCP_NLA_TOTAL_RETRANS, /* Data pkts retransmitted. */ + TCP_NLA_PACING_RATE, /* Pacing rate in Bps. */ + TCP_NLA_DELIVERY_RATE, /* Delivery rate in Bps. */ + TCP_NLA_SND_CWND, /* Sending congestion window. */ + TCP_NLA_REORDERING, /* Reordering metric. */ + TCP_NLA_MIN_RTT, /* minimum RTT. */ + TCP_NLA_RECUR_RETRANS, /* Recurring retransmits for the current pkt. */ + TCP_NLA_DELIVERY_RATE_APP_LMT, /* Delivery rate application limited? */ + TCP_NLA_SNDQ_SIZE, /* Data (bytes) pending in send queue */ + TCP_NLA_CA_STATE, /* ca_state of socket */ + TCP_NLA_SND_SSTHRESH, /* Slow start size threshold */ + TCP_NLA_DELIVERED, /* Data pkts delivered incl.
out-of-order */ + TCP_NLA_DELIVERED_CE, /* Like above but only ones w/ CE marks */ + TCP_NLA_BYTES_SENT, /* Data bytes sent including retransmission */ + TCP_NLA_BYTES_RETRANS, /* Data bytes retransmitted */ + TCP_NLA_DSACK_DUPS, /* DSACK blocks received */ + TCP_NLA_REORD_SEEN, /* reordering events seen */ + TCP_NLA_SRTT, /* smoothed RTT in usecs */ +}; + +/* tcp_info from linux/tcp.h */ +struct tcp_info { + uint8_t tcpi_state; + uint8_t tcpi_ca_state; + uint8_t tcpi_retransmits; + uint8_t tcpi_probes; + uint8_t tcpi_backoff; + uint8_t tcpi_options; + uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4; + uint8_t tcpi_delivery_rate_app_limited : 1; + uint32_t tcpi_rto; + uint32_t tcpi_ato; + uint32_t tcpi_snd_mss; + uint32_t tcpi_rcv_mss; + uint32_t tcpi_unacked; + uint32_t tcpi_sacked; + uint32_t tcpi_lost; + uint32_t tcpi_retrans; + uint32_t tcpi_fackets; + /* Times. */ + uint32_t tcpi_last_data_sent; + uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */ + uint32_t tcpi_last_data_recv; + uint32_t tcpi_last_ack_recv; + /* Metrics.
*/ + uint32_t tcpi_pmtu; + uint32_t tcpi_rcv_ssthresh; + uint32_t tcpi_rtt; + uint32_t tcpi_rttvar; + uint32_t tcpi_snd_ssthresh; + uint32_t tcpi_snd_cwnd; + uint32_t tcpi_advmss; + uint32_t tcpi_reordering; + uint32_t tcpi_rcv_rtt; + uint32_t tcpi_rcv_space; + uint32_t tcpi_total_retrans; + uint64_t tcpi_pacing_rate; + uint64_t tcpi_max_pacing_rate; + uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */ + uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */ + + uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */ + uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */ + uint32_t tcpi_notsent_bytes; + uint32_t tcpi_min_rtt; + + uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */ + uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */ + + uint64_t tcpi_delivery_rate; + uint64_t tcpi_busy_time; /* Time (usec) busy sending data */ + uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */ + uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */ + + uint32_t tcpi_delivered; + uint32_t tcpi_delivered_ce; + uint64_t tcpi_bytes_sent; /* RFC4898 tcpEStatsPerfHCDataOctetsOut */ + uint64_t tcpi_bytes_retrans; /* RFC4898 tcpEStatsPerfOctetsRetrans */ + uint32_t tcpi_dsack_dups; /* RFC4898 tcpEStatsStackDSACKDups */ + uint32_t tcpi_reord_seen; /* reordering events seen */ + socklen_t length; /* Length of struct returned by kernel */ +}; + +#ifndef TCP_INFO +#define TCP_INFO 11 +#endif + +int GetSocketTcpInfo(tcp_info* info, int fd); + +#endif /* GRPC_LINUX_ERRQUEUE */ + +// Returns true if kernel is capable of supporting errqueue and timestamping. 
+// Currently allowing only linux kernels above 4.0.0 +bool KernelSupportsErrqueue(); + +} // namespace posix_engine +} // namespace grpc_event_engine + +#endif /* GRPC_POSIX_SOCKET_TCP */ + +#endif /* GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_INTERNAL_ERRQUEUE_H */ \ No newline at end of file diff --git a/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc b/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc new file mode 100644 index 00000000000..6ec3c7bcf5d --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/traced_buffer_list.cc @@ -0,0 +1,286 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h" + +#include +#include +#include +#include + +#include + +#include "absl/functional/any_invocable.h" + +#include +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_LINUX_ERRQUEUE +#include // IWYU pragma: keep +#include +#include // IWYU pragma: keep + +namespace grpc_event_engine { +namespace posix_engine { + +namespace { +// Fills gpr_timespec gts based on values from timespec ts. 
+void FillGprFromTimestamp(gpr_timespec* gts, const struct timespec* ts) { + gts->tv_sec = ts->tv_sec; + gts->tv_nsec = static_cast(ts->tv_nsec); + gts->clock_type = GPR_CLOCK_REALTIME; +} + +void DefaultTimestampsCallback(void* /*arg*/, Timestamps* /*ts*/, + absl::Status /*shudown_err*/) { + gpr_log(GPR_DEBUG, "Timestamps callback has not been registered"); +} + +// The saved callback function that will be invoked when we get all the +// timestamps that we are going to get for a TracedBuffer. +absl::AnyInvocable + g_timestamps_callback = + []() -> absl::AnyInvocable { + return DefaultTimestampsCallback; +}(); + +// Used to extract individual opt stats from cmsg, so as to avoid troubles with +// unaligned reads. +template +T ReadUnaligned(const void* ptr) { + T val; + memcpy(&val, ptr, sizeof(val)); + return val; +} + +// Extracts opt stats from the tcp_info struct \a info to \a metrics +void ExtractOptStatsFromTcpInfo(ConnectionMetrics* metrics, + const tcp_info* info) { + if (info == nullptr) { + return; + } + if (info->length > offsetof(tcp_info, tcpi_sndbuf_limited)) { + metrics->recurring_retrans = info->tcpi_retransmits; + metrics->is_delivery_rate_app_limited = + info->tcpi_delivery_rate_app_limited; + metrics->congestion_window = info->tcpi_snd_cwnd; + metrics->reordering = info->tcpi_reordering; + metrics->packet_retx = info->tcpi_total_retrans; + metrics->pacing_rate = info->tcpi_pacing_rate; + metrics->data_notsent = info->tcpi_notsent_bytes; + if (info->tcpi_min_rtt != UINT32_MAX) { + metrics->min_rtt = info->tcpi_min_rtt; + } + metrics->packet_sent = info->tcpi_data_segs_out; + metrics->delivery_rate = info->tcpi_delivery_rate; + metrics->busy_usec = info->tcpi_busy_time; + metrics->rwnd_limited_usec = info->tcpi_rwnd_limited; + metrics->sndbuf_limited_usec = info->tcpi_sndbuf_limited; + } + if (info->length > offsetof(tcp_info, tcpi_dsack_dups)) { + metrics->data_sent = info->tcpi_bytes_sent; + metrics->data_retx = info->tcpi_bytes_retrans; + 
metrics->packet_spurious_retx = info->tcpi_dsack_dups; + } +} + +// Extracts opt stats from the given control message \a opt_stats to the +// connection metrics \a metrics. +void ExtractOptStatsFromCmsg(ConnectionMetrics* metrics, + const cmsghdr* opt_stats) { + if (opt_stats == nullptr) { + return; + } + const auto* data = CMSG_DATA(opt_stats); + constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr)); + const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len; + int64_t offset = 0; + + while (offset < len) { + const auto* attr = reinterpret_cast(data + offset); + const void* val = data + offset + NLA_HDRLEN; + switch (attr->nla_type) { + case TCP_NLA_BUSY: { + metrics->busy_usec = ReadUnaligned(val); + break; + } + case TCP_NLA_RWND_LIMITED: { + metrics->rwnd_limited_usec = ReadUnaligned(val); + break; + } + case TCP_NLA_SNDBUF_LIMITED: { + metrics->sndbuf_limited_usec = ReadUnaligned(val); + break; + } + case TCP_NLA_PACING_RATE: { + metrics->pacing_rate = ReadUnaligned(val); + break; + } + case TCP_NLA_DELIVERY_RATE: { + metrics->delivery_rate = ReadUnaligned(val); + break; + } + case TCP_NLA_DELIVERY_RATE_APP_LMT: { + metrics->is_delivery_rate_app_limited = ReadUnaligned(val); + break; + } + case TCP_NLA_SND_CWND: { + metrics->congestion_window = ReadUnaligned(val); + break; + } + case TCP_NLA_MIN_RTT: { + metrics->min_rtt = ReadUnaligned(val); + break; + } + case TCP_NLA_SRTT: { + metrics->srtt = ReadUnaligned(val); + break; + } + case TCP_NLA_RECUR_RETRANS: { + metrics->recurring_retrans = ReadUnaligned(val); + break; + } + case TCP_NLA_BYTES_SENT: { + metrics->data_sent = ReadUnaligned(val); + break; + } + case TCP_NLA_DATA_SEGS_OUT: { + metrics->packet_sent = ReadUnaligned(val); + break; + } + case TCP_NLA_TOTAL_RETRANS: { + metrics->packet_retx = ReadUnaligned(val); + break; + } + case TCP_NLA_DELIVERED: { + metrics->packet_delivered = ReadUnaligned(val); + break; + } + case TCP_NLA_DELIVERED_CE: { + metrics->packet_delivered_ce = 
ReadUnaligned(val); + break; + } + case TCP_NLA_BYTES_RETRANS: { + metrics->data_retx = ReadUnaligned(val); + break; + } + case TCP_NLA_DSACK_DUPS: { + metrics->packet_spurious_retx = ReadUnaligned(val); + break; + } + case TCP_NLA_REORDERING: { + metrics->reordering = ReadUnaligned(val); + break; + } + case TCP_NLA_SND_SSTHRESH: { + metrics->snd_ssthresh = ReadUnaligned(val); + break; + } + } + offset += NLA_ALIGN(attr->nla_len); + } +} +} // namespace. + +void TracedBufferList::AddNewEntry(int32_t seq_no, int fd, void* arg) { + buffer_list_.emplace_back(seq_no, arg); + TracedBuffer& new_elem = buffer_list_.back(); + // Store the current time as the sendmsg time. + new_elem.ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME); + new_elem.ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem.ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem.ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME); + + if (GetSocketTcpInfo(&new_elem.ts_.info, fd) == 0) { + ExtractOptStatsFromTcpInfo(&new_elem.ts_.sendmsg_time.metrics, + &new_elem.ts_.info); + } +} + +void TracedBufferList::ProcessTimestamp(struct sock_extended_err* serr, + struct cmsghdr* opt_stats, + struct scm_timestamping* tss) { + auto it = buffer_list_.begin(); + while (it != buffer_list_.end()) { + TracedBuffer& elem = (*it); + // The byte number refers to the sequence number of the last byte which this + // timestamp relates to. 
+ if (serr->ee_data >= elem.seq_no_) { + switch (serr->ee_info) { + case SCM_TSTAMP_SCHED: + FillGprFromTimestamp(&(elem.ts_.scheduled_time.time), &(tss->ts[0])); + ExtractOptStatsFromCmsg(&(elem.ts_.scheduled_time.metrics), + opt_stats); + ++it; + break; + case SCM_TSTAMP_SND: + FillGprFromTimestamp(&(elem.ts_.sent_time.time), &(tss->ts[0])); + ExtractOptStatsFromCmsg(&(elem.ts_.sent_time.metrics), opt_stats); + ++it; + break; + case SCM_TSTAMP_ACK: + FillGprFromTimestamp(&(elem.ts_.acked_time.time), &(tss->ts[0])); + ExtractOptStatsFromCmsg(&(elem.ts_.acked_time.metrics), opt_stats); + // Got all timestamps. Do the callback and free this TracedBuffer. The + // thing below can be passed by value if we don't want the restriction + // on the lifetime. + g_timestamps_callback(elem.arg_, &(elem.ts_), absl::OkStatus()); + it = buffer_list_.erase(it); + break; + default: + abort(); + } + } else { + break; + } + } +} + +void TracedBufferList::Shutdown(void* remaining, absl::Status shutdown_err) { + while (!buffer_list_.empty()) { + TracedBuffer& elem = buffer_list_.front(); + g_timestamps_callback(elem.arg_, &(elem.ts_), shutdown_err); + buffer_list_.pop_front(); + } + if (remaining != nullptr) { + g_timestamps_callback(remaining, nullptr, shutdown_err); + } +} + +void TcpSetWriteTimestampsCallback( + absl::AnyInvocable fn) { + g_timestamps_callback = std::move(fn); +} + +} // namespace posix_engine +} // namespace grpc_event_engine + +#else /* GRPC_LINUX_ERRQUEUE */ + +namespace grpc_event_engine { +namespace posix_engine { + +void TcpSetWriteTimestampsCallback( + absl::AnyInvocable /*fn*/) { + GPR_ASSERT(false && "Timestamps callback is not enabled for this platform"); +} + +} // namespace posix_engine +} // namespace grpc_event_engine + +#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/src/core/lib/event_engine/posix_engine/traced_buffer_list.h b/src/core/lib/event_engine/posix_engine/traced_buffer_list.h new file mode 100644 index 00000000000..02fd484219c --- /dev/null 
+++ b/src/core/lib/event_engine/posix_engine/traced_buffer_list.h @@ -0,0 +1,169 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H +#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H + +#include + +#include + +#include + +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "absl/types/optional.h" + +#include + +#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h" +#include "src/core/lib/iomgr/port.h" + +// #undef GRPC_LINUX_ERRQUEUE + +namespace grpc_event_engine { +namespace posix_engine { + +struct ConnectionMetrics { /* Delivery rate in Bytes/s. */ + absl::optional delivery_rate; + /* If the delivery rate is limited by the application, this is set to true. */ + absl::optional is_delivery_rate_app_limited; + /* Total packets retransmitted. */ + absl::optional packet_retx; + /* Total packets retransmitted spuriously. This metric is smaller than or + equal to packet_retx. */ + absl::optional packet_spurious_retx; + /* Total packets sent. */ + absl::optional packet_sent; + /* Total packets delivered. */ + absl::optional packet_delivered; + /* Total packets delivered with CE marked. This metric is smaller than or + equal to packet_delivered. */ + absl::optional packet_delivered_ce; + /* Total bytes retransmitted so far. */ + absl::optional data_retx; + /* Total bytes sent so far.
*/ + absl::optional data_sent; + /* Total bytes in write queue but not sent. */ + absl::optional data_notsent; + /* Pacing rate of the connection in Bps */ + absl::optional pacing_rate; + /* Minimum RTT observed in usec. */ + absl::optional min_rtt; + /* Smoothed RTT in usec */ + absl::optional srtt; + /* Send congestion window. */ + absl::optional congestion_window; + /* Slow start threshold in packets. */ + absl::optional snd_ssthresh; + /* Maximum degree of reordering (i.e., maximum number of packets reordered) + on the connection. */ + absl::optional reordering; + /* Represents the number of recurring retransmissions of the first sequence + that is not acknowledged yet. */ + absl::optional recurring_retrans; + /* The cumulative time (in usec) that the transport protocol was busy + sending data. */ + absl::optional busy_usec; + /* The cumulative time (in usec) that the transport protocol was limited by + the receive window size. */ + absl::optional rwnd_limited_usec; + /* The cumulative time (in usec) that the transport protocol was limited by + the send buffer size. */ + absl::optional sndbuf_limited_usec; +}; + +struct BufferTimestamp { + gpr_timespec time; + ConnectionMetrics metrics; /* Metrics collected with this timestamp */ +}; + +struct Timestamps { + BufferTimestamp sendmsg_time; + BufferTimestamp scheduled_time; + BufferTimestamp sent_time; + BufferTimestamp acked_time; + + uint32_t byte_offset; /* byte offset relative to the start of the RPC */ + +#ifdef GRPC_LINUX_ERRQUEUE + tcp_info info; /* tcp_info collected on sendmsg */ +#endif /* GRPC_LINUX_ERRQUEUE */ +}; + +// TracedBuffer is a class to keep track of timestamps for a specific buffer in +// the TCP layer. We are only tracking timestamps for Linux kernels and hence +// this class would only be used by Linux platforms. For all other platforms, +// TracedBuffer would be an empty class.
+// The timestamps collected are according to Timestamps declared above. The +// TracedBuffers are kept in the std::list owned by TracedBufferList; the list +// is empty when no buffers are being traced. +#ifdef GRPC_LINUX_ERRQUEUE + +class TracedBufferList { + public: + // Add a new entry at the tail of the TracedBuffer list. Also saves + // sendmsg_time with the current timestamp. + void AddNewEntry(int32_t seq_no, int fd, void* arg); + // Processes a received timestamp based on sock_extended_err and + // scm_timestamping structures. It will invoke the timestamps callback if the + // timestamp type is SCM_TSTAMP_ACK. + void ProcessTimestamp(struct sock_extended_err* serr, + struct cmsghdr* opt_stats, + struct scm_timestamping* tss); + int Size() { return buffer_list_.size(); } + // Cleans the list by calling the callback for each traced buffer in the list + // with timestamps that it has. + void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/); + + private: + class TracedBuffer { + public: + TracedBuffer(uint32_t seq_no, void* arg) : seq_no_(seq_no), arg_(arg) {} + + private: + friend class TracedBufferList; + uint32_t seq_no_; /* The sequence number for the last byte in the buffer */ + void* arg_; /* The arg to pass to timestamps_callback */ + Timestamps ts_; /* The timestamps corresponding to this buffer */ + }; + // TracedBuffers are ordered by sequence number and would need to be processed + // in a FIFO order starting with the smallest sequence number. To enable this, + // they are stored in a std::list which allows easy appends and forward + // iteration operations. + std::list buffer_list_; +}; + +#else /* GRPC_LINUX_ERRQUEUE */ +// TracedBufferList implementation is a no-op for this platform.
+class TracedBufferList { + public: + void AddNewEntry(int32_t /*seq_no*/, int /*fd*/, void* /*arg*/) {} + void ProcessTimestamp(struct sock_extended_err* /*serr*/, + struct cmsghdr* /*opt_stats*/, + struct scm_timestamping* /*tss*/) {} + int Size() { return 0; } + void Shutdown(void* /*remaining*/, absl::Status /*shutdown_err*/) {} +}; +#endif /* GRPC_LINUX_ERRQUEUE */ + +// Sets the callback function to call when timestamps for a write are collected. +// This is expected to be called at most once. +void TcpSetWriteTimestampsCallback( + absl::AnyInvocable); + +} // namespace posix_engine +} // namespace grpc_event_engine + +#endif /* GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TRACED_BUFFER_LIST_H */ diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index 6056380e6e5..5945418a9bf 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -97,3 +97,19 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "traced_buffer_list_test", + srcs = ["traced_buffer_list_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_windows", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:posix_event_engine_traced_buffer_list", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/event_engine/posix/traced_buffer_list_test.cc b/test/core/event_engine/posix/traced_buffer_list_test.cc new file mode 100644 index 00000000000..26be24095e8 --- /dev/null +++ b/test/core/event_engine/posix/traced_buffer_list_test.cc @@ -0,0 +1,133 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h" + +#include + +#include + +#include + +#include "src/core/lib/iomgr/port.h" +#include "test/core/util/test_config.h" + +#ifdef GRPC_LINUX_ERRQUEUE + +#define NUM_ELEM 5 + +namespace grpc_event_engine { +namespace posix_engine { +namespace { + +void TestShutdownFlushesListVerifier(void* arg, Timestamps* /*ts*/, + absl::Status status) { + ASSERT_TRUE(status.ok()); + ASSERT_NE(arg, nullptr); + int* done = reinterpret_cast(arg); + *done = 1; +} + +void TestVerifierCalledOnAckVerifier(void* arg, Timestamps* ts, + absl::Status status) { + ASSERT_TRUE(status.ok()); + ASSERT_NE(arg, nullptr); + ASSERT_EQ(ts->acked_time.time.clock_type, GPR_CLOCK_REALTIME); + ASSERT_EQ(ts->acked_time.time.tv_sec, 123); + ASSERT_EQ(ts->acked_time.time.tv_nsec, 456); + ASSERT_GT(ts->info.length, 0); + int* done = reinterpret_cast(arg); + *done = 1; +} + +} // namespace + +// Tests that all TracedBuffer elements in the list are flushed out on shutdown. +// Also tests that arg is passed correctly. 
+TEST(BufferListTest, TestShutdownFlushesList) { + TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier); + TracedBufferList traced_buffers; + int verifier_called[NUM_ELEM]; + for (auto i = 0; i < NUM_ELEM; i++) { + verifier_called[i] = 0; + traced_buffers.AddNewEntry(i, 0, static_cast(&verifier_called[i])); + } + traced_buffers.Shutdown(nullptr, absl::OkStatus()); + for (auto i = 0; i < NUM_ELEM; i++) { + ASSERT_EQ(verifier_called[i], 1); + } + ASSERT_TRUE(traced_buffers.Size() == 0); +} + +// Tests that the timestamp verifier is called on an ACK timestamp. +TEST(BufferListTest, TestVerifierCalledOnAck) { + struct sock_extended_err serr; + serr.ee_data = 213; + serr.ee_info = SCM_TSTAMP_ACK; + struct scm_timestamping tss; + tss.ts[0].tv_sec = 123; + tss.ts[0].tv_nsec = 456; + TcpSetWriteTimestampsCallback(TestVerifierCalledOnAckVerifier); + TracedBufferList traced_buffers; + int verifier_called = 0; + traced_buffers.AddNewEntry(213, 0, &verifier_called); + traced_buffers.ProcessTimestamp(&serr, nullptr, &tss); + ASSERT_EQ(verifier_called, 1); + ASSERT_TRUE(traced_buffers.Size() == 0); + traced_buffers.Shutdown(nullptr, absl::OkStatus()); + ASSERT_TRUE(traced_buffers.Size() == 0); +} + +// Tests that ProcessTimestamp called after Shutdown does nothing. +TEST(BufferListTest, TestProcessTimestampAfterShutdown) { + struct sock_extended_err serr; + serr.ee_data = 213; + serr.ee_info = SCM_TSTAMP_ACK; + struct scm_timestamping tss; + tss.ts[0].tv_sec = 123; + tss.ts[0].tv_nsec = 456; + TcpSetWriteTimestampsCallback(TestShutdownFlushesListVerifier); + TracedBufferList traced_buffers; + int verifier_called = 0; + + traced_buffers.AddNewEntry(213, 0, &verifier_called); + ASSERT_TRUE(traced_buffers.Size() == 1); + traced_buffers.Shutdown(nullptr, absl::OkStatus()); + ASSERT_TRUE(traced_buffers.Size() == 0); + // Check that the callback was executed after first Shutdown. 
+ ASSERT_EQ(verifier_called, 1); + verifier_called = 0; + traced_buffers.Shutdown(nullptr, absl::OkStatus()); + ASSERT_TRUE(traced_buffers.Size() == 0); + // Second Shutdown should not execute the callback. + ASSERT_EQ(verifier_called, 0); + traced_buffers.ProcessTimestamp(&serr, nullptr, &tss); + // A ProcessTimestamp after Shutdown should not execute the callback. + ASSERT_EQ(verifier_called, 0); +} + +} // namespace posix_engine +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(&argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else /* GRPC_LINUX_ERRQUEUE */ + +int main(int /*argc*/, char** /*argv*/) { return 0; } + +#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index bcda1a666f9..2af1bcd2af7 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7487,6 +7487,28 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "traced_buffer_list_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,