[chttp2] Delay starting ping timeout timer until writes complete (#34589)

We probably shouldn't count the time it takes us to write out data as
part of the ping timeout, so only start the timeout timer once the write
has completed.
pull/34598/head^2
Craig Tiller 1 year ago committed by GitHub
parent 3b0916fc0a
commit 7fabc61f07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 26
      src/core/ext/transport/chttp2/transport/ping_callbacks.cc
  3. 28
      src/core/ext/transport/chttp2/transport/ping_callbacks.h
  4. 21
      src/core/ext/transport/chttp2/transport/writing.cc
  5. 109
      test/core/transport/chttp2/ping_callbacks_test.cc

@ -5704,6 +5704,7 @@ grpc_cc_library(
deps = [
"time",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
],
)

@ -21,6 +21,8 @@
#include "absl/meta/type_traits.h"
#include "absl/random/distributions.h"
#include <grpc/support/log.h>
namespace grpc_core {
void Chttp2PingCallbacks::OnPing(Callback on_start, Callback on_ack) {
@ -39,9 +41,7 @@ void Chttp2PingCallbacks::OnPingAck(Callback on_ack) {
on_ack_.emplace_back(std::move(on_ack));
}
uint64_t Chttp2PingCallbacks::StartPing(
absl::BitGenRef bitgen, Duration ping_timeout, Callback on_timeout,
grpc_event_engine::experimental::EventEngine* event_engine) {
uint64_t Chttp2PingCallbacks::StartPing(absl::BitGenRef bitgen) {
uint64_t id;
do {
id = absl::Uniform<uint64_t>(bitgen);
@ -50,13 +50,7 @@ uint64_t Chttp2PingCallbacks::StartPing(
CallbackVec().swap(on_start_);
InflightPing inflight;
inflight.on_ack.swap(on_ack_);
if (ping_timeout != Duration::Infinity()) {
inflight.on_timeout =
event_engine->RunAfter(ping_timeout, std::move(on_timeout));
} else {
inflight.on_timeout =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
}
started_new_ping_without_setting_timeout_ = true;
inflight_.emplace(id, std::move(inflight));
most_recent_inflight_ = id;
ping_requested_ = false;
@ -96,4 +90,16 @@ void Chttp2PingCallbacks::CancelAll(
ping_requested_ = false;
}
// Arms the timeout timer for the ping most recently begun by StartPing.
//
// ping_timeout: delay handed to the event engine before `callback` runs.
// event_engine: engine used to schedule the timer (via RunAfter).
// callback:     invoked by the event engine if the timer fires.
//
// Precondition: started_new_ping_without_setting_timeout_ is set; the flag
// is cleared here regardless of whether a timer ends up being scheduled.
void Chttp2PingCallbacks::OnPingTimeout(
    Duration ping_timeout,
    grpc_event_engine::experimental::EventEngine* event_engine,
    Callback callback) {
  // A timeout may only be attached to a ping that does not have one yet.
  GPR_ASSERT(started_new_ping_without_setting_timeout_);
  started_new_ping_without_setting_timeout_ = false;
  auto latest_ping = inflight_.find(most_recent_inflight_);
  // If the most recent ping is no longer in flight (for instance it was
  // acked before this call), there is nothing to time out: schedule nothing
  // and let `callback` be dropped.
  if (latest_ping != inflight_.end()) {
    latest_ping->second.on_timeout =
        event_engine->RunAfter(ping_timeout, std::move(callback));
  }
}
} // namespace grpc_core

@ -42,10 +42,12 @@ class Chttp2PingCallbacks {
// Request a ping (but one we don't need any notification for when it begins
// or ends).
void RequestPing() { ping_requested_ = true; }
// Request a ping, and specify callbacks for when it begins and ends.
// on_start is invoked during the call to StartPing.
// on_ack is invoked during the call to AckPing.
void OnPing(Callback on_start, Callback on_ack);
// Request a notification when *some* ping is acked:
// If there is no ping in flight, one will be scheduled and the callback
// will be invoked when it is acked. (ie as per OnPing([]{}, on_ack)).
@ -56,14 +58,8 @@ class Chttp2PingCallbacks {
// Write path: begin a ping.
// Uses bitgen to generate a randomized id for the ping.
// If ping_timeout is finite, after ping_timeout seconds from when the
// ping is sent the on_timeout callback will be invoked.
GRPC_MUST_USE_RESULT uint64_t
StartPing(absl::BitGenRef bitgen, Duration ping_timeout, Callback on_timeout,
grpc_event_engine::experimental::EventEngine* event_engine);
// Process the ack for one incoming ping.
// Cancel the timeout for the ping.
// Return true if the ping was expected, false otherwise.
// Sets started_new_ping_without_setting_timeout.
GRPC_MUST_USE_RESULT uint64_t StartPing(absl::BitGenRef bitgen);
bool AckPing(uint64_t id,
grpc_event_engine::experimental::EventEngine* event_engine);
@ -80,15 +76,29 @@ class Chttp2PingCallbacks {
// Returns the number of pings currently in flight.
size_t pings_inflight() const { return inflight_.size(); }
// Returns true if a ping was started without setting a timeout yet.
// The underlying flag is set by StartPing and cleared by OnPingTimeout, so a
// true result means OnPingTimeout may be called for the most recent ping.
bool started_new_ping_without_setting_timeout() const {
return started_new_ping_without_setting_timeout_;
}
// Add a ping timeout for the most recently started ping.
// started_new_ping_without_setting_timeout must be set.
// Clears started_new_ping_without_setting_timeout.
void OnPingTimeout(Duration ping_timeout,
grpc_event_engine::experimental::EventEngine* event_engine,
Callback callback);
private:
using CallbackVec = std::vector<Callback>;
struct InflightPing {
grpc_event_engine::experimental::EventEngine::TaskHandle on_timeout;
grpc_event_engine::experimental::EventEngine::TaskHandle on_timeout =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
CallbackVec on_ack;
};
absl::flat_hash_map<uint64_t, InflightPing> inflight_;
uint64_t most_recent_inflight_ = 0;
bool ping_requested_ = false;
bool started_new_ping_without_setting_timeout_ = false;
CallbackVec on_start_;
CallbackVec on_ack_;
};

@ -122,14 +122,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
t->ping_callbacks.pings_inflight()),
[t](grpc_core::Chttp2PingRatePolicy::SendGranted) {
t->ping_rate_policy.SentPing();
const uint64_t id = t->ping_callbacks.StartPing(
t->bitgen, t->keepalive_timeout,
[t = t->Ref()] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_ping_timeout(t);
},
t->event_engine.get());
const uint64_t id = t->ping_callbacks.StartPing(t->bitgen);
grpc_slice_buffer_add(t->outbuf.c_slice_buffer(),
grpc_chttp2_ping_create(false, id));
if (t->channelz_socket != nullptr) {
@ -685,6 +678,18 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
}
t->num_messages_in_next_write = 0;
if (t->ping_callbacks.started_new_ping_without_setting_timeout() &&
t->keepalive_timeout != grpc_core::Duration::Infinity()) {
// Set ping timeout after finishing write so we don't measure our own send
// time.
t->ping_callbacks.OnPingTimeout(
t->keepalive_timeout, t->event_engine.get(), [t = t->Ref()] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_ping_timeout(t);
});
}
while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
if (s->sending_bytes != 0) {
update_list(t, s, static_cast<int64_t>(s->sending_bytes),

@ -54,6 +54,45 @@ TEST(PingCallbacksTest, OnPingAckRequestsPing) {
EXPECT_TRUE(callbacks.ping_requested());
}
// Verifies that a ping may be acked before its timeout timer has been armed:
// the later OnPingTimeout call must then be a no-op as far as the event
// engine is concerned.
TEST(PingCallbacksTest, PingAckBeforeTimerStarted) {
// No expectations are set on this mock anywhere in the test, so (with
// StrictMock) any call into the event engine would be reported as a failure.
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
Chttp2PingCallbacks callbacks;
bool started = false;
bool acked = false;
// Freshly constructed: nothing requested, no ping awaiting a timeout.
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_FALSE(callbacks.started_new_ping_without_setting_timeout());
// Request ping
// Each callback also checks that it runs at most once.
callbacks.OnPing(
[&started] {
EXPECT_FALSE(started);
started = true;
},
[&acked] {
EXPECT_FALSE(acked);
acked = true;
});
// The request is recorded, but no ping is in flight and no callback has run.
EXPECT_TRUE(callbacks.ping_requested());
EXPECT_FALSE(callbacks.started_new_ping_without_setting_timeout());
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
// Starting the ping runs on_start and flags that a timeout is still pending.
auto id = callbacks.StartPing(bitgen);
EXPECT_TRUE(callbacks.started_new_ping_without_setting_timeout());
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 1);
EXPECT_TRUE(started);
EXPECT_FALSE(acked);
// Ack arrives before OnPingTimeout was ever called for this ping.
callbacks.AckPing(id, &event_engine);
// The ack removes the in-flight ping and runs on_ack, but the
// "timeout not yet set" flag is expected to remain set.
EXPECT_TRUE(callbacks.started_new_ping_without_setting_timeout());
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_TRUE(started);
EXPECT_TRUE(acked);
// Setting the timeout now must not schedule anything on the event engine
// (the strict mock has no RunAfter expectation), so the crashing callback
// is never run.
callbacks.OnPingTimeout(Duration::Milliseconds(1), &event_engine,
[] { Crash("should never reach here"); });
}
TEST(PingCallbacksTest, PingRoundtrips) {
StrictMock<MockEventEngine> event_engine;
absl::BitGen bitgen;
@ -82,9 +121,9 @@ TEST(PingCallbacksTest, PingRoundtrips) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 1);
EXPECT_TRUE(started);
@ -120,9 +159,7 @@ TEST(PingCallbacksTest, PingRoundtripsWithInfiniteTimeout) {
EXPECT_EQ(callbacks.pings_inflight(), 0);
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
auto id = callbacks.StartPing(
bitgen, Duration::Infinity(), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_EQ(callbacks.pings_inflight(), 1);
EXPECT_TRUE(started);
@ -164,9 +201,9 @@ TEST(PingCallbacksTest, DuplicatePingIdFlagsError) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked);
@ -206,9 +243,9 @@ TEST(PingCallbacksTest, OnPingAckCanPiggybackInflightPings) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked_first);
@ -247,9 +284,9 @@ TEST(PingCallbacksTest, PingAckRoundtrips) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
@ -287,9 +324,9 @@ TEST(PingCallbacksTest, MultiPingRoundtrips) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id1 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id1 = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
@ -314,9 +351,9 @@ TEST(PingCallbacksTest, MultiPingRoundtrips) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 789};
});
auto id2 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id2 = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_NE(id1, id2);
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
@ -368,9 +405,9 @@ TEST(PingCallbacksTest, MultiPingRoundtripsWithOutOfOrderAcks) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id1 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id1 = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
@ -395,9 +432,9 @@ TEST(PingCallbacksTest, MultiPingRoundtripsWithOutOfOrderAcks) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 789};
});
auto id2 = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id2 = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_NE(id1, id2);
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
@ -458,9 +495,9 @@ TEST(PingCallbacksTest, CoalescedPingsRoundtrip) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started1);
EXPECT_FALSE(acked1);
@ -503,9 +540,9 @@ TEST(PingCallbacksTest, CancelAllCancelsCallbacks) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(started);
EXPECT_FALSE(acked);
EXPECT_CALL(event_engine, Cancel(EventEngine::TaskHandle{123, 456}))
@ -540,9 +577,9 @@ TEST(PingCallbacksTest, CancelAllCancelsInflightPings) {
.WillOnce([]() {
return EventEngine::TaskHandle{123, 456};
});
auto id = callbacks.StartPing(
bitgen, Duration::Hours(24), [] { Crash("should not reach here"); },
&event_engine);
auto id = callbacks.StartPing(bitgen);
callbacks.OnPingTimeout(Duration::Hours(24), &event_engine,
[] { Crash("should not reach here"); });
EXPECT_FALSE(callbacks.ping_requested());
EXPECT_TRUE(started);
EXPECT_FALSE(acked);

Loading…
Cancel
Save