[EventEngine] RunAfter migration: grpc_chttp2_transport (#32240)

* initial commit, bdp timer

* migrate keepalive_ping_timer and keepalive_watchdog_timer

* still has some issue with refcount

* fix refcount

with the help from refcount_solver.py :^)

* migrate delayed_ping_timer

* add some GPR_ASSERTs

* comment

* review

* try mutex/lock

* fix build deps

* use GRPC_UNUSED

* review
pull/32331/head
Yijie Ma 2 years ago committed by GitHub
parent b453af1ecf
commit 1b3eadedfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 268
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 29
      src/core/ext/transport/chttp2/transport/internal.h
  3. 17
      src/core/ext/transport/chttp2/transport/writing.cc

@ -39,6 +39,7 @@
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "absl/types/variant.h" #include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h> #include <grpc/impl/connectivity_state.h>
#include <grpc/slice_buffer.h> #include <grpc/slice_buffer.h>
@ -174,24 +175,28 @@ static void start_bdp_ping(void* tp, grpc_error_handle error);
static void finish_bdp_ping(void* tp, grpc_error_handle error); static void finish_bdp_ping(void* tp, grpc_error_handle error);
static void start_bdp_ping_locked(void* tp, grpc_error_handle error); static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
static void finish_bdp_ping_locked(void* tp, grpc_error_handle error); static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error); static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
static void next_bdp_ping_timer_expired_locked(void* tp, static void next_bdp_ping_timer_expired_locked(
grpc_error_handle error); void* tp, GRPC_UNUSED grpc_error_handle error);
static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error); static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
static void send_ping_locked(grpc_chttp2_transport* t, static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack); grpc_closure* on_initiate, grpc_closure* on_ack);
static void retry_initiate_ping_locked(void* tp, grpc_error_handle error); static void retry_initiate_ping_locked(void* tp,
GRPC_UNUSED grpc_error_handle error);
// keepalive-relevant functions // keepalive-relevant functions
static void init_keepalive_ping(void* arg, grpc_error_handle error); static void init_keepalive_ping(grpc_chttp2_transport* t);
static void init_keepalive_ping_locked(void* arg, grpc_error_handle error); static void init_keepalive_ping_locked(void* arg,
GRPC_UNUSED grpc_error_handle error);
static void start_keepalive_ping(void* arg, grpc_error_handle error); static void start_keepalive_ping(void* arg, grpc_error_handle error);
static void finish_keepalive_ping(void* arg, grpc_error_handle error); static void finish_keepalive_ping(void* arg, grpc_error_handle error);
static void start_keepalive_ping_locked(void* arg, grpc_error_handle error); static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error); static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired(void* arg, grpc_error_handle error); static void keepalive_watchdog_fired(grpc_chttp2_transport* t);
static void keepalive_watchdog_fired_locked(void* arg, grpc_error_handle error); static void keepalive_watchdog_fired_locked(
void* arg, GRPC_UNUSED grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
namespace grpc_core { namespace grpc_core {
@ -226,6 +231,8 @@ void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
grpc_chttp2_transport::~grpc_chttp2_transport() { grpc_chttp2_transport::~grpc_chttp2_transport() {
size_t i; size_t i;
event_engine.reset();
if (channelz_socket != nullptr) { if (channelz_socket != nullptr) {
channelz_socket.reset(); channelz_socket.reset();
} }
@ -447,18 +454,22 @@ static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
g_default_min_recv_ping_interval_without_data_ms); g_default_min_recv_ping_interval_without_data_ms);
} }
static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) { static void init_keepalive_pings_if_enabled_locked(
void* arg, GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
if (t->keepalive_time != grpc_core::Duration::Infinity()) { if (t->keepalive_time != grpc_core::Duration::Infinity()) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, t->keepalive_ping_timer_handle =
grpc_schedule_on_exec_ctx); t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::Timestamp::Now() + t->keepalive_time, grpc_core::ExecCtx exec_ctx;
&t->init_keepalive_ping_locked); init_keepalive_ping(t);
});
} else { } else {
// Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
// inflight keeaplive timers // inflight keepalive timers
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
} }
} }
@ -486,7 +497,10 @@ grpc_chttp2_transport::grpc_chttp2_transport(
peer_string.c_str(), peer_string.c_str(),
channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true), channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
&memory_owner), &memory_owner),
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
event_engine(
channel_args
.GetObjectRef<grpc_event_engine::experimental::EventEngine>()) {
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
base.vtable = get_vtable(); base.vtable = get_vtable();
@ -531,13 +545,16 @@ grpc_chttp2_transport::grpc_chttp2_transport(
// No pings allowed before receiving a header or data frame. // No pings allowed before receiving a header or data frame.
ping_state.pings_before_data_required = 0; ping_state.pings_before_data_required = 0;
ping_state.is_delayed_ping_timer_set = false;
ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast(); ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast();
ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast(); ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast();
ping_recv_state.ping_strikes = 0; ping_recv_state.ping_strikes = 0;
init_keepalive_pings_if_enabled(this); grpc_core::ExecCtx exec_ctx;
combiner->Run(
GRPC_CLOSURE_INIT(&init_keepalive_ping_locked,
init_keepalive_pings_if_enabled_locked, this, nullptr),
absl::OkStatus());
if (flow_control.bdp_probe()) { if (flow_control.bdp_probe()) {
bdp_ping_blocked = true; bdp_ping_blocked = true;
@ -593,19 +610,40 @@ static void close_transport_locked(grpc_chttp2_transport* t,
t->closed_with_error = error; t->closed_with_error = error;
connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(), connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
"close_transport"); "close_transport");
if (t->ping_state.is_delayed_ping_timer_set) { if (t->ping_state.delayed_ping_timer_handle.has_value()) {
grpc_timer_cancel(&t->ping_state.delayed_ping_timer); if (t->event_engine->Cancel(*t->ping_state.delayed_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
t->ping_state.delayed_ping_timer_handle.reset();
}
} }
if (t->have_next_bdp_ping_timer) { if (t->next_bdp_ping_timer_handle.has_value()) {
grpc_timer_cancel(&t->next_bdp_ping_timer); if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
t->next_bdp_ping_timer_handle.reset();
}
} }
switch (t->keepalive_state) { switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
grpc_timer_cancel(&t->keepalive_ping_timer); if (t->keepalive_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
t->keepalive_ping_timer_handle.reset();
}
}
break; break;
case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
grpc_timer_cancel(&t->keepalive_ping_timer); if (t->keepalive_ping_timer_handle.has_value()) {
grpc_timer_cancel(&t->keepalive_watchdog_timer); if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
t->keepalive_ping_timer_handle.reset();
}
}
if (t->keepalive_watchdog_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
t->keepalive_watchdog_timer_handle.reset();
}
}
break; break;
case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED: case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
@ -1556,19 +1594,19 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
absl::OkStatus()); absl::OkStatus());
} }
void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error) { void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
retry_initiate_ping_locked, t, nullptr), retry_initiate_ping_locked, t, nullptr),
error); absl::OkStatus());
} }
static void retry_initiate_ping_locked(void* tp, grpc_error_handle error) { static void retry_initiate_ping_locked(void* tp,
GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->ping_state.is_delayed_ping_timer_set = false; GPR_ASSERT(t->ping_state.delayed_ping_timer_handle.has_value());
if (error.ok()) { t->ping_state.delayed_ping_timer_handle.reset();
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
} }
@ -2457,7 +2495,7 @@ static void read_action_locked(void* tp, grpc_error_handle error) {
keep_reading = true; keep_reading = true;
// Since we have read a byte, reset the keepalive timer // Since we have read a byte, reset the keepalive timer
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(&t->keepalive_ping_timer); maybe_reset_keepalive_ping_timer_locked(t);
} }
} }
grpc_slice_buffer_reset_and_unref(&t->read_buffer); grpc_slice_buffer_reset_and_unref(&t->read_buffer);
@ -2517,7 +2555,7 @@ static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {
} }
// Reset the keepalive ping timer // Reset the keepalive ping timer
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(&t->keepalive_ping_timer); maybe_reset_keepalive_ping_timer_locked(t);
} }
t->flow_control.bdp_estimator()->StartPing(); t->flow_control.bdp_estimator()->StartPing();
t->bdp_ping_started = true; t->bdp_ping_started = true;
@ -2553,31 +2591,28 @@ static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {
t->flow_control.bdp_estimator()->CompletePing(); t->flow_control.bdp_estimator()->CompletePing();
grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t, grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t,
nullptr); nullptr);
GPR_ASSERT(!t->have_next_bdp_ping_timer); GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value());
t->have_next_bdp_ping_timer = true; t->next_bdp_ping_timer_handle =
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx); grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_timer_init(&t->next_bdp_ping_timer, next_ping, grpc_core::ExecCtx exec_ctx;
&t->next_bdp_ping_timer_expired_locked); next_bdp_ping_timer_expired(t);
});
} }
static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error) { static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->combiner->Run( t->combiner->Run(
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired_locked, t, nullptr), next_bdp_ping_timer_expired_locked, t, nullptr),
error); absl::OkStatus());
} }
static void next_bdp_ping_timer_expired_locked(void* tp, static void next_bdp_ping_timer_expired_locked(
grpc_error_handle error) { void* tp, GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
GPR_ASSERT(t->have_next_bdp_ping_timer); GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value());
t->have_next_bdp_ping_timer = false; t->next_bdp_ping_timer_handle.reset();
if (!error.ok()) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
return;
}
if (t->flow_control.bdp_estimator()->accumulator() == 0) { if (t->flow_control.bdp_estimator()->accumulator() == 0) {
// Block the bdp ping till we receive more data. // Block the bdp ping till we receive more data.
t->bdp_ping_blocked = true; t->bdp_ping_blocked = true;
@ -2646,47 +2681,37 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
} }
} }
static void init_keepalive_ping(void* arg, grpc_error_handle error) { static void init_keepalive_ping(grpc_chttp2_transport* t) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
init_keepalive_ping_locked, t, nullptr), init_keepalive_ping_locked, t, nullptr),
error); absl::OkStatus());
} }
static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) { static void init_keepalive_ping_locked(void* arg,
GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
GPR_ASSERT(t->keepalive_ping_timer_handle.has_value());
t->keepalive_ping_timer_handle.reset();
if (t->destroying || !t->closed_with_error.ok()) { if (t->destroying || !t->closed_with_error.ok()) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else if (error.ok()) { } else {
if (t->keepalive_permit_without_calls || if (t->keepalive_permit_without_calls ||
grpc_chttp2_stream_map_size(&t->stream_map) > 0) { grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
grpc_timer_init_unset(&t->keepalive_watchdog_timer);
send_keepalive_ping_locked(t); send_keepalive_ping_locked(t);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else { } else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, t->keepalive_ping_timer_handle =
grpc_schedule_on_exec_ctx); t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::Timestamp::Now() + t->keepalive_time, grpc_core::ExecCtx exec_ctx;
&t->init_keepalive_ping_locked); init_keepalive_ping(t);
} });
} else if (error == absl::CancelledError()) {
// The keepalive ping timer may be cancelled by bdp
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
t->peer_string.c_str());
} }
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
} }
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
} }
@ -2711,11 +2736,12 @@ static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str()); gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
} }
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, t->keepalive_watchdog_timer_handle =
keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx); t->event_engine->RunAfter(t->keepalive_timeout, [t] {
grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::Timestamp::Now() + t->keepalive_timeout, grpc_core::ExecCtx exec_ctx;
&t->keepalive_watchdog_fired_locked); keepalive_watchdog_fired(t);
});
t->keepalive_ping_started = true; t->keepalive_ping_started = true;
} }
@ -2745,50 +2771,76 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
} }
t->keepalive_ping_started = false; t->keepalive_ping_started = false;
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
grpc_timer_cancel(&t->keepalive_watchdog_timer); if (t->keepalive_watchdog_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
t->keepalive_watchdog_timer_handle.reset();
}
}
GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value());
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t, t->keepalive_ping_timer_handle =
grpc_schedule_on_exec_ctx); t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_timer_init(&t->keepalive_ping_timer, grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::Timestamp::Now() + t->keepalive_time, grpc_core::ExecCtx exec_ctx;
&t->init_keepalive_ping_locked); init_keepalive_ping(t);
});
} }
} }
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
} }
static void keepalive_watchdog_fired(void* arg, grpc_error_handle error) { static void keepalive_watchdog_fired(grpc_chttp2_transport* t) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
t->combiner->Run( t->combiner->Run(
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t, nullptr), keepalive_watchdog_fired_locked, t, nullptr),
error); absl::OkStatus());
} }
static void keepalive_watchdog_fired_locked(void* arg, static void keepalive_watchdog_fired_locked(
grpc_error_handle error) { void* arg, GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value());
t->keepalive_watchdog_timer_handle.reset();
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error.ok()) { gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.", t->peer_string.c_str());
t->peer_string.c_str()); t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; close_transport_locked(
close_transport_locked( t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"),
t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"), grpc_core::StatusIntProperty::kRpcStatus,
grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE));
GRPC_STATUS_UNAVAILABLE));
}
} else { } else {
// The watchdog timer should have been cancelled by // If keepalive_state is not PINGING, we consider it as an error. Maybe the
// finish_keepalive_ping_locked. // cancellation failed in finish_keepalive_ping_locked. Users have seen
if (GPR_UNLIKELY(error != absl::CancelledError())) { // other states: https://github.com/grpc/grpc/issues/32085.
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
} }
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
} }
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
if (t->keepalive_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
// Cancel succeeds, resets the keepalive ping timer. Note that we don't
// need to Ref or Unref here since we still hold the Ref.
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
t->peer_string.c_str());
}
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(t);
});
}
}
}
// //
// CALLBACK LOOP // CALLBACK LOOP
// //
@ -2857,7 +2909,7 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
t->combiner->Run(&t->destructive_reclaimer_locked, t->combiner->Run(&t->destructive_reclaimer_locked,
absl::OkStatus()); absl::OkStatus());
} else { } else {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
} }
}); });
} }

