diff --git a/BUILD b/BUILD index 03dc449cb02..55f8f199195 100644 --- a/BUILD +++ b/BUILD @@ -643,6 +643,17 @@ grpc_cc_library( public_hdrs = ["src/core/lib/gprpp/debug_location.h"], ) +grpc_cc_library( + name = "optional", + language = "c++", + public_hdrs = [ + "src/core/lib/gprpp/optional.h", + ], + deps = [ + "gpr_base", + ], +) + grpc_cc_library( name = "orphanable", language = "c++", @@ -976,6 +987,7 @@ grpc_cc_library( "grpc_codegen", "grpc_trace", "inlined_vector", + "optional", "orphanable", "ref_counted", "ref_counted_ptr", diff --git a/CMakeLists.txt b/CMakeLists.txt index cf6256af288..a36d06a703c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -652,6 +652,7 @@ add_dependencies(buildtests_cxx metrics_client) add_dependencies(buildtests_cxx mock_test) add_dependencies(buildtests_cxx nonblocking_test) add_dependencies(buildtests_cxx noop-benchmark) +add_dependencies(buildtests_cxx optional_test) add_dependencies(buildtests_cxx orphanable_test) add_dependencies(buildtests_cxx proto_server_reflection_test) add_dependencies(buildtests_cxx proto_utils_test) @@ -14445,6 +14446,45 @@ target_link_libraries(noop-benchmark ) +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + +add_executable(optional_test + test/core/gprpp/optional_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(optional_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(optional_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc++ + grpc + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + + endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 3353e140351..fd76e8b7d72 100644 --- a/Makefile +++ b/Makefile @@ -1213,6 +1213,7 @@ metrics_client: $(BINDIR)/$(CONFIG)/metrics_client mock_test: $(BINDIR)/$(CONFIG)/mock_test nonblocking_test: $(BINDIR)/$(CONFIG)/nonblocking_test noop-benchmark: $(BINDIR)/$(CONFIG)/noop-benchmark +optional_test: $(BINDIR)/$(CONFIG)/optional_test orphanable_test: $(BINDIR)/$(CONFIG)/orphanable_test proto_server_reflection_test: $(BINDIR)/$(CONFIG)/proto_server_reflection_test proto_utils_test: $(BINDIR)/$(CONFIG)/proto_utils_test @@ -1720,6 +1721,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/nonblocking_test \ $(BINDIR)/$(CONFIG)/noop-benchmark \ + $(BINDIR)/$(CONFIG)/optional_test \ $(BINDIR)/$(CONFIG)/orphanable_test \ $(BINDIR)/$(CONFIG)/proto_server_reflection_test \ $(BINDIR)/$(CONFIG)/proto_utils_test \ @@ -1906,6 +1908,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/mock_test \ $(BINDIR)/$(CONFIG)/nonblocking_test \ $(BINDIR)/$(CONFIG)/noop-benchmark \ + $(BINDIR)/$(CONFIG)/optional_test \ $(BINDIR)/$(CONFIG)/orphanable_test \ $(BINDIR)/$(CONFIG)/proto_server_reflection_test \ $(BINDIR)/$(CONFIG)/proto_utils_test \ @@ -2399,6 +2402,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/nonblocking_test || ( echo test nonblocking_test failed ; exit 1 ) $(E) "[RUN] Testing noop-benchmark" $(Q) $(BINDIR)/$(CONFIG)/noop-benchmark || ( echo test noop-benchmark failed ; exit 1 ) + $(E) "[RUN] Testing optional_test" + $(Q) $(BINDIR)/$(CONFIG)/optional_test || ( echo test optional_test failed ; exit 1 ) $(E) "[RUN] Testing orphanable_test" $(Q) $(BINDIR)/$(CONFIG)/orphanable_test || ( echo test orphanable_test failed ; exit 1 ) $(E) "[RUN] Testing proto_server_reflection_test" @@ -19444,6 +19449,49 @@ endif endif +OPTIONAL_TEST_SRC = \ + test/core/gprpp/optional_test.cc \ + +OPTIONAL_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(OPTIONAL_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/optional_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. + +$(BINDIR)/$(CONFIG)/optional_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/optional_test: $(PROTOBUF_DEP) $(OPTIONAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(OPTIONAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/optional_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/core/gprpp/optional_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_optional_test: $(OPTIONAL_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(OPTIONAL_TEST_OBJS:.o=.dep) +endif +endif + + ORPHANABLE_TEST_SRC = \ test/core/gprpp/orphanable_test.cc \ diff --git a/build.yaml b/build.yaml index 86fb0267fa9..3afe4a3e9ce 100644 --- a/build.yaml +++ b/build.yaml @@ -430,6 +430,7 @@ filegroups: - src/core/lib/debug/stats_data.h - src/core/lib/gprpp/debug_location.h - src/core/lib/gprpp/inlined_vector.h + - src/core/lib/gprpp/optional.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -5059,6 +5060,19 @@ targets: deps: - benchmark defaults: benchmark +- name: optional_test + gtest: true + build: test + language: c++ + src: + - test/core/gprpp/optional_test.cc + deps: + - grpc_test_util + - grpc++ + - grpc + - gpr + uses: + - grpc++_test - name: orphanable_test gtest: true build: test diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 481892b63c7..710fc461441 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -402,6 +402,7 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats_data.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/inlined_vector.h', + 'src/core/lib/gprpp/optional.h', 'src/core/lib/gprpp/orphanable.h', 'src/core/lib/gprpp/ref_counted.h', 'src/core/lib/gprpp/ref_counted_ptr.h', @@ -595,6 +596,7 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats_data.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/inlined_vector.h', + 'src/core/lib/gprpp/optional.h', 'src/core/lib/gprpp/orphanable.h', 'src/core/lib/gprpp/ref_counted.h', 'src/core/lib/gprpp/ref_counted_ptr.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 414fe1d3172..b5f41dfbe77 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -396,6 +396,7 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats_data.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/inlined_vector.h', + 'src/core/lib/gprpp/optional.h', 'src/core/lib/gprpp/orphanable.h', 'src/core/lib/gprpp/ref_counted.h', 'src/core/lib/gprpp/ref_counted_ptr.h', @@ -1024,6 +1025,7 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats_data.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/inlined_vector.h', + 'src/core/lib/gprpp/optional.h', 'src/core/lib/gprpp/orphanable.h', 'src/core/lib/gprpp/ref_counted.h', 'src/core/lib/gprpp/ref_counted_ptr.h', diff --git a/grpc.gemspec b/grpc.gemspec index 5e5eb65ed2f..d245c722037 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -332,6 +332,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/debug/stats_data.h ) s.files += %w( src/core/lib/gprpp/debug_location.h ) s.files += %w( src/core/lib/gprpp/inlined_vector.h ) + s.files += %w( src/core/lib/gprpp/optional.h ) s.files += %w( src/core/lib/gprpp/orphanable.h ) s.files += %w( src/core/lib/gprpp/ref_counted.h ) s.files += %w( src/core/lib/gprpp/ref_counted_ptr.h ) diff --git a/package.xml b/package.xml index 523f78f1db6..cb036c81daf 100644 --- a/package.xml +++ b/package.xml @@ -337,6 +337,7 @@ + diff --git a/src/core/lib/gprpp/optional.h b/src/core/lib/gprpp/optional.h new file mode 100644 index 00000000000..e517c6edccc --- /dev/null +++ b/src/core/lib/gprpp/optional.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_GPRPP_OPTIONAL_H +#define GRPC_CORE_LIB_GPRPP_OPTIONAL_H + +namespace grpc_core { + +/* A make-shift alternative for absl::Optional. This can be removed in favor of + * that once absl dependencies can be introduced. */ +template +class Optional { + public: + void set(const T& val) { + value_ = val; + set_ = true; + } + + bool has_value() { return set_; } + + void reset() { set_ = false; } + + T value() { return value_; } + + private: + T value_; + bool set_ = false; +}; + +} /* namespace grpc_core */ + +#endif /* GRPC_CORE_LIB_GPRPP_OPTIONAL_H */ diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc index ace17a108d1..321de539934 100644 --- a/src/core/lib/iomgr/buffer_list.cc +++ b/src/core/lib/iomgr/buffer_list.cc @@ -24,32 +24,13 @@ #include #ifdef GRPC_LINUX_ERRQUEUE +#include +#include #include #include "src/core/lib/gprpp/memory.h" namespace grpc_core { -void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, - void* arg) { - GPR_DEBUG_ASSERT(head != nullptr); - TracedBuffer* new_elem = New(seq_no, arg); - /* Store the current time as the sendmsg time. */ - new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME); - new_elem->ts_.scheduled_time = gpr_inf_past(GPR_CLOCK_REALTIME); - new_elem->ts_.sent_time = gpr_inf_past(GPR_CLOCK_REALTIME); - new_elem->ts_.acked_time = gpr_inf_past(GPR_CLOCK_REALTIME); - if (*head == nullptr) { - *head = new_elem; - return; - } - /* Append at the end. */ - TracedBuffer* ptr = *head; - while (ptr->next_ != nullptr) { - ptr = ptr->next_; - } - ptr->next_ = new_elem; -} - namespace { /** Fills gpr_timespec gts based on values from timespec ts */ void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) { @@ -68,10 +49,180 @@ void default_timestamps_callback(void* arg, grpc_core::Timestamps* ts, void (*timestamps_callback)(void*, grpc_core::Timestamps*, grpc_error* shutdown_err) = default_timestamps_callback; + +/* Used to extract individual opt stats from cmsg, so as to avoid troubles with + * unaligned reads */ +template +T read_unaligned(const void* ptr) { + T val; + memcpy(&val, ptr, sizeof(val)); + return val; +} + +/* Extracts opt stats from the tcp_info struct \a info to \a metrics */ +void extract_opt_stats_from_tcp_info(ConnectionMetrics* metrics, + const grpc_core::tcp_info* info) { + if (info == nullptr) { + return; + } + if (info->length > offsetof(grpc_core::tcp_info, tcpi_sndbuf_limited)) { + metrics->recurring_retrans.set(info->tcpi_retransmits); + metrics->is_delivery_rate_app_limited.set( + info->tcpi_delivery_rate_app_limited); + metrics->congestion_window.set(info->tcpi_snd_cwnd); + metrics->reordering.set(info->tcpi_reordering); + metrics->packet_retx.set(info->tcpi_total_retrans); + metrics->pacing_rate.set(info->tcpi_pacing_rate); + metrics->data_notsent.set(info->tcpi_notsent_bytes); + if (info->tcpi_min_rtt != UINT32_MAX) { + metrics->min_rtt.set(info->tcpi_min_rtt); + } + metrics->packet_sent.set(info->tcpi_data_segs_out); + metrics->delivery_rate.set(info->tcpi_delivery_rate); + metrics->busy_usec.set(info->tcpi_busy_time); + metrics->rwnd_limited_usec.set(info->tcpi_rwnd_limited); + metrics->sndbuf_limited_usec.set(info->tcpi_sndbuf_limited); + } + if (info->length > offsetof(grpc_core::tcp_info, tcpi_dsack_dups)) { + metrics->data_sent.set(info->tcpi_bytes_sent); + metrics->data_retx.set(info->tcpi_bytes_retrans); + metrics->packet_spurious_retx.set(info->tcpi_dsack_dups); + } +} + +/** Extracts opt stats from the given control message \a opt_stats to the + * connection metrics \a metrics */ +void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics, + const cmsghdr* opt_stats) { + if (opt_stats == nullptr) { + return; + } + const auto* data = CMSG_DATA(opt_stats); + constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr)); + const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len; + int64_t offset = 0; + + while (offset < len) { + const auto* attr = reinterpret_cast(data + offset); + const void* val = data + offset + NLA_HDRLEN; + switch (attr->nla_type) { + case TCP_NLA_BUSY: { + metrics->busy_usec.set(read_unaligned(val)); + break; + } + case TCP_NLA_RWND_LIMITED: { + metrics->rwnd_limited_usec.set(read_unaligned(val)); + break; + } + case TCP_NLA_SNDBUF_LIMITED: { + metrics->sndbuf_limited_usec.set(read_unaligned(val)); + break; + } + case TCP_NLA_PACING_RATE: { + metrics->pacing_rate.set(read_unaligned(val)); + break; + } + case TCP_NLA_DELIVERY_RATE: { + metrics->delivery_rate.set(read_unaligned(val)); + break; + } + case TCP_NLA_DELIVERY_RATE_APP_LMT: { + metrics->is_delivery_rate_app_limited.set(read_unaligned(val)); + break; + } + case TCP_NLA_SND_CWND: { + metrics->congestion_window.set(read_unaligned(val)); + break; + } + case TCP_NLA_MIN_RTT: { + metrics->min_rtt.set(read_unaligned(val)); + break; + } + case TCP_NLA_SRTT: { + metrics->srtt.set(read_unaligned(val)); + break; + } + case TCP_NLA_RECUR_RETRANS: { + metrics->recurring_retrans.set(read_unaligned(val)); + break; + } + case TCP_NLA_BYTES_SENT: { + metrics->data_sent.set(read_unaligned(val)); + break; + } + case TCP_NLA_DATA_SEGS_OUT: { + metrics->packet_sent.set(read_unaligned(val)); + break; + } + case TCP_NLA_TOTAL_RETRANS: { + metrics->packet_retx.set(read_unaligned(val)); + break; + } + case TCP_NLA_DELIVERED: { + metrics->packet_delivered.set(read_unaligned(val)); + break; + } + case TCP_NLA_DELIVERED_CE: { + metrics->packet_delivered_ce.set(read_unaligned(val)); + break; + } + case TCP_NLA_BYTES_RETRANS: { + metrics->data_retx.set(read_unaligned(val)); + break; + } + case TCP_NLA_DSACK_DUPS: { + metrics->packet_spurious_retx.set(read_unaligned(val)); + break; + } + case TCP_NLA_REORDERING: { + metrics->reordering.set(read_unaligned(val)); + break; + } + case TCP_NLA_SND_SSTHRESH: { + metrics->snd_ssthresh.set(read_unaligned(val)); + break; + } + } + offset += NLA_ALIGN(attr->nla_len); + } +} + +static int get_socket_tcp_info(grpc_core::tcp_info* info, int fd) { + info->length = sizeof(*info) - sizeof(socklen_t); + memset(info, 0, sizeof(*info)); + return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length)); +} } /* namespace */ +void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, int fd, + void* arg) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* new_elem = New(seq_no, arg); + /* Store the current time as the sendmsg time. */ + new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME); + new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME); + new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME); + + if (get_socket_tcp_info(&new_elem->ts_.info, fd) == 0) { + extract_opt_stats_from_tcp_info(&new_elem->ts_.sendmsg_time.metrics, + &new_elem->ts_.info); + } + if (*head == nullptr) { + *head = new_elem; + return; + } + /* Append at the end. */ + TracedBuffer* ptr = *head; + while (ptr->next_ != nullptr) { + ptr = ptr->next_; + } + ptr->next_ = new_elem; +} + void TracedBuffer::ProcessTimestamp(TracedBuffer** head, struct sock_extended_err* serr, + struct cmsghdr* opt_stats, struct scm_timestamping* tss) { GPR_DEBUG_ASSERT(head != nullptr); TracedBuffer* elem = *head; @@ -82,15 +233,22 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head, if (serr->ee_data >= elem->seq_no_) { switch (serr->ee_info) { case SCM_TSTAMP_SCHED: - fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0])); + fill_gpr_from_timestamp(&(elem->ts_.scheduled_time.time), + &(tss->ts[0])); + extract_opt_stats_from_cmsg(&(elem->ts_.scheduled_time.metrics), + opt_stats); elem = elem->next_; break; case SCM_TSTAMP_SND: - fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0])); + fill_gpr_from_timestamp(&(elem->ts_.sent_time.time), &(tss->ts[0])); + extract_opt_stats_from_cmsg(&(elem->ts_.sent_time.metrics), + opt_stats); elem = elem->next_; break; case SCM_TSTAMP_ACK: - fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0])); + fill_gpr_from_timestamp(&(elem->ts_.acked_time.time), &(tss->ts[0])); + extract_opt_stats_from_cmsg(&(elem->ts_.acked_time.metrics), + opt_stats); /* Got all timestamps. Do the callback and free this TracedBuffer. * The thing below can be passed by value if we don't want the * restriction on the lifetime. */ diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h index 627f1bde99a..215ab03a563 100644 --- a/src/core/lib/iomgr/buffer_list.h +++ b/src/core/lib/iomgr/buffer_list.h @@ -26,19 +26,78 @@ #include #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/optional.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/internal_errqueue.h" namespace grpc_core { + +struct ConnectionMetrics { + /* Delivery rate in Bytes/s. */ + Optional delivery_rate; + /* If the delivery rate is limited by the application, this is set to true. */ + Optional is_delivery_rate_app_limited; + /* Total packets retransmitted. */ + Optional packet_retx; + /* Total packets retransmitted spuriously. This metric is smaller than or + equal to packet_retx. */ + Optional packet_spurious_retx; + /* Total packets sent. */ + Optional packet_sent; + /* Total packets delivered. */ + Optional packet_delivered; + /* Total packets delivered with ECE marked. This metric is smaller than or + equal to packet_delivered. */ + Optional packet_delivered_ce; + /* Total bytes lost so far. */ + Optional data_retx; + /* Total bytes sent so far. */ + Optional data_sent; + /* Total bytes in write queue but not sent. */ + Optional data_notsent; + /* Pacing rate of the connection in Bps */ + Optional pacing_rate; + /* Minimum RTT observed in usec. */ + Optional min_rtt; + /* Smoothed RTT in usec */ + Optional srtt; + /* Send congestion window. */ + Optional congestion_window; + /* Slow start threshold in packets. */ + Optional snd_ssthresh; + /* Maximum degree of reordering (i.e., maximum number of packets reodered) + on the connection. */ + Optional reordering; + /* Represents the number of recurring retransmissions of the first sequence + that is not acknowledged yet. */ + Optional recurring_retrans; + /* The cumulative time (in usec) that the transport protocol was busy + sending data. */ + Optional busy_usec; + /* The cumulative time (in usec) that the transport protocol was limited by + the receive window size. */ + Optional rwnd_limited_usec; + /* The cumulative time (in usec) that the transport protocol was limited by + the send buffer size. */ + Optional sndbuf_limited_usec; +}; + +struct Timestamp { + gpr_timespec time; + ConnectionMetrics metrics; /* Metrics collected with this timestamp */ +}; + struct Timestamps { - /* TODO(yashykt): This would also need to store OPTSTAT once support is added - */ - gpr_timespec sendmsg_time; - gpr_timespec scheduled_time; - gpr_timespec sent_time; - gpr_timespec acked_time; + Timestamp sendmsg_time; + Timestamp scheduled_time; + Timestamp sent_time; + Timestamp acked_time; uint32_t byte_offset; /* byte offset relative to the start of the RPC */ + +#ifdef GRPC_LINUX_ERRQUEUE + grpc_core::tcp_info info; /* tcp_info collected on sendmsg */ +#endif /* GRPC_LINUX_ERRQUEUE */ }; /** TracedBuffer is a class to keep track of timestamps for a specific buffer in @@ -58,13 +117,14 @@ class TracedBuffer { /** Add a new entry in the TracedBuffer list pointed to by head. Also saves * sendmsg_time with the current timestamp. */ static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no, - void* arg); + int fd, void* arg); /** Processes a received timestamp based on sock_extended_err and * scm_timestamping structures. It will invoke the timestamps callback if the * timestamp type is SCM_TSTAMP_ACK. */ static void ProcessTimestamp(grpc_core::TracedBuffer** head, struct sock_extended_err* serr, + struct cmsghdr* opt_stats, struct scm_timestamping* tss); /** Cleans the list by calling the callback for each traced buffer in the list diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h index f8644c2536c..b9fe411769f 100644 --- a/src/core/lib/iomgr/internal_errqueue.h +++ b/src/core/lib/iomgr/internal_errqueue.h @@ -37,6 +37,7 @@ #ifdef GRPC_LINUX_ERRQUEUE #include #include +#include #include #endif /* GRPC_LINUX_ERRQUEUE */ @@ -56,6 +57,12 @@ constexpr int SCM_TSTAMP_SND = 0; constexpr int SCM_TSTAMP_SCHED = 1; /* The timestamp type for when data acknowledged by peer. */ constexpr int SCM_TSTAMP_ACK = 2; + +/* Control message type containing OPT_STATS */ +#ifndef SCM_TIMESTAMPING_OPT_STATS +#define SCM_TIMESTAMPING_OPT_STATS 54 +#endif + /* Redefine required constants from */ constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1; constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4; @@ -63,13 +70,108 @@ constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7; constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8; constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9; constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11; +constexpr uint32_t SOF_TIMESTAMPING_OPT_STATS = 1u << 12; -constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE | - SOF_TIMESTAMPING_OPT_ID | - SOF_TIMESTAMPING_OPT_TSONLY; +constexpr uint32_t kTimestampingSocketOptions = + SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_ID | + SOF_TIMESTAMPING_OPT_TSONLY | SOF_TIMESTAMPING_OPT_STATS; constexpr uint32_t kTimestampingRecordingOptions = SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | SOF_TIMESTAMPING_TX_ACK; + +/* Netlink attribute types used for TCP opt stats. */ +enum TCPOptStats { + TCP_NLA_PAD, + TCP_NLA_BUSY, /* Time (usec) busy sending data. */ + TCP_NLA_RWND_LIMITED, /* Time (usec) limited by receive window. */ + TCP_NLA_SNDBUF_LIMITED, /* Time (usec) limited by send buffer. */ + TCP_NLA_DATA_SEGS_OUT, /* Data pkts sent including retransmission. */ + TCP_NLA_TOTAL_RETRANS, /* Data pkts retransmitted. */ + TCP_NLA_PACING_RATE, /* Pacing rate in Bps. */ + TCP_NLA_DELIVERY_RATE, /* Delivery rate in Bps. */ + TCP_NLA_SND_CWND, /* Sending congestion window. */ + TCP_NLA_REORDERING, /* Reordering metric. */ + TCP_NLA_MIN_RTT, /* minimum RTT. */ + TCP_NLA_RECUR_RETRANS, /* Recurring retransmits for the current pkt. */ + TCP_NLA_DELIVERY_RATE_APP_LMT, /* Delivery rate application limited? */ + TCP_NLA_SNDQ_SIZE, /* Data (bytes) pending in send queue */ + TCP_NLA_CA_STATE, /* ca_state of socket */ + TCP_NLA_SND_SSTHRESH, /* Slow start size threshold */ + TCP_NLA_DELIVERED, /* Data pkts delivered incl. out-of-order */ + TCP_NLA_DELIVERED_CE, /* Like above but only ones w/ CE marks */ + TCP_NLA_BYTES_SENT, /* Data bytes sent including retransmission */ + TCP_NLA_BYTES_RETRANS, /* Data bytes retransmitted */ + TCP_NLA_DSACK_DUPS, /* DSACK blocks received */ + TCP_NLA_REORD_SEEN, /* reordering events seen */ + TCP_NLA_SRTT, /* smoothed RTT in usecs */ +}; + +/* tcp_info from from linux/tcp.h */ +struct tcp_info { + uint8_t tcpi_state; + uint8_t tcpi_ca_state; + uint8_t tcpi_retransmits; + uint8_t tcpi_probes; + uint8_t tcpi_backoff; + uint8_t tcpi_options; + uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4; + uint8_t tcpi_delivery_rate_app_limited : 1; + uint32_t tcpi_rto; + uint32_t tcpi_ato; + uint32_t tcpi_snd_mss; + uint32_t tcpi_rcv_mss; + uint32_t tcpi_unacked; + uint32_t tcpi_sacked; + uint32_t tcpi_lost; + uint32_t tcpi_retrans; + uint32_t tcpi_fackets; + /* Times. */ + uint32_t tcpi_last_data_sent; + uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */ + uint32_t tcpi_last_data_recv; + uint32_t tcpi_last_ack_recv; + /* Metrics. */ + uint32_t tcpi_pmtu; + uint32_t tcpi_rcv_ssthresh; + uint32_t tcpi_rtt; + uint32_t tcpi_rttvar; + uint32_t tcpi_snd_ssthresh; + uint32_t tcpi_snd_cwnd; + uint32_t tcpi_advmss; + uint32_t tcpi_reordering; + uint32_t tcpi_rcv_rtt; + uint32_t tcpi_rcv_space; + uint32_t tcpi_total_retrans; + uint64_t tcpi_pacing_rate; + uint64_t tcpi_max_pacing_rate; + uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */ + uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */ + + uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */ + uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */ + uint32_t tcpi_notsent_bytes; + uint32_t tcpi_min_rtt; + + uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */ + uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */ + + uint64_t tcpi_delivery_rate; + uint64_t tcpi_busy_time; /* Time (usec) busy sending data */ + uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */ + uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */ + + uint32_t tcpi_delivered; + uint32_t tcpi_delivered_ce; + uint64_t tcpi_bytes_sent; /* RFC4898 tcpEStatsPerfHCDataOctetsOut */ + uint64_t tcpi_bytes_retrans; /* RFC4898 tcpEStatsPerfOctetsRetrans */ + uint32_t tcpi_dsack_dups; /* RFC4898 tcpEStatsStackDSACKDups */ + uint32_t tcpi_reord_seen; /* reordering events seen */ + socklen_t length; /* Length of struct returned by kernel */ +}; + +#ifndef TCP_INFO +#define TCP_INFO 11 +#endif #endif /* GRPC_LINUX_ERRQUEUE */ /* Returns true if kernel is capable of supporting errqueue and timestamping. diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 92f163b58e9..02478f40657 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -593,6 +593,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); #ifdef GRPC_LINUX_ERRQUEUE + static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, ssize_t* sent_length) { @@ -631,7 +632,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::AddNewEntry( &tcp->tb_head, static_cast(tcp->bytes_counter + length), - tcp->outgoing_buffer_arg); + tcp->fd, tcp->outgoing_buffer_arg); gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; } @@ -648,6 +649,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, struct cmsghdr* cmsg) { auto next_cmsg = CMSG_NXTHDR(msg, cmsg); + cmsghdr* opt_stats = nullptr; if (next_cmsg == nullptr) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_ERROR, "Received timestamp without extended error"); @@ -655,6 +657,19 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, return cmsg; } + /* Check if next_cmsg is an OPT_STATS msg */ + if (next_cmsg->cmsg_level == SOL_SOCKET && + next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) { + opt_stats = next_cmsg; + next_cmsg = CMSG_NXTHDR(msg, opt_stats); + if (next_cmsg == nullptr) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Received timestamp without extended error"); + } + return opt_stats; + } + } + if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || !(next_cmsg->cmsg_type == IP_RECVERR || next_cmsg->cmsg_type == IPV6_RECVERR)) { @@ -676,7 +691,8 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, * to protect the traced buffer list. A lock free list might be better. Using * a simple mutex for now. */ gpr_mu_lock(&tcp->tb_mu); - grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss); + grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, opt_stats, + tss); gpr_mu_unlock(&tcp->tb_mu); return next_cmsg; } @@ -696,10 +712,11 @@ static void process_errors(grpc_tcp* tcp) { msg.msg_iovlen = 0; msg.msg_flags = 0; + // Allocate aligned space for cmsgs received along with a timestamps union { - char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) + - CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/ - ]; + char rbuf[CMSG_SPACE(sizeof(scm_timestamping)) + + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + + CMSG_SPACE(16 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)))]; struct cmsghdr align; } aligned_buf; memset(&aligned_buf, 0, sizeof(aligned_buf)); diff --git a/test/core/gprpp/BUILD b/test/core/gprpp/BUILD index fe3fea1df88..c8d47be5bb2 100644 --- a/test/core/gprpp/BUILD +++ b/test/core/gprpp/BUILD @@ -64,6 +64,19 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "optional_test", + srcs = ["optional_test.cc"], + external_deps = [ + "gtest", + ], + language = "C++", + deps = [ + "//:optional", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "orphanable_test", srcs = ["orphanable_test.cc"], diff --git a/test/core/gprpp/optional_test.cc b/test/core/gprpp/optional_test.cc new file mode 100644 index 00000000000..ce6f8692fd5 --- /dev/null +++ b/test/core/gprpp/optional_test.cc @@ -0,0 +1,50 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/gprpp/optional.h" +#include +#include +#include "src/core/lib/gprpp/memory.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace testing { + +namespace { +TEST(OptionalTest, BasicTest) { + grpc_core::Optional opt_val; + EXPECT_FALSE(opt_val.has_value()); + const int kTestVal = 123; + + opt_val.set(kTestVal); + EXPECT_TRUE(opt_val.has_value()); + EXPECT_EQ(opt_val.value(), kTestVal); + + opt_val.reset(); + EXPECT_EQ(opt_val.has_value(), false); +} +} // namespace + +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc index eca8f76e673..61a81e31c2b 100644 --- a/test/core/iomgr/buffer_list_test.cc +++ b/test/core/iomgr/buffer_list_test.cc @@ -48,7 +48,7 @@ static void TestShutdownFlushesList() { for (auto i = 0; i < NUM_ELEM; i++) { gpr_atm_rel_store(&verifier_called[i], static_cast(0)); grpc_core::TracedBuffer::AddNewEntry( - &list, i, static_cast(&verifier_called[i])); + &list, i, 0, static_cast(&verifier_called[i])); } grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE); GPR_ASSERT(list == nullptr); @@ -63,9 +63,9 @@ static void TestVerifierCalledOnAckVerifier(void* arg, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(arg != nullptr); - GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->acked_time.tv_sec == 123); - GPR_ASSERT(ts->acked_time.tv_nsec == 456); + GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->acked_time.time.tv_sec == 123); + GPR_ASSERT(ts->acked_time.time.tv_nsec == 456); gpr_atm* done = reinterpret_cast(arg); gpr_atm_rel_store(done, static_cast(1)); } @@ -84,8 +84,8 @@ static void TestVerifierCalledOnAck() { grpc_core::TracedBuffer* list = nullptr; gpr_atm verifier_called; gpr_atm_rel_store(&verifier_called, static_cast(0)); - grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called); - grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss); + grpc_core::TracedBuffer::AddNewEntry(&list, 213, 0, &verifier_called); + grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, nullptr, &tss); GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast(1)); GPR_ASSERT(list == nullptr); grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE); diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index 80f17a914fa..5b601b1ae5f 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -384,9 +384,9 @@ void timestamps_verifier(void* arg, grpc_core::Timestamps* ts, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(arg != nullptr); - GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->sendmsg_time.time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->scheduled_time.time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME); gpr_atm* done_timestamps = (gpr_atm*)arg; gpr_atm_rel_store(done_timestamps, static_cast(1)); } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 363df22aa15..8aec165a339 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1076,6 +1076,7 @@ src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/memory.h \ src/core/lib/gprpp/mutex_lock.h \ +src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted_ptr.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 51b9eda22b6..041c7382be5 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1167,6 +1167,7 @@ src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/memory.h \ src/core/lib/gprpp/mutex_lock.h \ +src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted_ptr.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index bb1c5168546..b5992c219d9 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -4184,6 +4184,24 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "grpc", + "grpc++", + "grpc++_test", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "optional_test", + "src": [ + "test/core/gprpp/optional_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -9556,6 +9574,7 @@ "src/core/lib/debug/stats_data.h", "src/core/lib/gprpp/debug_location.h", "src/core/lib/gprpp/inlined_vector.h", + "src/core/lib/gprpp/optional.h", "src/core/lib/gprpp/orphanable.h", "src/core/lib/gprpp/ref_counted.h", "src/core/lib/gprpp/ref_counted_ptr.h", @@ -9709,6 +9728,7 @@ "src/core/lib/debug/stats_data.h", "src/core/lib/gprpp/debug_location.h", "src/core/lib/gprpp/inlined_vector.h", + "src/core/lib/gprpp/optional.h", "src/core/lib/gprpp/orphanable.h", "src/core/lib/gprpp/ref_counted.h", "src/core/lib/gprpp/ref_counted_ptr.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 6c667f10c48..b41fef6b795 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4887,6 +4887,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "optional_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,