Revert "Revert "[EventEngine] RunAfter migration: grpc_chttp2_transport"" (#32341)

Reverts grpc/grpc#32339

Rolling this forward after https://github.com/grpc/grpc/pull/32350.
pull/31933/head^2
Yijie Ma 2 years ago committed by GitHub
parent 007c4073c8
commit b1619f1dc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 268
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 29
      src/core/ext/transport/chttp2/transport/internal.h
  3. 17
      src/core/ext/transport/chttp2/transport/writing.cc

@ -39,6 +39,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/slice_buffer.h>
@ -174,24 +175,28 @@ static void start_bdp_ping(void* tp, grpc_error_handle error);
static void finish_bdp_ping(void* tp, grpc_error_handle error);
static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error);
static void next_bdp_ping_timer_expired_locked(void* tp,
grpc_error_handle error);
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
static void next_bdp_ping_timer_expired_locked(
void* tp, GRPC_UNUSED grpc_error_handle error);
static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack);
static void retry_initiate_ping_locked(void* tp, grpc_error_handle error);
static void retry_initiate_ping_locked(void* tp,
GRPC_UNUSED grpc_error_handle error);
// keepalive-relevant functions
static void init_keepalive_ping(void* arg, grpc_error_handle error);
static void init_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void init_keepalive_ping(grpc_chttp2_transport* t);
static void init_keepalive_ping_locked(void* arg,
GRPC_UNUSED grpc_error_handle error);
static void start_keepalive_ping(void* arg, grpc_error_handle error);
static void finish_keepalive_ping(void* arg, grpc_error_handle error);
static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired_locked(void* arg, grpc_error_handle error);
static void keepalive_watchdog_fired(grpc_chttp2_transport* t);
static void keepalive_watchdog_fired_locked(
void* arg, GRPC_UNUSED grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
namespace grpc_core {
@ -226,6 +231,8 @@ void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
grpc_chttp2_transport::~grpc_chttp2_transport() {
size_t i;
event_engine.reset();
if (channelz_socket != nullptr) {
channelz_socket.reset();
}
@ -447,18 +454,22 @@ static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
g_default_min_recv_ping_interval_without_data_ms);
}
static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
static void init_keepalive_pings_if_enabled_locked(
void* arg, GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
if (t->keepalive_time != grpc_core::Duration::Infinity()) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(t);
});
} else {
// Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
// inflight keeaplive timers
// inflight keepalive timers
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
}
@ -486,7 +497,10 @@ grpc_chttp2_transport::grpc_chttp2_transport(
peer_string.c_str(),
channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
&memory_owner),
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
event_engine(
channel_args
.GetObjectRef<grpc_event_engine::experimental::EventEngine>()) {
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
base.vtable = get_vtable();
@ -531,13 +545,16 @@ grpc_chttp2_transport::grpc_chttp2_transport(
// No pings allowed before receiving a header or data frame.
ping_state.pings_before_data_required = 0;
ping_state.is_delayed_ping_timer_set = false;
ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast();
ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast();
ping_recv_state.ping_strikes = 0;
init_keepalive_pings_if_enabled(this);
grpc_core::ExecCtx exec_ctx;
combiner->Run(
GRPC_CLOSURE_INIT(&init_keepalive_ping_locked,
init_keepalive_pings_if_enabled_locked, this, nullptr),
absl::OkStatus());
if (flow_control.bdp_probe()) {
bdp_ping_blocked = true;
@ -593,19 +610,40 @@ static void close_transport_locked(grpc_chttp2_transport* t,
t->closed_with_error = error;
connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
"close_transport");
if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
if (t->ping_state.delayed_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->ping_state.delayed_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
t->ping_state.delayed_ping_timer_handle.reset();
}
}
if (t->have_next_bdp_ping_timer) {
grpc_timer_cancel(&t->next_bdp_ping_timer);
if (t->next_bdp_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
t->next_bdp_ping_timer_handle.reset();
}
}
switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
grpc_timer_cancel(&t->keepalive_ping_timer);
if (t->keepalive_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
t->keepalive_ping_timer_handle.reset();
}
}
break;
case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
grpc_timer_cancel(&t->keepalive_ping_timer);
grpc_timer_cancel(&t->keepalive_watchdog_timer);
if (t->keepalive_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
t->keepalive_ping_timer_handle.reset();
}
}
if (t->keepalive_watchdog_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
t->keepalive_watchdog_timer_handle.reset();
}
}
break;
case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
@ -1556,19 +1594,19 @@ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
absl::OkStatus());
}
void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) {
t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
retry_initiate_ping_locked, t, nullptr),
error);
absl::OkStatus());
}
static void retry_initiate_ping_locked(void* tp, grpc_error_handle error) {
static void retry_initiate_ping_locked(void* tp,
GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->ping_state.is_delayed_ping_timer_set = false;
if (error.ok()) {
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
}
GPR_ASSERT(t->ping_state.delayed_ping_timer_handle.has_value());
t->ping_state.delayed_ping_timer_handle.reset();
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
}
@ -2457,7 +2495,7 @@ static void read_action_locked(void* tp, grpc_error_handle error) {
keep_reading = true;
// Since we have read a byte, reset the keepalive timer
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(&t->keepalive_ping_timer);
maybe_reset_keepalive_ping_timer_locked(t);
}
}
grpc_slice_buffer_reset_and_unref(&t->read_buffer);
@ -2517,7 +2555,7 @@ static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {
}
// Reset the keepalive ping timer
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(&t->keepalive_ping_timer);
maybe_reset_keepalive_ping_timer_locked(t);
}
t->flow_control.bdp_estimator()->StartPing();
t->bdp_ping_started = true;
@ -2553,31 +2591,28 @@ static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {
t->flow_control.bdp_estimator()->CompletePing();
grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t,
nullptr);
GPR_ASSERT(!t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = true;
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
&t->next_bdp_ping_timer_expired_locked);
GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value());
t->next_bdp_ping_timer_handle =
t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
next_bdp_ping_timer_expired(t);
});
}
static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
next_bdp_ping_timer_expired_locked, t, nullptr),
error);
absl::OkStatus());
}
static void next_bdp_ping_timer_expired_locked(void* tp,
grpc_error_handle error) {
static void next_bdp_ping_timer_expired_locked(
void* tp, GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
GPR_ASSERT(t->have_next_bdp_ping_timer);
t->have_next_bdp_ping_timer = false;
if (!error.ok()) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
return;
}
GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value());
t->next_bdp_ping_timer_handle.reset();
if (t->flow_control.bdp_estimator()->accumulator() == 0) {
// Block the bdp ping till we receive more data.
t->bdp_ping_blocked = true;
@ -2646,47 +2681,37 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
}
}
static void init_keepalive_ping(void* arg, grpc_error_handle error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
static void init_keepalive_ping(grpc_chttp2_transport* t) {
t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
init_keepalive_ping_locked, t, nullptr),
error);
absl::OkStatus());
}
static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) {
static void init_keepalive_ping_locked(void* arg,
GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
GPR_ASSERT(t->keepalive_ping_timer_handle.has_value());
t->keepalive_ping_timer_handle.reset();
if (t->destroying || !t->closed_with_error.ok()) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else if (error.ok()) {
} else {
if (t->keepalive_permit_without_calls ||
grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
grpc_timer_init_unset(&t->keepalive_watchdog_timer);
send_keepalive_ping_locked(t);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
} else if (error == absl::CancelledError()) {
// The keepalive ping timer may be cancelled by bdp
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
t->peer_string.c_str());
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(t);
});
}
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
}
@ -2711,11 +2736,12 @@ static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
}
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_watchdog_timer,
grpc_core::Timestamp::Now() + t->keepalive_timeout,
&t->keepalive_watchdog_fired_locked);
t->keepalive_watchdog_timer_handle =
t->event_engine->RunAfter(t->keepalive_timeout, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
keepalive_watchdog_fired(t);
});
t->keepalive_ping_started = true;
}
@ -2745,50 +2771,76 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
}
t->keepalive_ping_started = false;
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
grpc_timer_cancel(&t->keepalive_watchdog_timer);
if (t->keepalive_watchdog_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
t->keepalive_watchdog_timer_handle.reset();
}
}
GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value());
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->keepalive_ping_timer,
grpc_core::Timestamp::Now() + t->keepalive_time,
&t->init_keepalive_ping_locked);
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(t);
});
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
}
static void keepalive_watchdog_fired(void* arg, grpc_error_handle error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
static void keepalive_watchdog_fired(grpc_chttp2_transport* t) {
t->combiner->Run(
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t, nullptr),
error);
absl::OkStatus());
}
static void keepalive_watchdog_fired_locked(void* arg,
grpc_error_handle error) {
static void keepalive_watchdog_fired_locked(
void* arg, GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok());
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value());
t->keepalive_watchdog_timer_handle.reset();
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error.ok()) {
gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
t->peer_string.c_str());
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(
t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
}
gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
t->peer_string.c_str());
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(
t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
} else {
// The watchdog timer should have been cancelled by
// finish_keepalive_ping_locked.
if (GPR_UNLIKELY(error != absl::CancelledError())) {
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
// If keepalive_state is not PINGING, we consider it as an error. Maybe the
// cancellation failed in finish_keepalive_ping_locked. Users have seen
// other states: https://github.com/grpc/grpc/issues/32085.
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
}
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
if (t->keepalive_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) {
// Cancel succeeds, resets the keepalive ping timer. Note that we don't
// need to Ref or Unref here since we still hold the Ref.
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
t->peer_string.c_str());
}
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(t);
});
}
}
}
//
// CALLBACK LOOP
//
@ -2857,7 +2909,7 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
t->combiner->Run(&t->destructive_reclaimer_locked,
absl::OkStatus());
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
}
});
}

