[chttp2] Continue refactoring towards promises (#34437)

Isolate ping callback tracking to its own file.
Also takes the opportunity to simplify keepalive code by applying the
ping timeout to all pings.
Adds an experiment to allow multiple pings outstanding too (this was
originally an accidental behavior change of the work, but one that I
think may be useful going forward).

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/34564/head
Craig Tiller 1 year ago committed by GitHub
parent e76ac4ab07
commit a17f08b49d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 36
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 3
      bazel/experiments.bzl
  6. 16
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 2
      gRPC-C++.podspec
  10. 3
      gRPC-Core.podspec
  11. 2
      grpc.gemspec
  12. 2
      grpc.gyp
  13. 2
      package.xml
  14. 24
      src/core/BUILD
  15. 226
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  16. 22
      src/core/ext/transport/chttp2/transport/internal.h
  17. 99
      src/core/ext/transport/chttp2/transport/ping_callbacks.cc
  18. 71
      src/core/ext/transport/chttp2/transport/ping_callbacks.h
  19. 30
      src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
  20. 7
      src/core/ext/transport/chttp2/transport/ping_rate_policy.h
  21. 46
      src/core/ext/transport/chttp2/transport/writing.cc
  22. 15
      src/core/lib/experiments/experiments.cc
  23. 8
      src/core/lib/experiments/experiments.h
  24. 6
      src/core/lib/experiments/experiments.yaml
  25. 1
      src/python/grpcio/grpc_core_dependencies.py
  26. 2
      test/core/end2end/tests/keepalive_timeout.cc
  27. 16
      test/core/transport/chttp2/BUILD
  28. 66
      test/core/transport/chttp2/graceful_shutdown_test.cc
  29. 568
      test/core/transport/chttp2/ping_callbacks_test.cc
  30. 2
      test/core/transport/chttp2/ping_configuration_test.cc
  31. 27
      test/core/transport/chttp2/ping_rate_policy_test.cc
  32. 2
      tools/doxygen/Doxyfile.c++.internal
  33. 2
      tools/doxygen/Doxyfile.core.internal
  34. 24
      tools/run_tests/generated/tests.json

@ -4027,6 +4027,7 @@ grpc_cc_library(
"//src/core:match",
"//src/core:memory_quota",
"//src/core:ping_abuse_policy",
"//src/core:ping_callbacks",
"//src/core:ping_rate_policy",
"//src/core:poll",
"//src/core:ref_counted",

36
CMakeLists.txt generated

@ -1175,6 +1175,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx pick_first_test)
add_dependencies(buildtests_cxx pid_controller_test)
add_dependencies(buildtests_cxx ping_abuse_policy_test)
add_dependencies(buildtests_cxx ping_callbacks_test)
add_dependencies(buildtests_cxx ping_configuration_test)
add_dependencies(buildtests_cxx ping_pong_streaming_test)
add_dependencies(buildtests_cxx ping_rate_policy_test)
@ -1832,6 +1833,7 @@ add_library(grpc
src/core/ext/transport/chttp2/transport/huffsyms.cc
src/core/ext/transport/chttp2/transport/parsing.cc
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc
src/core/ext/transport/chttp2/transport/ping_callbacks.cc
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
src/core/ext/transport/chttp2/transport/stream_lists.cc
src/core/ext/transport/chttp2/transport/varint.cc
@ -2868,6 +2870,7 @@ add_library(grpc_unsecure
src/core/ext/transport/chttp2/transport/huffsyms.cc
src/core/ext/transport/chttp2/transport/parsing.cc
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc
src/core/ext/transport/chttp2/transport/ping_callbacks.cc
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
src/core/ext/transport/chttp2/transport/stream_lists.cc
src/core/ext/transport/chttp2/transport/varint.cc
@ -17287,6 +17290,39 @@ target_link_libraries(ping_abuse_policy_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(ping_callbacks_test
test/core/transport/chttp2/ping_callbacks_test.cc
)
target_compile_features(ping_callbacks_test PUBLIC cxx_std_14)
target_include_directories(ping_callbacks_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(ping_callbacks_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc
)
endif()
if(gRPC_BUILD_TESTS)

2
Makefile generated

@ -1059,6 +1059,7 @@ LIBGRPC_SRC = \
src/core/ext/transport/chttp2/transport/huffsyms.cc \
src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \
src/core/ext/transport/chttp2/transport/ping_callbacks.cc \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \
@ -1947,6 +1948,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/transport/chttp2/transport/huffsyms.cc \
src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \
src/core/ext/transport/chttp2/transport/ping_callbacks.cc \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \

2
Package.swift generated

@ -314,6 +314,8 @@ let package = Package(
"src/core/ext/transport/chttp2/transport/parsing.cc",
"src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc",
"src/core/ext/transport/chttp2/transport/ping_abuse_policy.h",
"src/core/ext/transport/chttp2/transport/ping_callbacks.cc",
"src/core/ext/transport/chttp2/transport/ping_callbacks.h",
"src/core/ext/transport/chttp2/transport/ping_rate_policy.cc",
"src/core/ext/transport/chttp2/transport/ping_rate_policy.h",
"src/core/ext/transport/chttp2/transport/stream_lists.cc",

@ -39,6 +39,7 @@ EXPERIMENTS = {
"event_engine_listener",
],
"flow_control_test": [
"multiping",
"peer_state_based_framing",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -108,6 +109,7 @@ EXPERIMENTS = {
"event_engine_listener",
],
"flow_control_test": [
"multiping",
"peer_state_based_framing",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -184,6 +186,7 @@ EXPERIMENTS = {
"event_engine_listener",
],
"flow_control_test": [
"multiping",
"peer_state_based_framing",
"tcp_frame_size_tuning",
"tcp_rcv_lowat",

@ -310,6 +310,7 @@ libs:
- src/core/ext/transport/chttp2/transport/internal.h
- src/core/ext/transport/chttp2/transport/legacy_frame.h
- src/core/ext/transport/chttp2/transport/ping_abuse_policy.h
- src/core/ext/transport/chttp2/transport/ping_callbacks.h
- src/core/ext/transport/chttp2/transport/ping_rate_policy.h
- src/core/ext/transport/chttp2/transport/varint.h
- src/core/ext/transport/inproc/inproc_transport.h
@ -1135,6 +1136,7 @@ libs:
- src/core/ext/transport/chttp2/transport/huffsyms.cc
- src/core/ext/transport/chttp2/transport/parsing.cc
- src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc
- src/core/ext/transport/chttp2/transport/ping_callbacks.cc
- src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
- src/core/ext/transport/chttp2/transport/stream_lists.cc
- src/core/ext/transport/chttp2/transport/varint.cc
@ -2048,6 +2050,7 @@ libs:
- src/core/ext/transport/chttp2/transport/internal.h
- src/core/ext/transport/chttp2/transport/legacy_frame.h
- src/core/ext/transport/chttp2/transport/ping_abuse_policy.h
- src/core/ext/transport/chttp2/transport/ping_callbacks.h
- src/core/ext/transport/chttp2/transport/ping_rate_policy.h
- src/core/ext/transport/chttp2/transport/varint.h
- src/core/ext/transport/inproc/inproc_transport.h
@ -2485,6 +2488,7 @@ libs:
- src/core/ext/transport/chttp2/transport/huffsyms.cc
- src/core/ext/transport/chttp2/transport/parsing.cc
- src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc
- src/core/ext/transport/chttp2/transport/ping_callbacks.cc
- src/core/ext/transport/chttp2/transport/ping_rate_policy.cc
- src/core/ext/transport/chttp2/transport/stream_lists.cc
- src/core/ext/transport/chttp2/transport/varint.cc
@ -11687,6 +11691,18 @@ targets:
- gtest
- grpc_test_util
uses_polling: false
- name: ping_callbacks_test
gtest: true
build: test
language: c++
headers:
- test/core/event_engine/mock_event_engine.h
src:
- test/core/transport/chttp2/ping_callbacks_test.cc
deps:
- gtest
- grpc
uses_polling: false
- name: ping_configuration_test
gtest: true
build: test

1
config.m4 generated

@ -146,6 +146,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/transport/chttp2/transport/huffsyms.cc \
src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \
src/core/ext/transport/chttp2/transport/ping_callbacks.cc \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/stream_lists.cc \
src/core/ext/transport/chttp2/transport/varint.cc \

1
config.w32 generated

@ -111,6 +111,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\transport\\chttp2\\transport\\huffsyms.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\parsing.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\ping_abuse_policy.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\ping_callbacks.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\ping_rate_policy.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\stream_lists.cc " +
"src\\core\\ext\\transport\\chttp2\\transport\\varint.cc " +

2
gRPC-C++.podspec generated

@ -381,6 +381,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/legacy_frame.h',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h',
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/inproc/inproc_transport.h',
@ -1450,6 +1451,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/legacy_frame.h',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h',
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/inproc/inproc_transport.h',

3
gRPC-Core.podspec generated

@ -417,6 +417,8 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h',
'src/core/ext/transport/chttp2/transport/ping_callbacks.cc',
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
@ -2202,6 +2204,7 @@ Pod::Spec.new do |s|
'src/core/ext/transport/chttp2/transport/internal.h',
'src/core/ext/transport/chttp2/transport/legacy_frame.h',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h',
'src/core/ext/transport/chttp2/transport/ping_callbacks.h',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.h',
'src/core/ext/transport/chttp2/transport/varint.h',
'src/core/ext/transport/inproc/inproc_transport.h',

2
grpc.gemspec generated

@ -320,6 +320,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/transport/chttp2/transport/parsing.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/ping_abuse_policy.h )
s.files += %w( src/core/ext/transport/chttp2/transport/ping_callbacks.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/ping_callbacks.h )
s.files += %w( src/core/ext/transport/chttp2/transport/ping_rate_policy.cc )
s.files += %w( src/core/ext/transport/chttp2/transport/ping_rate_policy.h )
s.files += %w( src/core/ext/transport/chttp2/transport/stream_lists.cc )

2
grpc.gyp generated

@ -377,6 +377,7 @@
'src/core/ext/transport/chttp2/transport/huffsyms.cc',
'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc',
'src/core/ext/transport/chttp2/transport/ping_callbacks.cc',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',
@ -1207,6 +1208,7 @@
'src/core/ext/transport/chttp2/transport/huffsyms.cc',
'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc',
'src/core/ext/transport/chttp2/transport/ping_callbacks.cc',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',

2
package.xml generated

@ -302,6 +302,8 @@
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/parsing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/ping_abuse_policy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/ping_callbacks.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/ping_callbacks.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/ping_rate_policy.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/ping_rate_policy.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/transport/chttp2/transport/stream_lists.cc" role="src" />

@ -5677,6 +5677,29 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "ping_callbacks",
srcs = [
"ext/transport/chttp2/transport/ping_callbacks.cc",
],
hdrs = [
"ext/transport/chttp2/transport/ping_callbacks.h",
],
external_deps = [
"absl/container:flat_hash_map",
"absl/functional:any_invocable",
"absl/hash",
"absl/meta:type_traits",
"absl/random:bit_gen_ref",
"absl/random:distributions",
],
deps = [
"time",
"//:event_engine_base_hdrs",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "ping_rate_policy",
srcs = [
@ -5692,6 +5715,7 @@ grpc_cc_library(
],
deps = [
"channel_args",
"experiments",
"match",
"time",
"//:channel_arg_names",

@ -64,6 +64,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/call_tracer.h"
@ -202,19 +203,10 @@ static void init_keepalive_ping(
static void init_keepalive_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
GRPC_UNUSED grpc_error_handle error);
static void start_keepalive_ping(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void finish_keepalive_ping(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void start_keepalive_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void finish_keepalive_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void keepalive_watchdog_fired(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
static void keepalive_watchdog_fired_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
GRPC_UNUSED grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
namespace {
@ -317,6 +309,8 @@ void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
grpc_chttp2_transport::~grpc_chttp2_transport() {
size_t i;
cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
event_engine.reset();
if (channelz_socket != nullptr) {
@ -346,8 +340,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
GPR_ASSERT(stream_map.empty());
GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
while (write_cb_pool) {
grpc_chttp2_write_cb* next = write_cb_pool->next;
gpr_free(write_cb_pool);
@ -394,8 +386,10 @@ static void read_channel_args(grpc_chttp2_transport* t,
t->keepalive_timeout = std::max(
grpc_core::Duration::Zero(),
channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
.value_or(t->is_client ? g_default_client_keepalive_timeout
: g_default_server_keepalive_timeout));
.value_or(t->keepalive_time == grpc_core::Duration::Infinity()
? grpc_core::Duration::Infinity()
: (t->is_client ? g_default_client_keepalive_timeout
: g_default_server_keepalive_timeout)));
if (t->is_client) {
t->keepalive_permit_without_calls =
channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
@ -708,11 +702,6 @@ static void close_transport_locked(grpc_chttp2_transport* t,
t->keepalive_ping_timer_handle.reset();
}
}
if (t->keepalive_watchdog_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
t->keepalive_watchdog_timer_handle.reset();
}
}
break;
case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
@ -1616,14 +1605,38 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
grpc_core::StatusToString(error).c_str()));
// callback remaining pings: they're not allowed to call into the transport,
// and maybe they hold resources that need to be freed
grpc_chttp2_ping_queue* pq = &t->ping_queue;
GPR_ASSERT(!error.ok());
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], error);
grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
}
t->ping_callbacks.CancelAll(t->event_engine.get());
}
namespace {
class PingClosureWrapper {
public:
explicit PingClosureWrapper(grpc_closure* closure) : closure_(closure) {}
PingClosureWrapper(const PingClosureWrapper&) = delete;
PingClosureWrapper& operator=(const PingClosureWrapper&) = delete;
PingClosureWrapper(PingClosureWrapper&& other) noexcept
: closure_(other.Take()) {}
PingClosureWrapper& operator=(PingClosureWrapper&& other) noexcept {
std::swap(closure_, other.closure_);
return *this;
}
~PingClosureWrapper() {
if (closure_ != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, absl::CancelledError());
}
}
void operator()() {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, Take(), absl::OkStatus());
}
private:
grpc_closure* Take() { return std::exchange(closure_, nullptr); }
grpc_closure* closure_ = nullptr;
};
} // namespace
static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack) {
if (!t->closed_with_error.ok()) {
@ -1631,11 +1644,8 @@ static void send_ping_locked(grpc_chttp2_transport* t,
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
return;
}
grpc_chttp2_ping_queue* pq = &t->ping_queue;
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
absl::OkStatus());
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
absl::OkStatus());
t->ping_callbacks.OnPing(PingClosureWrapper(on_initiate),
PingClosureWrapper(on_ack));
}
// Specialized form of send_ping_locked for keepalive ping. If there is already
@ -1644,40 +1654,15 @@ static void send_ping_locked(grpc_chttp2_transport* t,
static void send_keepalive_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
if (!t->closed_with_error.ok()) {
t->combiner->Run(
grpc_core::InitTransportClosure<start_keepalive_ping_locked>(
t->Ref(), &t->start_keepalive_ping_locked),
t->closed_with_error);
t->combiner->Run(
grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
t->Ref(), &t->finish_keepalive_ping_locked),
t->closed_with_error);
return;
}
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
// There is a ping in flight. Add yourself to the inflight closure list.
t->combiner->Run(
grpc_core::InitTransportClosure<start_keepalive_ping_locked>(
t->Ref(), &t->start_keepalive_ping_locked),
t->closed_with_error);
grpc_closure_list_append(
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
grpc_core::InitTransportClosure<finish_keepalive_ping>(
t->Ref(), &t->finish_keepalive_ping_locked),
absl::OkStatus());
return;
}
grpc_closure_list_append(
&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
grpc_core::InitTransportClosure<start_keepalive_ping>(
t->Ref(), &t->start_keepalive_ping_locked),
absl::OkStatus());
grpc_closure_list_append(
&pq->lists[GRPC_CHTTP2_PCL_NEXT],
grpc_core::InitTransportClosure<finish_keepalive_ping>(
t->Ref(), &t->finish_keepalive_ping_locked),
absl::OkStatus());
t->ping_callbacks.OnPingAck(
PingClosureWrapper(grpc_core::InitTransportClosure<finish_keepalive_ping>(
t->Ref(), &t->finish_keepalive_ping_locked)));
}
void grpc_chttp2_retry_initiate_ping(
@ -1699,19 +1684,31 @@ static void retry_initiate_ping_locked(
}
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (pq->inflight_id != id) {
if (!t->ping_callbacks.AckPing(id, t->event_engine.get())) {
gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
std::string(t->peer_string.as_string_view()).c_str(), id);
return;
}
grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
if (t->ping_callbacks.ping_requested()) {
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
}
}
void grpc_chttp2_ping_timeout(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
t->combiner->Run(
grpc_core::NewClosure([t](grpc_error_handle) {
gpr_log(GPR_INFO, "%s: Ping timeout. Closing transport.",
std::string(t->peer_string.as_string_view()).c_str());
close_transport_locked(
t.get(),
grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
}),
absl::OkStatus());
}
namespace {
// Fire and forget (deletes itself on completion). Does a graceful shutdown by
@ -1731,20 +1728,11 @@ class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t->Ref()) {
t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
t->keepalive_timeout =
std::min(t->keepalive_timeout, grpc_core::Duration::Seconds(20));
send_ping_locked(
t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
timer_handle_ = t_->event_engine->RunAfter(
grpc_core::Duration::Seconds(20),
[self = Ref(DEBUG_LOCATION, "GoawayTimer")]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// The ref will be unreffed in the combiner.
auto* ptr = self.release();
ptr->t_->combiner->Run(
GRPC_CLOSURE_INIT(&ptr->on_timer_, OnTimerLocked, ptr, nullptr),
absl::OkStatus());
});
}
void MaybeSendFinalGoawayLocked() {
@ -1785,26 +1773,12 @@ class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
auto* self = static_cast<GracefulGoaway*>(arg);
if (self->timer_handle_ != TaskHandle::kInvalid) {
self->t_->event_engine->Cancel(
std::exchange(self->timer_handle_, TaskHandle::kInvalid));
}
self->MaybeSendFinalGoawayLocked();
self->Unref();
}
static void OnTimerLocked(void* arg, grpc_error_handle /* error */) {
auto* self = static_cast<GracefulGoaway*>(arg);
// Clearing the handle since the timer has fired and the handle is invalid.
self->timer_handle_ = TaskHandle::kInvalid;
self->MaybeSendFinalGoawayLocked();
self->Unref();
}
const grpc_core::RefCountedPtr<grpc_chttp2_transport> t_;
grpc_closure on_ping_ack_;
TaskHandle timer_handle_ = TaskHandle::kInvalid;
grpc_closure on_timer_;
};
} // namespace
@ -2791,39 +2765,6 @@ static void init_keepalive_ping_locked(
}
}
static void start_keepalive_ping(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
auto* tp = t.get();
tp->combiner->Run(
grpc_core::InitTransportClosure<start_keepalive_ping_locked>(
std::move(t), &tp->start_keepalive_ping_locked),
error);
}
static void start_keepalive_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
if (!error.ok()) {
return;
}
if (t->channelz_socket != nullptr) {
t->channelz_socket->RecordKeepaliveSent();
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Start keepalive ping",
std::string(t->peer_string.as_string_view()).c_str());
}
t->keepalive_watchdog_timer_handle =
t->event_engine->RunAfter(t->keepalive_timeout, [t]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
keepalive_watchdog_fired(std::move(t));
});
t->keepalive_ping_started = true;
}
static void finish_keepalive_ping(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
grpc_error_handle error) {
@ -2844,19 +2785,7 @@ static void finish_keepalive_ping_locked(
gpr_log(GPR_INFO, "%s: Finish keepalive ping",
std::string(t->peer_string.as_string_view()).c_str());
}
if (!t->keepalive_ping_started) {
// start_keepalive_ping_locked has not run yet. Reschedule
// finish_keepalive_ping_locked for it to be run later.
finish_keepalive_ping(std::move(t), std::move(error));
return;
}
t->keepalive_ping_started = false;
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
if (t->keepalive_watchdog_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
t->keepalive_watchdog_timer_handle.reset();
}
}
GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value());
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] {
@ -2868,39 +2797,6 @@ static void finish_keepalive_ping_locked(
}
}
static void keepalive_watchdog_fired(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
auto* tp = t.get();
tp->combiner->Run(
grpc_core::InitTransportClosure<keepalive_watchdog_fired_locked>(
std::move(t), &tp->keepalive_watchdog_fired_locked),
absl::OkStatus());
}
static void keepalive_watchdog_fired_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value());
t->keepalive_watchdog_timer_handle.reset();
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
std::string(t->peer_string.as_string_view()).c_str());
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(
t.get(),
grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
} else {
// If keepalive_state is not PINGING, we consider it as an error. Maybe the
// cancellation failed in finish_keepalive_ping_locked. Users have seen
// other states: https://github.com/grpc/grpc/issues/32085.
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
}
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
if (t->keepalive_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {

@ -50,6 +50,7 @@
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
@ -145,18 +146,6 @@ typedef enum {
const char* grpc_chttp2_initiate_write_reason_string(
grpc_chttp2_initiate_write_reason reason);
struct grpc_chttp2_ping_queue {
grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
uint64_t inflight_id = 0;
};
struct grpc_chttp2_repeated_ping_state {
grpc_core::Timestamp last_ping_sent_time;
int pings_before_data_required;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
delayed_ping_timer_handle;
};
// deframer state for the overall http2 stream of bytes
typedef enum {
// prefix: one entry per http2 connection prefix byte
@ -339,12 +328,11 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
uint32_t last_new_stream_id = 0;
/// ping queues for various ping insertion points
grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
grpc_core::Chttp2PingAbusePolicy ping_abuse_policy;
grpc_core::Chttp2PingRatePolicy ping_rate_policy;
grpc_core::Chttp2PingCallbacks ping_callbacks;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
delayed_ping_timer_handle;
uint64_t ping_ctr = 0; // unique id for pings
grpc_closure retry_initiate_ping_locked;
/// ping acks
@ -425,9 +413,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// timer to initiate ping events
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
keepalive_ping_timer_handle;
/// watchdog to kill the transport when waiting for the keepalive ping
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
keepalive_watchdog_timer_handle;
/// time duration in between pings
grpc_core::Duration keepalive_time;
/// grace period for a ping to complete before watchdog kicks in
@ -723,6 +708,9 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
const char* desc,
grpc_core::DebugLocation whence = {});
void grpc_chttp2_ping_timeout(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
#define GRPC_HEADER_SIZE_IN_BYTES 5
#define MAX_SIZE_T (~(size_t)0)

@ -0,0 +1,99 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include <utility>
#include "absl/meta/type_traits.h"
#include "absl/random/distributions.h"
namespace grpc_core {
void Chttp2PingCallbacks::OnPing(Callback on_start, Callback on_ack) {
on_start_.emplace_back(std::move(on_start));
on_ack_.emplace_back(std::move(on_ack));
ping_requested_ = true;
}
void Chttp2PingCallbacks::OnPingAck(Callback on_ack) {
auto it = inflight_.find(most_recent_inflight_);
if (it != inflight_.end()) {
it->second.on_ack.emplace_back(std::move(on_ack));
return;
}
ping_requested_ = true;
on_ack_.emplace_back(std::move(on_ack));
}
uint64_t Chttp2PingCallbacks::StartPing(
absl::BitGenRef bitgen, Duration ping_timeout, Callback on_timeout,
grpc_event_engine::experimental::EventEngine* event_engine) {
uint64_t id;
do {
id = absl::Uniform<uint64_t>(bitgen);
} while (inflight_.contains(id));
CallbackVec cbs = std::move(on_start_);
CallbackVec().swap(on_start_);
InflightPing inflight;
inflight.on_ack.swap(on_ack_);
if (ping_timeout != Duration::Infinity()) {
inflight.on_timeout =
event_engine->RunAfter(ping_timeout, std::move(on_timeout));
} else {
inflight.on_timeout =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
}
inflight_.emplace(id, std::move(inflight));
most_recent_inflight_ = id;
ping_requested_ = false;
for (auto& cb : cbs) {
cb();
}
return id;
}
bool Chttp2PingCallbacks::AckPing(
uint64_t id, grpc_event_engine::experimental::EventEngine* event_engine) {
auto ping = inflight_.extract(id);
if (ping.empty()) return false;
if (ping.mapped().on_timeout !=
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
event_engine->Cancel(ping.mapped().on_timeout);
}
for (auto& cb : ping.mapped().on_ack) {
cb();
}
return true;
}
void Chttp2PingCallbacks::CancelAll(
grpc_event_engine::experimental::EventEngine* event_engine) {
CallbackVec().swap(on_start_);
CallbackVec().swap(on_ack_);
for (auto& cbs : inflight_) {
CallbackVec().swap(cbs.second.on_ack);
if (cbs.second.on_timeout !=
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
event_engine->Cancel(std::exchange(
cbs.second.on_timeout,
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
}
}
ping_requested_ = false;
}
} // namespace grpc_core

@ -0,0 +1,71 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_PING_CALLBACKS_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_PING_CALLBACKS_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/random/bit_gen_ref.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/time.h"
namespace grpc_core {
class Chttp2PingCallbacks {
public:
using Callback = absl::AnyInvocable<void()>;
void RequestPing() { ping_requested_ = true; }
void OnPing(Callback on_start, Callback on_ack);
void OnPingAck(Callback on_ack);
GRPC_MUST_USE_RESULT uint64_t
StartPing(absl::BitGenRef bitgen, Duration ping_timeout, Callback on_timeout,
grpc_event_engine::experimental::EventEngine* event_engine);
bool AckPing(uint64_t id,
grpc_event_engine::experimental::EventEngine* event_engine);
void CancelAll(grpc_event_engine::experimental::EventEngine* event_engine);
bool ping_requested() const { return ping_requested_; }
size_t pings_inflight() const { return inflight_.size(); }
private:
using CallbackVec = std::vector<Callback>;
struct InflightPing {
grpc_event_engine::experimental::EventEngine::TaskHandle on_timeout;
CallbackVec on_ack;
};
absl::flat_hash_map<uint64_t, InflightPing> inflight_;
uint64_t most_recent_inflight_ = 0;
bool ping_requested_ = false;
CallbackVec on_start_;
CallbackVec on_ack_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_PING_CALLBACKS_H

@ -25,12 +25,20 @@
#include <grpc/impl/channel_arg_names.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/match.h"
/** How many pings do we allow to be inflight at any given time?
In older versions of gRPC this was implicitly 1.
With the multiping experiment we allow this to rise to 100 by default.
TODO(ctiller): consider making this public API */
#define GRPC_ARG_HTTP2_MAX_INFLIGHT_PINGS "grpc.http2.max_inflight_pings"
namespace grpc_core {
namespace {
int g_default_max_pings_without_data = 2;
absl::optional<int> g_default_max_inflight_pings;
} // namespace
Chttp2PingRatePolicy::Chttp2PingRatePolicy(const ChannelArgs& args,
@ -39,19 +47,30 @@ Chttp2PingRatePolicy::Chttp2PingRatePolicy(const ChannelArgs& args,
is_client
? std::max(0, args.GetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)
.value_or(g_default_max_pings_without_data))
: 0) {}
: 0),
// Configuration via channel arg dominates, otherwise if the multiping
// experiment is enabled we use 100, otherwise 1.
max_inflight_pings_(
std::max(0, args.GetInt(GRPC_ARG_HTTP2_MAX_INFLIGHT_PINGS)
.value_or(g_default_max_inflight_pings.value_or(
IsMultipingEnabled() ? 100 : 1)))) {}
void Chttp2PingRatePolicy::SetDefaults(const ChannelArgs& args) {
g_default_max_pings_without_data =
std::max(0, args.GetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)
.value_or(g_default_max_pings_without_data));
g_default_max_inflight_pings = args.GetInt(GRPC_ARG_HTTP2_MAX_INFLIGHT_PINGS);
}
Chttp2PingRatePolicy::RequestSendPingResult
Chttp2PingRatePolicy::RequestSendPing(Duration next_allowed_ping_interval) {
Chttp2PingRatePolicy::RequestSendPing(Duration next_allowed_ping_interval,
size_t inflight_pings) const {
if (max_pings_without_data_ != 0 && pings_before_data_required_ == 0) {
return TooManyRecentPings{};
}
if (max_inflight_pings_ != 0 && inflight_pings > max_inflight_pings_) {
return TooManyRecentPings{};
}
const Timestamp next_allowed_ping =
last_ping_sent_time_ + next_allowed_ping_interval;
const Timestamp now = Timestamp::Now();
@ -59,11 +78,14 @@ Chttp2PingRatePolicy::RequestSendPing(Duration next_allowed_ping_interval) {
return TooSoon{next_allowed_ping_interval, last_ping_sent_time_,
next_allowed_ping - now};
}
last_ping_sent_time_ = now;
if (pings_before_data_required_) --pings_before_data_required_;
return SendGranted{};
}
void Chttp2PingRatePolicy::SentPing() {
last_ping_sent_time_ = Timestamp::Now();
if (pings_before_data_required_) --pings_before_data_required_;
}
void Chttp2PingRatePolicy::ReceivedDataFrame() {
last_ping_sent_time_ = Timestamp::InfPast();
}

@ -17,6 +17,8 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <iosfwd>
#include <string>
@ -51,7 +53,9 @@ class Chttp2PingRatePolicy {
using RequestSendPingResult =
absl::variant<SendGranted, TooManyRecentPings, TooSoon>;
RequestSendPingResult RequestSendPing(Duration next_allowed_ping_interval);
RequestSendPingResult RequestSendPing(Duration next_allowed_ping_interval,
size_t inflight_pings) const;
void SentPing();
void ResetPingsBeforeDataRequired();
void ReceivedDataFrame();
std::string GetDebugString() const;
@ -60,6 +64,7 @@ class Chttp2PingRatePolicy {
private:
const int max_pings_without_data_;
const int max_inflight_pings_;
// No pings allowed before receiving a header or data frame.
int pings_before_data_required_ = 0;
Timestamp last_ping_sent_time_ = Timestamp::InfPast();

@ -49,17 +49,17 @@
#include "src/core/ext/transport/chttp2/transport/http_trace.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -98,6 +98,9 @@ static grpc_core::Duration NextAllowedPingInterval(grpc_chttp2_transport* t) {
// The gRPC keepalive spec doesn't call for any throttling on the server
// side, but we are adding some throttling for protection anyway, unless
// we are doing a graceful GOAWAY in which case we don't want to wait.
if (grpc_core::IsMultipingEnabled()) {
return grpc_core::Duration::Seconds(1);
}
return t->keepalive_time == grpc_core::Duration::Infinity()
? grpc_core::Duration::Seconds(20)
: t->keepalive_time / 2;
@ -106,37 +109,32 @@ static grpc_core::Duration NextAllowedPingInterval(grpc_chttp2_transport* t) {
}
static void maybe_initiate_ping(grpc_chttp2_transport* t) {
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
if (!t->ping_callbacks.ping_requested()) {
// no ping needed: wait
return;
}
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
// ping already in-flight: wait
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
t->is_client ? "CLIENT" : "SERVER",
std::string(t->peer_string.as_string_view()).c_str());
}
return;
}
// InvalidateNow to avoid getting stuck re-initializing the ping timer
// in a loop while draining the currently-held combiner. Also see
// https://github.com/grpc/grpc/issues/26079.
grpc_core::ExecCtx::Get()->InvalidateNow();
Match(
t->ping_rate_policy.RequestSendPing(NextAllowedPingInterval(t)),
[pq, t](grpc_core::Chttp2PingRatePolicy::SendGranted) {
pq->inflight_id = t->ping_ctr;
t->ping_ctr++;
grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
t->ping_rate_policy.RequestSendPing(NextAllowedPingInterval(t),
t->ping_callbacks.pings_inflight()),
[t](grpc_core::Chttp2PingRatePolicy::SendGranted) {
t->ping_rate_policy.SentPing();
const uint64_t id = t->ping_callbacks.StartPing(
t->bitgen, t->keepalive_timeout,
[t = t->Ref()] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_ping_timeout(t);
},
t->event_engine.get());
grpc_slice_buffer_add(t->outbuf.c_slice_buffer(),
grpc_chttp2_ping_create(false, pq->inflight_id));
grpc_chttp2_ping_create(false, id));
if (t->channelz_socket != nullptr) {
t->channelz_socket->RecordKeepaliveSent();
}
grpc_core::global_stats().IncrementHttp2PingsSent();
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||

@ -118,6 +118,9 @@ const char* const description_combiner_offload_to_event_engine =
"Offload Combiner work onto the EventEngine instead of the Executor.";
const char* const additional_constraints_combiner_offload_to_event_engine =
"{}";
const char* const description_multiping =
"Allow more than one ping to be in flight at a time by default.";
const char* const additional_constraints_multiping = "{}";
const char* const description_registered_method_lookup_in_transport =
"Change registered method's lookup point to transport";
const char* const additional_constraints_registered_method_lookup_in_transport =
@ -193,6 +196,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"combiner_offload_to_event_engine",
description_combiner_offload_to_event_engine,
additional_constraints_combiner_offload_to_event_engine, true, true},
{"multiping", description_multiping, additional_constraints_multiping,
false, true},
{"registered_method_lookup_in_transport",
description_registered_method_lookup_in_transport,
additional_constraints_registered_method_lookup_in_transport, true, true},
@ -302,6 +307,9 @@ const char* const description_combiner_offload_to_event_engine =
"Offload Combiner work onto the EventEngine instead of the Executor.";
const char* const additional_constraints_combiner_offload_to_event_engine =
"{}";
const char* const description_multiping =
"Allow more than one ping to be in flight at a time by default.";
const char* const additional_constraints_multiping = "{}";
const char* const description_registered_method_lookup_in_transport =
"Change registered method's lookup point to transport";
const char* const additional_constraints_registered_method_lookup_in_transport =
@ -377,6 +385,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"combiner_offload_to_event_engine",
description_combiner_offload_to_event_engine,
additional_constraints_combiner_offload_to_event_engine, true, true},
{"multiping", description_multiping, additional_constraints_multiping,
false, true},
{"registered_method_lookup_in_transport",
description_registered_method_lookup_in_transport,
additional_constraints_registered_method_lookup_in_transport, true, true},
@ -486,6 +496,9 @@ const char* const description_combiner_offload_to_event_engine =
"Offload Combiner work onto the EventEngine instead of the Executor.";
const char* const additional_constraints_combiner_offload_to_event_engine =
"{}";
const char* const description_multiping =
"Allow more than one ping to be in flight at a time by default.";
const char* const additional_constraints_multiping = "{}";
const char* const description_registered_method_lookup_in_transport =
"Change registered method's lookup point to transport";
const char* const additional_constraints_registered_method_lookup_in_transport =
@ -561,6 +574,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"combiner_offload_to_event_engine",
description_combiner_offload_to_event_engine,
additional_constraints_combiner_offload_to_event_engine, true, true},
{"multiping", description_multiping, additional_constraints_multiping,
false, true},
{"registered_method_lookup_in_transport",
description_registered_method_lookup_in_transport,
additional_constraints_registered_method_lookup_in_transport, true, true},

@ -90,6 +90,7 @@ inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() { return true; }
inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; }
#ifndef NDEBUG
@ -137,6 +138,7 @@ inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() { return true; }
inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; }
#ifndef NDEBUG
@ -184,6 +186,7 @@ inline bool IsWrrDelegateToPickFirstEnabled() { return true; }
inline bool IsPickFirstHappyEyeballsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_COMBINER_OFFLOAD_TO_EVENT_ENGINE
inline bool IsCombinerOffloadToEventEngineEnabled() { return true; }
inline bool IsMultipingEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() { return true; }
#ifndef NDEBUG
@ -226,6 +229,7 @@ enum ExperimentIds {
kExperimentIdWrrDelegateToPickFirst,
kExperimentIdPickFirstHappyEyeballs,
kExperimentIdCombinerOffloadToEventEngine,
kExperimentIdMultiping,
kExperimentIdRegisteredMethodLookupInTransport,
kExperimentIdCallStatusOverrideOnCancellation,
kNumExperiments
@ -334,6 +338,10 @@ inline bool IsPickFirstHappyEyeballsEnabled() {
inline bool IsCombinerOffloadToEventEngineEnabled() {
return IsExperimentEnabled(kExperimentIdCombinerOffloadToEventEngine);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_MULTIPING
inline bool IsMultipingEnabled() {
return IsExperimentEnabled(kExperimentIdMultiping);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_REGISTERED_METHOD_LOOKUP_IN_TRANSPORT
inline bool IsRegisteredMethodLookupInTransportEnabled() {
return IsExperimentEnabled(kExperimentIdRegisteredMethodLookupInTransport);

@ -203,6 +203,12 @@
expiry: 2024/01/15
owner: hork@google.com
test_tags: []
- name: multiping
description:
Allow more than one ping to be in flight at a time by default.
expiry: 2024/01/15
owner: ctiller@google.com
test_tags: [flow_control_test]
- name: registered_method_lookup_in_transport
description:
Change registered method's lookup point to transport

@ -120,6 +120,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/huffsyms.cc',
'src/core/ext/transport/chttp2/transport/parsing.cc',
'src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc',
'src/core/ext/transport/chttp2/transport/ping_callbacks.cc',
'src/core/ext/transport/chttp2/transport/ping_rate_policy.cc',
'src/core/ext/transport/chttp2/transport/stream_lists.cc',
'src/core/ext/transport/chttp2/transport/varint.cc',

@ -51,7 +51,7 @@ CORE_END2END_TEST(Http2SingleHopTest, KeepaliveTimeout) {
Expect(1, true);
Step();
EXPECT_EQ(server_status.status(), GRPC_STATUS_UNAVAILABLE);
EXPECT_EQ(server_status.message(), "keepalive watchdog timeout");
EXPECT_EQ(server_status.message(), "ping timeout");
}
// Verify that reads reset the keepalive ping timer. The client sends 30 pings

@ -218,6 +218,22 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "ping_callbacks_test",
srcs = ["ping_callbacks_test.cc"],
external_deps = [
"gtest",
"absl/random",
],
language = "C++",
uses_polling = False,
deps = [
"//:gpr",
"//:grpc",
"//test/core/event_engine:mock_event_engine",
],
)
grpc_cc_test(
name = "flow_control_test",
srcs = ["flow_control_test.cc"],

@ -27,10 +27,10 @@
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
@ -50,6 +50,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
@ -176,16 +177,34 @@ class GracefulShutdownTest : public ::testing::Test {
// Waits for \a bytes to show up in read_bytes_
void WaitForReadBytes(absl::string_view bytes) {
std::atomic<bool> done{false};
auto start_time = absl::Now();
{
MutexLock lock(&mu_);
while (!absl::StrContains(read_bytes_, bytes)) {
while (true) {
auto where = read_bytes_.find(std::string(bytes));
if (where != std::string::npos) {
read_bytes_ = read_bytes_.substr(where + bytes.size());
break;
}
ASSERT_LT(absl::Now() - start_time, absl::Seconds(60));
read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
}
}
done = true;
std::string WaitForNBytes(size_t bytes) {
auto start_time = absl::Now();
MutexLock lock(&mu_);
while (read_bytes_.size() < bytes) {
EXPECT_LT(absl::Now() - start_time, absl::Seconds(60));
read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
}
std::string result = read_bytes_.substr(0, bytes);
read_bytes_ = read_bytes_.substr(bytes);
return result;
}
void WaitForClose() {
ASSERT_TRUE(read_end_notification_.WaitForNotificationWithTimeout(
absl::Minutes(1)));
}
void WaitForGoaway(uint32_t last_stream_id, uint32_t error_code = 0,
@ -201,9 +220,20 @@ class GracefulShutdownTest : public ::testing::Test {
WaitForReadBytes(expected_bytes);
}
void WaitForPing(uint64_t opaque_data) {
grpc_slice ping_slice = grpc_chttp2_ping_create(0, opaque_data);
WaitForReadBytes(StringViewFromSlice(ping_slice));
uint64_t WaitForPing() {
grpc_slice ping_slice = grpc_chttp2_ping_create(0, 0);
auto whole_ping = StringViewFromSlice(ping_slice);
GPR_ASSERT(whole_ping.size() == 9 + 8);
WaitForReadBytes(whole_ping.substr(0, 9));
std::string ping = WaitForNBytes(8);
return (static_cast<uint64_t>(static_cast<uint8_t>(ping[0])) << 56) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[1])) << 48) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[2])) << 40) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[3])) << 32) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[4])) << 24) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[5])) << 16) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[6])) << 8) |
(static_cast<uint64_t>(static_cast<uint8_t>(ping[7])));
}
void SendPingAck(uint64_t opaque_data) {
@ -237,7 +267,9 @@ class GracefulShutdownTest : public ::testing::Test {
}
static void OnWriteDone(void* arg, grpc_error_handle error) {
GPR_ASSERT(error.ok());
if (!error.ok()) {
Crash(absl::StrCat("Write failed: ", error.ToString()));
}
Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
on_write_done_notification_->Notify();
}
@ -263,9 +295,9 @@ TEST_F(GracefulShutdownTest, GracefulGoaway) {
// Wait for first goaway
WaitForGoaway((1u << 31) - 1);
// Wait for the ping
WaitForPing(0);
uint64_t ping_id = WaitForPing();
// Reply to the ping
SendPingAck(0);
SendPingAck(ping_id);
// Wait for final goaway
WaitForGoaway(0);
// The shutdown should successfully complete.
@ -288,7 +320,7 @@ TEST_F(GracefulShutdownTest, RequestStartedBeforeFinalGoaway) {
// Wait for first goaway
WaitForGoaway((1u << 31) - 1);
// Wait for the ping
WaitForPing(0);
uint64_t ping_id = WaitForPing();
// Start a request
constexpr char kRequestFrame[] =
"\x00\x00\xbe\x01\x05\x00\x00\x00\x01"
@ -304,7 +336,7 @@ TEST_F(GracefulShutdownTest, RequestStartedBeforeFinalGoaway) {
"\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1));
// Reply to the ping
SendPingAck(0);
SendPingAck(ping_id);
// Wait for final goaway with last stream ID 1 to show that the HTTP2
// transport accepted the stream.
WaitForGoaway(1);
@ -352,9 +384,9 @@ TEST_F(GracefulShutdownTest, RequestStartedAfterFinalGoawayIsIgnored) {
// Wait for first goaway
WaitForGoaway((1u << 31) - 1);
// Wait for the ping
WaitForPing(0);
uint64_t ping_id = WaitForPing();
// Reply to the ping
SendPingAck(0);
SendPingAck(ping_id);
// Wait for final goaway
WaitForGoaway(1);
@ -418,9 +450,9 @@ TEST_F(GracefulShutdownTest, UnresponsiveClient) {
// Wait for first goaway
WaitForGoaway((1u << 31) - 1);
// Wait for the ping
WaitForPing(0);
std::ignore = WaitForPing();
// Wait for final goaway without sending a ping ACK.
WaitForGoaway(0);
WaitForClose();
EXPECT_GE(absl::Now() - initial_time,
absl::Seconds(20) -
absl::Seconds(

@ -0,0 +1,568 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include <chrono>
#include "absl/random/random.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "src/core/lib/gprpp/crash.h"
#include "test/core/event_engine/mock_event_engine.h"
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::MockEventEngine;
using testing::_;
using testing::Matcher;
using testing::Return;
using testing::StrictMock;
namespace grpc_core {
namespace {
TEST(PingCallbacksTest, RequestPingRequestsPing) {
Chttp2PingCallbacks callbacks;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.RequestPing();
EXPECT_TRUE(callbacks.ping_requested());
}
TEST(PingCallbacksTest, OnPingRequestsPing) {
Chttp2PingCallbacks callbacks;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing([] {}, [] {});
EXPECT_TRUE(callbacks.ping_requested());
}
TEST(PingCallbacksTest, OnPingAckRequestsPing) {
Chttp2PingCallbacks callbacks;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPingAck([] {});
EXPECT_TRUE(callbacks.ping_requested());
}
TEST(PingCallbacksTest, PingRoundtrips) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked = false;
EXPECT_FALSE(callbacks.ping_requested());
// Request ping
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked] {
EXPECT_FALSE(acked);
acked = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
// Start ping should call the start methods, set a timeout, and clear the
// request
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 1);
EXPECT_TRUE(started);
EXPECT_FALSE(acked);
// Ack should cancel the timeout
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_TRUE(acked);
}
TEST(PingCallbacksTest, PingRoundtripsWithInfiniteTimeout) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked = false;
EXPECT_FALSE(callbacks.ping_requested());
// Request ping
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked] {
EXPECT_FALSE(acked);
acked = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
auto id = callbacks.StartPing(
bitgen, Duration::Infinity(), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 1);
EXPECT_TRUE(started);
EXPECT_FALSE(acked);
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_TRUE(acked);
}
TEST(PingCallbacksTest, InvalidPingIdFlagsError) {
StrictMock<MockEventEngine> event_engine;
Chttp2PingCallbacks callbacks;
EXPECT_FALSE(callbacks.AckPing(1234, &event_engine));
}
TEST(PingCallbacksTest, DuplicatePingIdFlagsError) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked] {
EXPECT_FALSE(acked);
acked = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_TRUE(acked);
// Second ping ack on the same id should fail
EXPECT_FALSE(callbacks.AckPing(id, &event_engine));
}
TEST(PingCallbacksTest, OnPingAckCanPiggybackInflightPings) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked_first = false;
bool acked_second = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked_first] {
EXPECT_FALSE(acked_first);
acked_first = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(started);
EXPECT_FALSE(acked_first);
EXPECT_FALSE(acked_second);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked_first);
EXPECT_FALSE(acked_second);
callbacks.OnPingAck([&acked_second] {
EXPECT_FALSE(acked_second);
acked_second = true;
});
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked_first);
EXPECT_FALSE(acked_second);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_TRUE(acked_first);
EXPECT_TRUE(acked_second);
}
TEST(PingCallbacksTest, PingAckRoundtrips) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool acked = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPingAck([&acked] {
EXPECT_FALSE(acked);
acked = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(acked);
}
TEST(PingCallbacksTest, MultiPingRoundtrips) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started1 = false;
bool acked1 = false;
bool started2 = false;
bool acked2 = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started1] {
EXPECT_FALSE(started1);
started1 = true;
},
[&acked1] {
EXPECT_FALSE(acked1);
acked1 = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id1 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
callbacks.OnPing(
[&started2] {
EXPECT_FALSE(started2);
started2 = true;
},
[&acked2] {
EXPECT_FALSE(acked2);
acked2 = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 789};
});
auto id2 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_NE(id1, id2);
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_TRUE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id1, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_TRUE(acked1);
EXPECT_TRUE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 789}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id2, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_TRUE(acked1);
EXPECT_TRUE(started2);
EXPECT_TRUE(acked2);
}
TEST(PingCallbacksTest, MultiPingRoundtripsWithOutOfOrderAcks) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started1 = false;
bool acked1 = false;
bool started2 = false;
bool acked2 = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started1] {
EXPECT_FALSE(started1);
started1 = true;
},
[&acked1] {
EXPECT_FALSE(acked1);
acked1 = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id1 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
callbacks.OnPing(
[&started2] {
EXPECT_FALSE(started2);
started2 = true;
},
[&acked2] {
EXPECT_FALSE(acked2);
acked2 = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 789};
});
auto id2 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_NE(id1, id2);
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_TRUE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 789}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id2, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_TRUE(started2);
EXPECT_TRUE(acked2);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id1, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_TRUE(acked1);
EXPECT_TRUE(started2);
EXPECT_TRUE(acked2);
}
TEST(PingCallbacksTest, CoalescedPingsRoundtrip) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started1 = false;
bool acked1 = false;
bool started2 = false;
bool acked2 = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started1] {
EXPECT_FALSE(started1);
started1 = true;
},
[&acked1] {
EXPECT_FALSE(acked1);
acked1 = true;
});
callbacks.OnPing(
[&started2] {
EXPECT_FALSE(started2);
started2 = true;
},
[&acked2] {
EXPECT_FALSE(acked2);
acked2 = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(started1);
EXPECT_FALSE(acked1);
EXPECT_FALSE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
EXPECT_TRUE(started2);
EXPECT_FALSE(acked2);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_TRUE(acked1);
EXPECT_TRUE(started2);
EXPECT_TRUE(acked2);
}
TEST(PingCallbacksTest, CancelAllCancelsCallbacks) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked] {
EXPECT_FALSE(acked);
acked = true;
});
EXPECT_TRUE(callbacks.ping_requested());
callbacks.CancelAll(&event_engine);
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
EXPECT_FALSE(callbacks.ping_requested());
// Can still send a ping, no callback should be invoked
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
EXPECT_FALSE(callbacks.ping_requested());
}
TEST(PingCallbacksTest, CancelAllCancelsInflightPings) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked = false;
EXPECT_FALSE(callbacks.ping_requested());
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked] {
EXPECT_FALSE(acked);
acked = true;
});
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, RunAfter(EventEngine::Duration(Duration::Hours(24)),
Matcher<absl::AnyInvocable<void()>>(_)))
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
.WillOnce(Return(true));
callbacks.CancelAll(&event_engine);
// Ensure Cancel call comes from CancelAll
testing::Mock::VerifyAndClearExpectations(&event_engine);
EXPECT_FALSE(acked);
EXPECT_FALSE(callbacks.ping_requested());
// Ping should still be valid, but no callback should be invoked
EXPECT_TRUE(callbacks.AckPing(id, &event_engine));
EXPECT_FALSE(acked);
EXPECT_FALSE(callbacks.ping_requested());
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -58,7 +58,7 @@ TEST_F(ConfigurationTest, ClientKeepaliveDefaults) {
grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(
grpc_create_chttp2_transport(args_, mock_endpoint_, /*is_client=*/true));
EXPECT_EQ(t->keepalive_time, Duration::Infinity());
EXPECT_EQ(t->keepalive_timeout, Duration::Seconds(20));
EXPECT_EQ(t->keepalive_timeout, Duration::Infinity());
EXPECT_EQ(t->keepalive_permit_without_calls, false);
EXPECT_EQ(t->ping_rate_policy.TestOnlyMaxPingsWithoutData(), 2);
grpc_transport_destroy(&t->base);

@ -42,32 +42,45 @@ TEST(PingRatePolicy, NoOpServer) {
TEST(PingRatePolicy, ServerCanSendAtStart) {
Chttp2PingRatePolicy policy{ChannelArgs(), false};
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(100)), SendGranted());
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(100), 0),
SendGranted());
}
TEST(PingRatePolicy, ClientBlockedUntilDataSent) {
Chttp2PingRatePolicy policy{ChannelArgs(), true};
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(10)),
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(10), 0),
TooManyRecentPings());
policy.ResetPingsBeforeDataRequired();
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(10)), SendGranted());
EXPECT_EQ(policy.RequestSendPing(Duration::Zero()), SendGranted());
EXPECT_EQ(policy.RequestSendPing(Duration::Zero()), TooManyRecentPings());
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(10), 0),
SendGranted());
policy.SentPing();
EXPECT_EQ(policy.RequestSendPing(Duration::Zero(), 0), SendGranted());
policy.SentPing();
EXPECT_EQ(policy.RequestSendPing(Duration::Zero(), 0), TooManyRecentPings());
}
TEST(PingRatePolicy, RateThrottlingWorks) {
Chttp2PingRatePolicy policy{ChannelArgs(), false};
// Observe that we can fail if we send in a tight loop
while (policy.RequestSendPing(Duration::Milliseconds(10)) == SendGranted()) {
while (policy.RequestSendPing(Duration::Milliseconds(10), 0) ==
SendGranted()) {
policy.SentPing();
}
// Observe that we can succeed if we wait a bit between pings
for (int i = 0; i < 100; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(10)),
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(10), 0),
SendGranted());
policy.SentPing();
}
}
TEST(PingRatePolicy, TooManyPingsInflightBlocksSendingPings) {
Chttp2PingRatePolicy policy{ChannelArgs(), false};
EXPECT_EQ(policy.RequestSendPing(Duration::Milliseconds(1), 100000000),
TooManyRecentPings());
}
} // namespace
} // namespace grpc_core

@ -1317,6 +1317,8 @@ src/core/ext/transport/chttp2/transport/legacy_frame.h \
src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.h \
src/core/ext/transport/chttp2/transport/ping_callbacks.cc \
src/core/ext/transport/chttp2/transport/ping_callbacks.h \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/ping_rate_policy.h \
src/core/ext/transport/chttp2/transport/stream_lists.cc \

@ -1093,6 +1093,8 @@ src/core/ext/transport/chttp2/transport/legacy_frame.h \
src/core/ext/transport/chttp2/transport/parsing.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \
src/core/ext/transport/chttp2/transport/ping_abuse_policy.h \
src/core/ext/transport/chttp2/transport/ping_callbacks.cc \
src/core/ext/transport/chttp2/transport/ping_callbacks.h \
src/core/ext/transport/chttp2/transport/ping_rate_policy.cc \
src/core/ext/transport/chttp2/transport/ping_rate_policy.h \
src/core/ext/transport/chttp2/transport/stream_lists.cc \

@ -6411,6 +6411,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "ping_callbacks_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save