[chttp2] Remove absl::optional for TaskHandle (#34892)

veblush-patch-3
Alisha Nanda 2 years ago committed by GitHub
parent 720d7a0653
commit a2128da294
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 104
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 16
      src/core/ext/transport/chttp2/transport/internal.h
  3. 4
      src/core/ext/transport/chttp2/transport/writing.cc

@ -229,6 +229,9 @@ static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
#define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout" #define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout"
namespace { namespace {
using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) { grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) {
if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) { if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
return nullptr; return nullptr;
@ -803,41 +806,33 @@ static void close_transport_locked(grpc_chttp2_transport* t,
t->closed_with_error = error; t->closed_with_error = error;
connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(), connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
"close_transport"); "close_transport");
if (t->keepalive_ping_timeout_handle != if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) { t->event_engine->Cancel(std::exchange(t->keepalive_ping_timeout_handle,
t->event_engine->Cancel(std::exchange( TaskHandle::kInvalid));
t->keepalive_ping_timeout_handle,
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
}
if (t->settings_ack_watchdog !=
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
t->event_engine->Cancel(std::exchange(
t->settings_ack_watchdog,
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
}
if (t->delayed_ping_timer_handle.has_value()) {
if (t->event_engine->Cancel(*t->delayed_ping_timer_handle)) {
t->delayed_ping_timer_handle.reset();
}
} }
if (t->next_bdp_ping_timer_handle.has_value()) { if (t->settings_ack_watchdog != TaskHandle::kInvalid) {
if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) { t->event_engine->Cancel(
t->next_bdp_ping_timer_handle.reset(); std::exchange(t->settings_ack_watchdog, TaskHandle::kInvalid));
} }
if (t->delayed_ping_timer_handle != TaskHandle::kInvalid &&
t->event_engine->Cancel(t->delayed_ping_timer_handle)) {
t->delayed_ping_timer_handle = TaskHandle::kInvalid;
}
if (t->next_bdp_ping_timer_handle != TaskHandle::kInvalid &&
t->event_engine->Cancel(t->next_bdp_ping_timer_handle)) {
t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
} }
switch (t->keepalive_state) { switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
if (t->keepalive_ping_timer_handle.has_value()) { if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
t->keepalive_ping_timer_handle.reset(); t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
}
} }
break; break;
case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
if (t->keepalive_ping_timer_handle.has_value()) { if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
t->keepalive_ping_timer_handle.reset(); t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
}
} }
break; break;
case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
@ -1837,8 +1832,8 @@ static void retry_initiate_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
GRPC_UNUSED grpc_error_handle error) { GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok()); GPR_DEBUG_ASSERT(error.ok());
GPR_ASSERT(t->delayed_ping_timer_handle.has_value()); GPR_ASSERT(t->delayed_ping_timer_handle != TaskHandle::kInvalid);
t->delayed_ping_timer_handle.reset(); t->delayed_ping_timer_handle = TaskHandle::kInvalid;
grpc_chttp2_initiate_write(t.get(), grpc_chttp2_initiate_write(t.get(),
GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
} }
@ -2859,17 +2854,15 @@ static void read_action_locked(
grpc_error_handle error) { grpc_error_handle error) {
// got an incoming read, cancel any pending keepalive timers // got an incoming read, cancel any pending keepalive timers
t->keepalive_incoming_data_wanted = false; t->keepalive_incoming_data_wanted = false;
if (t->keepalive_ping_timeout_handle != if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace) || if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) { GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"%s[%p]: Clear keepalive timer because data was received", "%s[%p]: Clear keepalive timer because data was received",
t->is_client ? "CLIENT" : "SERVER", t.get()); t->is_client ? "CLIENT" : "SERVER", t.get());
} }
t->event_engine->Cancel(std::exchange( t->event_engine->Cancel(
t->keepalive_ping_timeout_handle, std::exchange(t->keepalive_ping_timeout_handle, TaskHandle::kInvalid));
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
} }
grpc_error_handle err = error; grpc_error_handle err = error;
if (!err.ok()) { if (!err.ok()) {
@ -2962,7 +2955,7 @@ static void finish_bdp_ping_locked(
t->flow_control.bdp_estimator()->CompletePing(); t->flow_control.bdp_estimator()->CompletePing();
grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(), grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(),
nullptr); nullptr);
GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value()); GPR_ASSERT(t->next_bdp_ping_timer_handle == TaskHandle::kInvalid);
t->next_bdp_ping_timer_handle = t->next_bdp_ping_timer_handle =
t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] { t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
@ -2982,8 +2975,8 @@ static void next_bdp_ping_timer_expired_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
GRPC_UNUSED grpc_error_handle error) { GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok()); GPR_DEBUG_ASSERT(error.ok());
GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value()); GPR_ASSERT(t->next_bdp_ping_timer_handle != TaskHandle::kInvalid);
t->next_bdp_ping_timer_handle.reset(); t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
if (t->flow_control.bdp_estimator()->accumulator() == 0) { if (t->flow_control.bdp_estimator()->accumulator() == 0) {
// Block the bdp ping till we receive more data. // Block the bdp ping till we receive more data.
t->bdp_ping_blocked = true; t->bdp_ping_blocked = true;
@ -3052,8 +3045,8 @@ static void init_keepalive_ping_locked(
GRPC_UNUSED grpc_error_handle error) { GRPC_UNUSED grpc_error_handle error) {
GPR_DEBUG_ASSERT(error.ok()); GPR_DEBUG_ASSERT(error.ok());
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
GPR_ASSERT(t->keepalive_ping_timer_handle.has_value()); GPR_ASSERT(t->keepalive_ping_timer_handle != TaskHandle::kInvalid);
t->keepalive_ping_timer_handle.reset(); t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
if (t->destroying || !t->closed_with_error.ok()) { if (t->destroying || !t->closed_with_error.ok()) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else { } else {
@ -3094,7 +3087,7 @@ static void finish_keepalive_ping_locked(
std::string(t->peer_string.as_string_view()).c_str()); std::string(t->peer_string.as_string_view()).c_str());
} }
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value()); GPR_ASSERT(t->keepalive_ping_timer_handle == TaskHandle::kInvalid);
t->keepalive_ping_timer_handle = t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t] { t->event_engine->RunAfter(t->keepalive_time, [t] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
@ -3106,22 +3099,21 @@ static void finish_keepalive_ping_locked(
} }
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) { static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
if (t->keepalive_ping_timer_handle.has_value()) { if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
// Cancel succeeds, resets the keepalive ping timer. Note that we don't // Cancel succeeds, resets the keepalive ping timer. Note that we don't
// need to Ref or Unref here since we still hold the Ref. // need to Ref or Unref here since we still hold the Ref.
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) || if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) { GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.", gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
std::string(t->peer_string.as_string_view()).c_str()); std::string(t->peer_string.as_string_view()).c_str());
}
t->keepalive_ping_timer_handle = t->event_engine->RunAfter(
t->keepalive_time, [t = t->Ref()]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(std::move(t));
});
} }
t->keepalive_ping_timer_handle =
t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
init_keepalive_ping(std::move(t));
});
} }
} }