@ -24,11 +24,13 @@
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include <string>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@ -57,7 +59,6 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -147,8 +148,8 @@ struct grpc_chttp2_repeated_ping_policy {
struct grpc_chttp2_repeated_ping_state {
grpc_core::Timestamp last_ping_sent_time;
int pings_before_data_required;
grpc_timer delayed_ping_timer;
bool is_delayed_ping_timer_set;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
delayed_ping_timer_handle;
};
struct grpc_chttp2_server_ping_recv_state {
grpc_core::Timestamp last_ping_recv_time;
@ -410,7 +411,6 @@ struct grpc_chttp2_transport
grpc_closure finish_bdp_ping_locked;
// if non-NULL, close the transport with this error when writes are finished
//
grpc_error_handle close_transport_on_writes_finished;
// a list of closures to run after writes are finished
@ -426,25 +426,27 @@ struct grpc_chttp2_transport
/// destructive cleanup closure
grpc_closure destructive_reclaimer_locked;
// next bdp ping timer
bool have_next_bdp_ping_timer = false;
/// If start_bdp_ping_locked has been called
bool bdp_ping_started = false;
grpc_timer next_bdp_ping_timer;
// next bdp ping timer handle
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
next_bdp_ping_timer_handle;
// keep-alive ping support
/// Closure to initialize a keepalive ping
grpc_closure init_keepalive_ping_locked;
/// Closure to run when the keepalive ping is sent
grpc_closure start_keepalive_ping_locked;
/// Cousure to run when the keepalive ping ack is received
/// Closure to run when the keepalive ping ack is received
grpc_closure finish_keepalive_ping_locked;
/// Closrue to run when the keepalive ping timeouts
/// Closure to run when the keepalive ping timeouts
grpc_closure keepalive_watchdog_fired_locked;
/// timer to initiate ping events
grpc_timer keepalive_ping_timer;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
keepalive_ping_timer_handle;
/// watchdog to kill the transport when waiting for the keepalive ping
grpc_timer keepalive_watchdog_timer;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
keepalive_watchdog_timer_handle;
/// time duration in between pings
grpc_core::Duration keepalive_time;
/// grace period for a ping to complete before watchdog kicks in
@ -467,8 +469,9 @@ struct grpc_chttp2_transport
bool reading_paused_on_pending_induced_frames = false;
/// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
/// the peer
///
bool enable_preferred_rx_crypto_frame_advertisement = false;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
};
typedef enum {
@ -803,7 +806,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
bool is_client);
void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error);
void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t);
void schedule_bdp_ping_locked(grpc_chttp2_transport* t);

@ -22,11 +22,13 @@
#include <stddef.h>
#include <algorithm>
#include <memory>
#include <string>
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
@ -60,7 +62,6 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/http2_errors.h"
@ -152,14 +153,14 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
next_allowed_ping.milliseconds_after_process_epoch(),
now.milliseconds_after_process_epoch());
}
if (!t->ping_state.is_delayed_ping_timer_set) {
t->ping_state.is_delayed_ping_timer_set = true;
if (!t->ping_state.delayed_ping_timer_handle.has_value()) {
GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
grpc_chttp2_retry_initiate_ping, t,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
&t->retry_initiate_ping_locked);
t->ping_state.delayed_ping_timer_handle =
t->event_engine->RunAfter(next_allowed_ping - now, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_retry_initiate_ping(t);
});
}
return;
}

Loading…
Cancel
Save