Merge pull request #17757 from yashykt/optstats

Collect OPT_STATS along with tx timestamps
pull/17784/head
Yash Tibrewal 6 years ago committed by GitHub
commit ca90fd501d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      BUILD
  2. 40
      CMakeLists.txt
  3. 48
      Makefile
  4. 14
      build.yaml
  5. 2
      gRPC-C++.podspec
  6. 2
      gRPC-Core.podspec
  7. 1
      grpc.gemspec
  8. 1
      package.xml
  9. 47
      src/core/lib/gprpp/optional.h
  10. 206
      src/core/lib/iomgr/buffer_list.cc
  11. 74
      src/core/lib/iomgr/buffer_list.h
  12. 108
      src/core/lib/iomgr/internal_errqueue.h
  13. 27
      src/core/lib/iomgr/tcp_posix.cc
  14. 13
      test/core/gprpp/BUILD
  15. 50
      test/core/gprpp/optional_test.cc
  16. 12
      test/core/iomgr/buffer_list_test.cc
  17. 6
      test/core/iomgr/tcp_posix_test.cc
  18. 1
      tools/doxygen/Doxyfile.c++.internal
  19. 1
      tools/doxygen/Doxyfile.core.internal
  20. 20
      tools/run_tests/generated/sources_and_headers.json
  21. 24
      tools/run_tests/generated/tests.json

12
BUILD