@ -384,8 +384,9 @@ struct grpc_chttp2_transport final
grpc_core::Chttp2PingAbusePolicy ping_abuse_policy; grpc_core::Chttp2PingAbusePolicy ping_abuse_policy;
grpc_core::Chttp2PingRatePolicy ping_rate_policy; grpc_core::Chttp2PingRatePolicy ping_rate_policy;
grpc_core::Chttp2PingCallbacks ping_callbacks; grpc_core::Chttp2PingCallbacks ping_callbacks;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> grpc_event_engine::experimental::EventEngine::TaskHandle
delayed_ping_timer_handle; delayed_ping_timer_handle =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
grpc_closure retry_initiate_ping_locked; grpc_closure retry_initiate_ping_locked;
grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy; grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy;
@ -457,8 +458,9 @@ struct grpc_chttp2_transport final
grpc_closure destructive_reclaimer_locked; grpc_closure destructive_reclaimer_locked;
// next bdp ping timer handle // next bdp ping timer handle
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> grpc_event_engine::experimental::EventEngine::TaskHandle
next_bdp_ping_timer_handle; next_bdp_ping_timer_handle =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
// keep-alive ping support // keep-alive ping support
/// Closure to initialize a keepalive ping /// Closure to initialize a keepalive ping
@ -466,8 +468,10 @@ struct grpc_chttp2_transport final
/// Closure to run when the keepalive ping ack is received /// Closure to run when the keepalive ping ack is received
grpc_closure finish_keepalive_ping_locked; grpc_closure finish_keepalive_ping_locked;
/// timer to initiate ping events /// timer to initiate ping events
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> grpc_event_engine::experimental::EventEngine::TaskHandle
keepalive_ping_timer_handle; keepalive_ping_timer_handle =
grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
;
/// time duration in between pings /// time duration in between pings
grpc_core::Duration keepalive_time; grpc_core::Duration keepalive_time;
/// grace period to wait for data after sending a ping before keepalives /// grace period to wait for data after sending a ping before keepalives

@ -172,7 +172,9 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
too_soon.next_allowed_ping_interval.ToString().c_str(), too_soon.next_allowed_ping_interval.ToString().c_str(),
too_soon.wait.ToString().c_str()); too_soon.wait.ToString().c_str());
} }
if (!t->delayed_ping_timer_handle.has_value()) { if (t->delayed_ping_timer_handle ==
grpc_event_engine::experimental::EventEngine::TaskHandle::
kInvalid) {
t->delayed_ping_timer_handle = t->event_engine->RunAfter( t->delayed_ping_timer_handle = t->event_engine->RunAfter(
too_soon.wait, [t = t->Ref()]() mutable { too_soon.wait, [t = t->Ref()]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;

Loading…
Cancel
Save