@ -24,11 +24,13 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <memory>
#include <string> #include <string>
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/slice.h> #include <grpc/slice.h>
@ -57,7 +59,6 @@
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
@ -147,8 +148,8 @@ struct grpc_chttp2_repeated_ping_policy {
struct grpc_chttp2_repeated_ping_state { struct grpc_chttp2_repeated_ping_state {
grpc_core::Timestamp last_ping_sent_time; grpc_core::Timestamp last_ping_sent_time;
int pings_before_data_required; int pings_before_data_required;
grpc_timer delayed_ping_timer; absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
bool is_delayed_ping_timer_set; delayed_ping_timer_handle;
}; };
struct grpc_chttp2_server_ping_recv_state { struct grpc_chttp2_server_ping_recv_state {
grpc_core::Timestamp last_ping_recv_time; grpc_core::Timestamp last_ping_recv_time;
@ -410,7 +411,6 @@ struct grpc_chttp2_transport
grpc_closure finish_bdp_ping_locked; grpc_closure finish_bdp_ping_locked;
// if non-NULL, close the transport with this error when writes are finished // if non-NULL, close the transport with this error when writes are finished
//
grpc_error_handle close_transport_on_writes_finished; grpc_error_handle close_transport_on_writes_finished;
// a list of closures to run after writes are finished // a list of closures to run after writes are finished
@ -426,25 +426,27 @@ struct grpc_chttp2_transport
/// destructive cleanup closure /// destructive cleanup closure
grpc_closure destructive_reclaimer_locked; grpc_closure destructive_reclaimer_locked;
// next bdp ping timer
bool have_next_bdp_ping_timer = false;
/// If start_bdp_ping_locked has been called /// If start_bdp_ping_locked has been called
bool bdp_ping_started = false; bool bdp_ping_started = false;
grpc_timer next_bdp_ping_timer; // next bdp ping timer handle
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
next_bdp_ping_timer_handle;
// keep-alive ping support // keep-alive ping support
/// Closure to initialize a keepalive ping /// Closure to initialize a keepalive ping
grpc_closure init_keepalive_ping_locked; grpc_closure init_keepalive_ping_locked;
/// Closure to run when the keepalive ping is sent /// Closure to run when the keepalive ping is sent
grpc_closure start_keepalive_ping_locked; grpc_closure start_keepalive_ping_locked;
/// Cousure to run when the keepalive ping ack is received /// Closure to run when the keepalive ping ack is received
grpc_closure finish_keepalive_ping_locked; grpc_closure finish_keepalive_ping_locked;
/// Closrue to run when the keepalive ping timeouts /// Closure to run when the keepalive ping timeouts
grpc_closure keepalive_watchdog_fired_locked; grpc_closure keepalive_watchdog_fired_locked;
/// timer to initiate ping events /// timer to initiate ping events
grpc_timer keepalive_ping_timer; absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
keepalive_ping_timer_handle;
/// watchdog to kill the transport when waiting for the keepalive ping /// watchdog to kill the transport when waiting for the keepalive ping
grpc_timer keepalive_watchdog_timer; absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
keepalive_watchdog_timer_handle;
/// time duration in between pings /// time duration in between pings
grpc_core::Duration keepalive_time; grpc_core::Duration keepalive_time;
/// grace period for a ping to complete before watchdog kicks in /// grace period for a ping to complete before watchdog kicks in
@ -467,8 +469,9 @@ struct grpc_chttp2_transport
bool reading_paused_on_pending_induced_frames = false; bool reading_paused_on_pending_induced_frames = false;
/// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
/// the peer /// the peer
///
bool enable_preferred_rx_crypto_frame_advertisement = false; bool enable_preferred_rx_crypto_frame_advertisement = false;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
}; };
typedef enum { typedef enum {
@ -803,7 +806,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
bool is_client); bool is_client);
void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error); void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t);
void schedule_bdp_ping_locked(grpc_chttp2_transport* t); void schedule_bdp_ping_locked(grpc_chttp2_transport* t);

@ -22,11 +22,13 @@
#include <stddef.h> #include <stddef.h>
#include <algorithm> #include <algorithm>
#include <memory>
#include <string> #include <string>
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h> #include <grpc/slice.h>
#include <grpc/slice_buffer.h> #include <grpc/slice_buffer.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -60,7 +62,6 @@
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/http2_errors.h"
@ -152,14 +153,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
next_allowed_ping.milliseconds_after_process_epoch(), next_allowed_ping.milliseconds_after_process_epoch(),
now.milliseconds_after_process_epoch()); now.milliseconds_after_process_epoch());
} }
if (!t->ping_state.is_delayed_ping_timer_set) { if (!t->ping_state.delayed_ping_timer_handle.has_value()) {
t->ping_state.is_delayed_ping_timer_set = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, t->ping_state.delayed_ping_timer_handle =
grpc_chttp2_retry_initiate_ping, t, t->event_engine->RunAfter(next_allowed_ping - now, [t] {
grpc_schedule_on_exec_ctx); grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping, grpc_core::ExecCtx exec_ctx;
&t->retry_initiate_ping_locked); grpc_chttp2_retry_initiate_ping(t);
});
} }
return; return;
} }

Loading…
Cancel
Save