@ -643,6 +643,17 @@ grpc_cc_library(
public_hdrs = ["src/core/lib/gprpp/debug_location.h"],
)
grpc_cc_library(
name = "optional",
language = "c++",
public_hdrs = [
"src/core/lib/gprpp/optional.h",
],
deps = [
"gpr_base",
],
)
grpc_cc_library(
name = "orphanable",
language = "c++",
@ -976,6 +987,7 @@ grpc_cc_library(
"grpc_codegen",
"grpc_trace",
"inlined_vector",
"optional",
"orphanable",
"ref_counted",
"ref_counted_ptr",

@ -652,6 +652,7 @@ add_dependencies(buildtests_cxx metrics_client)
add_dependencies(buildtests_cxx mock_test)
add_dependencies(buildtests_cxx nonblocking_test)
add_dependencies(buildtests_cxx noop-benchmark)
add_dependencies(buildtests_cxx optional_test)
add_dependencies(buildtests_cxx orphanable_test)
add_dependencies(buildtests_cxx proto_server_reflection_test)
add_dependencies(buildtests_cxx proto_utils_test)
@ -14445,6 +14446,45 @@ target_link_libraries(noop-benchmark
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(optional_test
test/core/gprpp/optional_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(optional_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(optional_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc++
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)

@ -1213,6 +1213,7 @@ metrics_client: $(BINDIR)/$(CONFIG)/metrics_client
mock_test: $(BINDIR)/$(CONFIG)/mock_test
nonblocking_test: $(BINDIR)/$(CONFIG)/nonblocking_test
noop-benchmark: $(BINDIR)/$(CONFIG)/noop-benchmark
optional_test: $(BINDIR)/$(CONFIG)/optional_test
orphanable_test: $(BINDIR)/$(CONFIG)/orphanable_test
proto_server_reflection_test: $(BINDIR)/$(CONFIG)/proto_server_reflection_test
proto_utils_test: $(BINDIR)/$(CONFIG)/proto_utils_test
@ -1720,6 +1721,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/mock_test \
$(BINDIR)/$(CONFIG)/nonblocking_test \
$(BINDIR)/$(CONFIG)/noop-benchmark \
$(BINDIR)/$(CONFIG)/optional_test \
$(BINDIR)/$(CONFIG)/orphanable_test \
$(BINDIR)/$(CONFIG)/proto_server_reflection_test \
$(BINDIR)/$(CONFIG)/proto_utils_test \
@ -1906,6 +1908,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/mock_test \
$(BINDIR)/$(CONFIG)/nonblocking_test \
$(BINDIR)/$(CONFIG)/noop-benchmark \
$(BINDIR)/$(CONFIG)/optional_test \
$(BINDIR)/$(CONFIG)/orphanable_test \
$(BINDIR)/$(CONFIG)/proto_server_reflection_test \
$(BINDIR)/$(CONFIG)/proto_utils_test \
@ -2399,6 +2402,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/nonblocking_test || ( echo test nonblocking_test failed ; exit 1 )
$(E) "[RUN] Testing noop-benchmark"
$(Q) $(BINDIR)/$(CONFIG)/noop-benchmark || ( echo test noop-benchmark failed ; exit 1 )
$(E) "[RUN] Testing optional_test"
$(Q) $(BINDIR)/$(CONFIG)/optional_test || ( echo test optional_test failed ; exit 1 )
$(E) "[RUN] Testing orphanable_test"
$(Q) $(BINDIR)/$(CONFIG)/orphanable_test || ( echo test orphanable_test failed ; exit 1 )
$(E) "[RUN] Testing proto_server_reflection_test"
@ -19444,6 +19449,49 @@ endif
endif
OPTIONAL_TEST_SRC = \
test/core/gprpp/optional_test.cc \
OPTIONAL_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(OPTIONAL_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/optional_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/optional_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/optional_test: $(PROTOBUF_DEP) $(OPTIONAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(OPTIONAL_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/optional_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/gprpp/optional_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_optional_test: $(OPTIONAL_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(OPTIONAL_TEST_OBJS:.o=.dep)
endif
endif
ORPHANABLE_TEST_SRC = \
test/core/gprpp/orphanable_test.cc \

@ -430,6 +430,7 @@ filegroups:
- src/core/lib/debug/stats_data.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/inlined_vector.h
- src/core/lib/gprpp/optional.h
- src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h
@ -5059,6 +5060,19 @@ targets:
deps:
- benchmark
defaults: benchmark
- name: optional_test
gtest: true
build: test
language: c++
src:
- test/core/gprpp/optional_test.cc
deps:
- grpc_test_util
- grpc++
- grpc
- gpr
uses:
- grpc++_test
- name: orphanable_test
gtest: true
build: test

@ -402,6 +402,7 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats_data.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/inlined_vector.h',
'src/core/lib/gprpp/optional.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/ref_counted.h',
'src/core/lib/gprpp/ref_counted_ptr.h',
@ -595,6 +596,7 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats_data.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/inlined_vector.h',
'src/core/lib/gprpp/optional.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/ref_counted.h',
'src/core/lib/gprpp/ref_counted_ptr.h',

@ -396,6 +396,7 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats_data.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/inlined_vector.h',
'src/core/lib/gprpp/optional.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/ref_counted.h',
'src/core/lib/gprpp/ref_counted_ptr.h',
@ -1024,6 +1025,7 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats_data.h',
'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/inlined_vector.h',
'src/core/lib/gprpp/optional.h',
'src/core/lib/gprpp/orphanable.h',
'src/core/lib/gprpp/ref_counted.h',
'src/core/lib/gprpp/ref_counted_ptr.h',

@ -332,6 +332,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/debug/stats_data.h )
s.files += %w( src/core/lib/gprpp/debug_location.h )
s.files += %w( src/core/lib/gprpp/inlined_vector.h )
s.files += %w( src/core/lib/gprpp/optional.h )
s.files += %w( src/core/lib/gprpp/orphanable.h )
s.files += %w( src/core/lib/gprpp/ref_counted.h )
s.files += %w( src/core/lib/gprpp/ref_counted_ptr.h )

@ -337,6 +337,7 @@
<file baseinstalldir="/" name="src/core/lib/debug/stats_data.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/debug_location.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/inlined_vector.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/optional.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/orphanable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/ref_counted.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/ref_counted_ptr.h" role="src" />

@ -0,0 +1,47 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_OPTIONAL_H
#define GRPC_CORE_LIB_GPRPP_OPTIONAL_H
namespace grpc_core {
/* A make-shift alternative for absl::Optional. This can be removed in favor of
* that once absl dependencies can be introduced. */
template <typename T>
class Optional {
public:
void set(const T& val) {
value_ = val;
set_ = true;
}
bool has_value() { return set_; }
void reset() { set_ = false; }
T value() { return value_; }
private:
T value_;
bool set_ = false;
};
} /* namespace grpc_core */
#endif /* GRPC_CORE_LIB_GPRPP_OPTIONAL_H */

@ -24,32 +24,13 @@
#include <grpc/support/log.h>
#ifdef GRPC_LINUX_ERRQUEUE
#include <netinet/in.h>
#include <string.h>
#include <time.h>
#include "src/core/lib/gprpp/memory.h"
namespace grpc_core {
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
void* arg) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
new_elem->ts_.scheduled_time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.sent_time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.acked_time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (*head == nullptr) {
*head = new_elem;
return;
}
/* Append at the end. */
TracedBuffer* ptr = *head;
while (ptr->next_ != nullptr) {
ptr = ptr->next_;
}
ptr->next_ = new_elem;
}
namespace {
/** Fills gpr_timespec gts based on values from timespec ts */
void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
@ -68,10 +49,180 @@ void default_timestamps_callback(void* arg, grpc_core::Timestamps* ts,
void (*timestamps_callback)(void*, grpc_core::Timestamps*,
grpc_error* shutdown_err) =
default_timestamps_callback;
/* Used to extract individual opt stats from cmsg, so as to avoid troubles with
* unaligned reads */
template <typename T>
T read_unaligned(const void* ptr) {
T val;
memcpy(&val, ptr, sizeof(val));
return val;
}
/* Extracts opt stats from the tcp_info struct \a info to \a metrics */
void extract_opt_stats_from_tcp_info(ConnectionMetrics* metrics,
const grpc_core::tcp_info* info) {
if (info == nullptr) {
return;
}
if (info->length > offsetof(grpc_core::tcp_info, tcpi_sndbuf_limited)) {
metrics->recurring_retrans.set(info->tcpi_retransmits);
metrics->is_delivery_rate_app_limited.set(
info->tcpi_delivery_rate_app_limited);
metrics->congestion_window.set(info->tcpi_snd_cwnd);
metrics->reordering.set(info->tcpi_reordering);
metrics->packet_retx.set(info->tcpi_total_retrans);
metrics->pacing_rate.set(info->tcpi_pacing_rate);
metrics->data_notsent.set(info->tcpi_notsent_bytes);
if (info->tcpi_min_rtt != UINT32_MAX) {
metrics->min_rtt.set(info->tcpi_min_rtt);
}
metrics->packet_sent.set(info->tcpi_data_segs_out);
metrics->delivery_rate.set(info->tcpi_delivery_rate);
metrics->busy_usec.set(info->tcpi_busy_time);
metrics->rwnd_limited_usec.set(info->tcpi_rwnd_limited);
metrics->sndbuf_limited_usec.set(info->tcpi_sndbuf_limited);
}
if (info->length > offsetof(grpc_core::tcp_info, tcpi_dsack_dups)) {
metrics->data_sent.set(info->tcpi_bytes_sent);
metrics->data_retx.set(info->tcpi_bytes_retrans);
metrics->packet_spurious_retx.set(info->tcpi_dsack_dups);
}
}
/** Extracts opt stats from the given control message \a opt_stats to the
* connection metrics \a metrics */
void extract_opt_stats_from_cmsg(ConnectionMetrics* metrics,
const cmsghdr* opt_stats) {
if (opt_stats == nullptr) {
return;
}
const auto* data = CMSG_DATA(opt_stats);
constexpr int64_t cmsg_hdr_len = CMSG_ALIGN(sizeof(struct cmsghdr));
const int64_t len = opt_stats->cmsg_len - cmsg_hdr_len;
int64_t offset = 0;
while (offset < len) {
const auto* attr = reinterpret_cast<const nlattr*>(data + offset);
const void* val = data + offset + NLA_HDRLEN;
switch (attr->nla_type) {
case TCP_NLA_BUSY: {
metrics->busy_usec.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_RWND_LIMITED: {
metrics->rwnd_limited_usec.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_SNDBUF_LIMITED: {
metrics->sndbuf_limited_usec.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_PACING_RATE: {
metrics->pacing_rate.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_DELIVERY_RATE: {
metrics->delivery_rate.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_DELIVERY_RATE_APP_LMT: {
metrics->is_delivery_rate_app_limited.set(read_unaligned<uint8_t>(val));
break;
}
case TCP_NLA_SND_CWND: {
metrics->congestion_window.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_MIN_RTT: {
metrics->min_rtt.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_SRTT: {
metrics->srtt.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_RECUR_RETRANS: {
metrics->recurring_retrans.set(read_unaligned<uint8_t>(val));
break;
}
case TCP_NLA_BYTES_SENT: {
metrics->data_sent.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_DATA_SEGS_OUT: {
metrics->packet_sent.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_TOTAL_RETRANS: {
metrics->packet_retx.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_DELIVERED: {
metrics->packet_delivered.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_DELIVERED_CE: {
metrics->packet_delivered_ce.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_BYTES_RETRANS: {
metrics->data_retx.set(read_unaligned<uint64_t>(val));
break;
}
case TCP_NLA_DSACK_DUPS: {
metrics->packet_spurious_retx.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_REORDERING: {
metrics->reordering.set(read_unaligned<uint32_t>(val));
break;
}
case TCP_NLA_SND_SSTHRESH: {
metrics->snd_ssthresh.set(read_unaligned<uint32_t>(val));
break;
}
}
offset += NLA_ALIGN(attr->nla_len);
}
}
static int get_socket_tcp_info(grpc_core::tcp_info* info, int fd) {
info->length = sizeof(*info) - sizeof(socklen_t);
memset(info, 0, sizeof(*info));
return getsockopt(fd, IPPROTO_TCP, TCP_INFO, info, &(info->length));
}
} /* namespace */
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, int fd,
void* arg) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
new_elem->ts_.sendmsg_time.time = gpr_now(GPR_CLOCK_REALTIME);
new_elem->ts_.scheduled_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.sent_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
new_elem->ts_.acked_time.time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (get_socket_tcp_info(&new_elem->ts_.info, fd) == 0) {
extract_opt_stats_from_tcp_info(&new_elem->ts_.sendmsg_time.metrics,
&new_elem->ts_.info);
}
if (*head == nullptr) {
*head = new_elem;
return;
}
/* Append at the end. */
TracedBuffer* ptr = *head;
while (ptr->next_ != nullptr) {
ptr = ptr->next_;
}
ptr->next_ = new_elem;
}
void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
@ -82,15 +233,22 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
if (serr->ee_data >= elem->seq_no_) {
switch (serr->ee_info) {
case SCM_TSTAMP_SCHED:
fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0]));
fill_gpr_from_timestamp(&(elem->ts_.scheduled_time.time),
&(tss->ts[0]));
extract_opt_stats_from_cmsg(&(elem->ts_.scheduled_time.metrics),
opt_stats);
elem = elem->next_;
break;
case SCM_TSTAMP_SND:
fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0]));
fill_gpr_from_timestamp(&(elem->ts_.sent_time.time), &(tss->ts[0]));
extract_opt_stats_from_cmsg(&(elem->ts_.sent_time.metrics),
opt_stats);
elem = elem->next_;
break;
case SCM_TSTAMP_ACK:
fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0]));
fill_gpr_from_timestamp(&(elem->ts_.acked_time.time), &(tss->ts[0]));
extract_opt_stats_from_cmsg(&(elem->ts_.acked_time.metrics),
opt_stats);
/* Got all timestamps. Do the callback and free this TracedBuffer.
* The thing below can be passed by value if we don't want the
* restriction on the lifetime. */

@ -26,19 +26,78 @@
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/optional.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/internal_errqueue.h"
namespace grpc_core {
struct ConnectionMetrics {
/* Delivery rate in Bytes/s. */
Optional<uint64_t> delivery_rate;
/* If the delivery rate is limited by the application, this is set to true. */
Optional<bool> is_delivery_rate_app_limited;
/* Total packets retransmitted. */
Optional<uint32_t> packet_retx;
/* Total packets retransmitted spuriously. This metric is smaller than or
equal to packet_retx. */
Optional<uint32_t> packet_spurious_retx;
/* Total packets sent. */
Optional<uint32_t> packet_sent;
/* Total packets delivered. */
Optional<uint32_t> packet_delivered;
/* Total packets delivered with ECE marked. This metric is smaller than or
equal to packet_delivered. */
Optional<uint32_t> packet_delivered_ce;
/* Total bytes lost so far. */
Optional<uint64_t> data_retx;
/* Total bytes sent so far. */
Optional<uint64_t> data_sent;
/* Total bytes in write queue but not sent. */
Optional<uint64_t> data_notsent;
/* Pacing rate of the connection in Bps */
Optional<uint64_t> pacing_rate;
/* Minimum RTT observed in usec. */
Optional<uint32_t> min_rtt;
/* Smoothed RTT in usec */
Optional<uint32_t> srtt;
/* Send congestion window. */
Optional<uint32_t> congestion_window;
/* Slow start threshold in packets. */
Optional<uint32_t> snd_ssthresh;
/* Maximum degree of reordering (i.e., maximum number of packets reodered)
on the connection. */
Optional<uint32_t> reordering;
/* Represents the number of recurring retransmissions of the first sequence
that is not acknowledged yet. */
Optional<uint8_t> recurring_retrans;
/* The cumulative time (in usec) that the transport protocol was busy
sending data. */
Optional<uint64_t> busy_usec;
/* The cumulative time (in usec) that the transport protocol was limited by
the receive window size. */
Optional<uint64_t> rwnd_limited_usec;
/* The cumulative time (in usec) that the transport protocol was limited by
the send buffer size. */
Optional<uint64_t> sndbuf_limited_usec;
};
struct Timestamp {
gpr_timespec time;
ConnectionMetrics metrics; /* Metrics collected with this timestamp */
};
struct Timestamps {
/* TODO(yashykt): This would also need to store OPTSTAT once support is added
*/
gpr_timespec sendmsg_time;
gpr_timespec scheduled_time;
gpr_timespec sent_time;
gpr_timespec acked_time;
Timestamp sendmsg_time;
Timestamp scheduled_time;
Timestamp sent_time;
Timestamp acked_time;
uint32_t byte_offset; /* byte offset relative to the start of the RPC */
#ifdef GRPC_LINUX_ERRQUEUE
grpc_core::tcp_info info; /* tcp_info collected on sendmsg */
#endif /* GRPC_LINUX_ERRQUEUE */
};
/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
@ -58,13 +117,14 @@ class TracedBuffer {
/** Add a new entry in the TracedBuffer list pointed to by head. Also saves
* sendmsg_time with the current timestamp. */
static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no,
void* arg);
int fd, void* arg);
/** Processes a received timestamp based on sock_extended_err and
* scm_timestamping structures. It will invoke the timestamps callback if the
* timestamp type is SCM_TSTAMP_ACK. */
static void ProcessTimestamp(grpc_core::TracedBuffer** head,
struct sock_extended_err* serr,
struct cmsghdr* opt_stats,
struct scm_timestamping* tss);
/** Cleans the list by calling the callback for each traced buffer in the list

@ -37,6 +37,7 @@
#ifdef GRPC_LINUX_ERRQUEUE
#include <linux/errqueue.h>
#include <linux/net_tstamp.h>
#include <linux/netlink.h>
#include <sys/socket.h>
#endif /* GRPC_LINUX_ERRQUEUE */
@ -56,6 +57,12 @@ constexpr int SCM_TSTAMP_SND = 0;
constexpr int SCM_TSTAMP_SCHED = 1;
/* The timestamp type for when data acknowledged by peer. */
constexpr int SCM_TSTAMP_ACK = 2;
/* Control message type containing OPT_STATS */
#ifndef SCM_TIMESTAMPING_OPT_STATS
#define SCM_TIMESTAMPING_OPT_STATS 54
#endif
/* Redefine required constants from <linux/net_tstamp.h> */
constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1;
constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4;
@ -63,13 +70,108 @@ constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7;
constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8;
constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9;
constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11;
constexpr uint32_t SOF_TIMESTAMPING_OPT_STATS = 1u << 12;
constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE |
SOF_TIMESTAMPING_OPT_ID |
SOF_TIMESTAMPING_OPT_TSONLY;
constexpr uint32_t kTimestampingSocketOptions =
SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_OPT_ID |
SOF_TIMESTAMPING_OPT_TSONLY | SOF_TIMESTAMPING_OPT_STATS;
constexpr uint32_t kTimestampingRecordingOptions =
SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE |
SOF_TIMESTAMPING_TX_ACK;
/* Netlink attribute types used for TCP opt stats. */
enum TCPOptStats {
TCP_NLA_PAD,
TCP_NLA_BUSY, /* Time (usec) busy sending data. */
TCP_NLA_RWND_LIMITED, /* Time (usec) limited by receive window. */
TCP_NLA_SNDBUF_LIMITED, /* Time (usec) limited by send buffer. */
TCP_NLA_DATA_SEGS_OUT, /* Data pkts sent including retransmission. */
TCP_NLA_TOTAL_RETRANS, /* Data pkts retransmitted. */
TCP_NLA_PACING_RATE, /* Pacing rate in Bps. */
TCP_NLA_DELIVERY_RATE, /* Delivery rate in Bps. */
TCP_NLA_SND_CWND, /* Sending congestion window. */
TCP_NLA_REORDERING, /* Reordering metric. */
TCP_NLA_MIN_RTT, /* minimum RTT. */
TCP_NLA_RECUR_RETRANS, /* Recurring retransmits for the current pkt. */
TCP_NLA_DELIVERY_RATE_APP_LMT, /* Delivery rate application limited? */
TCP_NLA_SNDQ_SIZE, /* Data (bytes) pending in send queue */
TCP_NLA_CA_STATE, /* ca_state of socket */
TCP_NLA_SND_SSTHRESH, /* Slow start size threshold */
TCP_NLA_DELIVERED, /* Data pkts delivered incl. out-of-order */
TCP_NLA_DELIVERED_CE, /* Like above but only ones w/ CE marks */
TCP_NLA_BYTES_SENT, /* Data bytes sent including retransmission */
TCP_NLA_BYTES_RETRANS, /* Data bytes retransmitted */
TCP_NLA_DSACK_DUPS, /* DSACK blocks received */
TCP_NLA_REORD_SEEN, /* reordering events seen */
TCP_NLA_SRTT, /* smoothed RTT in usecs */
};
/* tcp_info from from linux/tcp.h */
struct tcp_info {
uint8_t tcpi_state;
uint8_t tcpi_ca_state;
uint8_t tcpi_retransmits;
uint8_t tcpi_probes;
uint8_t tcpi_backoff;
uint8_t tcpi_options;
uint8_t tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
uint8_t tcpi_delivery_rate_app_limited : 1;
uint32_t tcpi_rto;
uint32_t tcpi_ato;
uint32_t tcpi_snd_mss;
uint32_t tcpi_rcv_mss;
uint32_t tcpi_unacked;
uint32_t tcpi_sacked;
uint32_t tcpi_lost;
uint32_t tcpi_retrans;
uint32_t tcpi_fackets;
/* Times. */
uint32_t tcpi_last_data_sent;
uint32_t tcpi_last_ack_sent; /* Not remembered, sorry. */
uint32_t tcpi_last_data_recv;
uint32_t tcpi_last_ack_recv;
/* Metrics. */
uint32_t tcpi_pmtu;
uint32_t tcpi_rcv_ssthresh;
uint32_t tcpi_rtt;
uint32_t tcpi_rttvar;
uint32_t tcpi_snd_ssthresh;
uint32_t tcpi_snd_cwnd;
uint32_t tcpi_advmss;
uint32_t tcpi_reordering;
uint32_t tcpi_rcv_rtt;
uint32_t tcpi_rcv_space;
uint32_t tcpi_total_retrans;
uint64_t tcpi_pacing_rate;
uint64_t tcpi_max_pacing_rate;
uint64_t tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
uint64_t tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
uint32_t tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */
uint32_t tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */
uint32_t tcpi_notsent_bytes;
uint32_t tcpi_min_rtt;
uint32_t tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */
uint32_t tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */
uint64_t tcpi_delivery_rate;
uint64_t tcpi_busy_time; /* Time (usec) busy sending data */
uint64_t tcpi_rwnd_limited; /* Time (usec) limited by receive window */
uint64_t tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
uint32_t tcpi_delivered;
uint32_t tcpi_delivered_ce;
uint64_t tcpi_bytes_sent; /* RFC4898 tcpEStatsPerfHCDataOctetsOut */
uint64_t tcpi_bytes_retrans; /* RFC4898 tcpEStatsPerfOctetsRetrans */
uint32_t tcpi_dsack_dups; /* RFC4898 tcpEStatsStackDSACKDups */
uint32_t tcpi_reord_seen; /* reordering events seen */
socklen_t length; /* Length of struct returned by kernel */
};
#ifndef TCP_INFO
#define TCP_INFO 11
#endif
#endif /* GRPC_LINUX_ERRQUEUE */
/* Returns true if kernel is capable of supporting errqueue and timestamping.

@ -593,6 +593,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
#ifdef GRPC_LINUX_ERRQUEUE
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length) {
@ -631,7 +632,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::AddNewEntry(
&tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
tcp->outgoing_buffer_arg);
tcp->fd, tcp->outgoing_buffer_arg);
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
}
@ -648,6 +649,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
struct cmsghdr* cmsg) {
auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
cmsghdr* opt_stats = nullptr;
if (next_cmsg == nullptr) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_ERROR, "Received timestamp without extended error");
@ -655,6 +657,19 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
return cmsg;
}
/* Check if next_cmsg is an OPT_STATS msg */
if (next_cmsg->cmsg_level == SOL_SOCKET &&
next_cmsg->cmsg_type == SCM_TIMESTAMPING_OPT_STATS) {
opt_stats = next_cmsg;
next_cmsg = CMSG_NXTHDR(msg, opt_stats);
if (next_cmsg == nullptr) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_ERROR, "Received timestamp without extended error");
}
return opt_stats;
}
}
if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
!(next_cmsg->cmsg_type == IP_RECVERR ||
next_cmsg->cmsg_type == IPV6_RECVERR)) {
@ -676,7 +691,8 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
* to protect the traced buffer list. A lock free list might be better. Using
* a simple mutex for now. */
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss);
grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, opt_stats,
tss);
gpr_mu_unlock(&tcp->tb_mu);
return next_cmsg;
}
@ -696,10 +712,11 @@ static void process_errors(grpc_tcp* tcp) {
msg.msg_iovlen = 0;
msg.msg_flags = 0;
// Allocate aligned space for cmsgs received along with a timestamps
union {
char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) +
CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/
];
char rbuf[CMSG_SPACE(sizeof(scm_timestamping)) +
CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) +
CMSG_SPACE(16 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t)))];
struct cmsghdr align;
} aligned_buf;
memset(&aligned_buf, 0, sizeof(aligned_buf));

@ -64,6 +64,19 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "optional_test",
srcs = ["optional_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:optional",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "orphanable_test",
srcs = ["orphanable_test.cc"],

@ -0,0 +1,50 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/gprpp/optional.h"
#include <grpc/support/log.h>
#include <gtest/gtest.h>
#include "src/core/lib/gprpp/memory.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
TEST(OptionalTest, BasicTest) {
grpc_core::Optional<int> opt_val;
EXPECT_FALSE(opt_val.has_value());
const int kTestVal = 123;
opt_val.set(kTestVal);
EXPECT_TRUE(opt_val.has_value());
EXPECT_EQ(opt_val.value(), kTestVal);
opt_val.reset();
EXPECT_EQ(opt_val.has_value(), false);
}
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -48,7 +48,7 @@ static void TestShutdownFlushesList() {
for (auto i = 0; i < NUM_ELEM; i++) {
gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(
&list, i, static_cast<void*>(&verifier_called[i]));
&list, i, 0, static_cast<void*>(&verifier_called[i]));
}
grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE);
GPR_ASSERT(list == nullptr);
@ -63,9 +63,9 @@ static void TestVerifierCalledOnAckVerifier(void* arg,
grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(arg != nullptr);
GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->acked_time.tv_sec == 123);
GPR_ASSERT(ts->acked_time.tv_nsec == 456);
GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->acked_time.time.tv_sec == 123);
GPR_ASSERT(ts->acked_time.time.tv_nsec == 456);
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
}
@ -84,8 +84,8 @@ static void TestVerifierCalledOnAck() {
grpc_core::TracedBuffer* list = nullptr;
gpr_atm verifier_called;
gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called);
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss);
grpc_core::TracedBuffer::AddNewEntry(&list, 213, 0, &verifier_called);
grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, nullptr, &tss);
GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1));
GPR_ASSERT(list == nullptr);
grpc_core::TracedBuffer::Shutdown(&list, nullptr, GRPC_ERROR_NONE);

@ -384,9 +384,9 @@ void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(arg != nullptr);
GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->sendmsg_time.time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->scheduled_time.time.clock_type == GPR_CLOCK_REALTIME);
GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME);
gpr_atm* done_timestamps = (gpr_atm*)arg;
gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
}

@ -1076,6 +1076,7 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \

@ -1167,6 +1167,7 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \

@ -4184,6 +4184,24 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"grpc",
"grpc++",
"grpc++_test",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "optional_test",
"src": [
"test/core/gprpp/optional_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -9556,6 +9574,7 @@
"src/core/lib/debug/stats_data.h",
"src/core/lib/gprpp/debug_location.h",
"src/core/lib/gprpp/inlined_vector.h",
"src/core/lib/gprpp/optional.h",
"src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/ref_counted.h",
"src/core/lib/gprpp/ref_counted_ptr.h",
@ -9709,6 +9728,7 @@
"src/core/lib/debug/stats_data.h",
"src/core/lib/gprpp/debug_location.h",
"src/core/lib/gprpp/inlined_vector.h",
"src/core/lib/gprpp/optional.h",
"src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/ref_counted.h",
"src/core/lib/gprpp/ref_counted_ptr.h",

@ -4887,6 +4887,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "optional_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save