diff --git a/include/grpc/event_engine/memory_allocator.h b/include/grpc/event_engine/memory_allocator.h index b31bed0ed21..b3143d8dd6a 100644 --- a/include/grpc/event_engine/memory_allocator.h +++ b/include/grpc/event_engine/memory_allocator.h @@ -56,8 +56,8 @@ class MemoryAllocator { /// The object will not be usable after this call unless it's a valid /// allocator is moved into it. void Reset() { - if (allocator_ != nullptr) allocator_->Shutdown(); - allocator_.reset(); + auto a = std::move(allocator_); + if (a != nullptr) a->Shutdown(); } /// Reserve bytes from the quota. diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index a4718fc769a..7ba070e479e 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -210,7 +210,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { OrphanablePtr handshaking_state_ ABSL_GUARDED_BY(&mu_); // Set by HandshakingState when handshaking is done and a valid transport // is created. - grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr; + RefCountedPtr transport_ ABSL_GUARDED_BY(&mu_) = + nullptr; grpc_closure on_close_; absl::optional drain_grace_timer_handle_ ABSL_GUARDED_BY(&mu_); @@ -419,7 +420,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout() { { MutexLock lock(&connection_->mu_); if (timer_handle_.has_value()) { - transport = connection_->transport_; + transport = connection_->transport_.get(); timer_handle_.reset(); } } @@ -490,9 +491,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( // TODO(roth): Change to static_cast<> when we C++-ify the // transport API. self->connection_->transport_ = - reinterpret_cast(transport); - GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_, - "ActiveConnection"); // Held by connection_ + reinterpret_cast(transport)->Ref(); self->Ref().release(); // Held by OnReceiveSettings(). GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self, grpc_schedule_on_exec_ctx); @@ -572,11 +571,7 @@ Chttp2ServerListener::ActiveConnection::ActiveConnection( grpc_schedule_on_exec_ctx); } -Chttp2ServerListener::ActiveConnection::~ActiveConnection() { - if (transport_ != nullptr) { - GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection"); - } -} +Chttp2ServerListener::ActiveConnection::~ActiveConnection() {} void Chttp2ServerListener::ActiveConnection::Orphan() { OrphanablePtr handshaking_state; @@ -595,7 +590,7 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() { { MutexLock lock(&mu_); if (transport_ != nullptr && !shutdown_) { - transport = transport_; + transport = transport_.get(); drain_grace_timer_handle_ = event_engine_->RunAfter( std::max(Duration::Zero(), listener_->args_ @@ -667,7 +662,7 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() { { MutexLock lock(&mu_); if (drain_grace_timer_handle_.has_value()) { - transport = transport_; + transport = transport_.get(); drain_grace_timer_handle_.reset(); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 06f9d90478d..092fb6b86ce 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -132,14 +133,20 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, "chttp2_refcount"); // forward declarations of various callbacks that we'll build closures around -static void write_action_begin_locked(void* t, grpc_error_handle error); -static void write_action(void* t, grpc_error_handle error); -static void write_action_end(void* t, grpc_error_handle error); -static void write_action_end_locked(void* t, grpc_error_handle error); - -static void read_action(void* t, grpc_error_handle error); -static void read_action_locked(void* t, grpc_error_handle error); -static void continue_read_action_locked(grpc_chttp2_transport* t); +static void write_action_begin_locked( + grpc_core::RefCountedPtr, grpc_error_handle error); +static void write_action(grpc_chttp2_transport* t); +static void write_action_end(grpc_core::RefCountedPtr, + grpc_error_handle error); +static void write_action_end_locked( + grpc_core::RefCountedPtr, grpc_error_handle error); + +static void read_action(grpc_core::RefCountedPtr, + grpc_error_handle error); +static void read_action_locked(grpc_core::RefCountedPtr, + grpc_error_handle error); +static void continue_read_action_locked( + grpc_core::RefCountedPtr t); // Set a transport level setting, and push it to our peer static void queue_setting_update(grpc_chttp2_transport* t, @@ -156,8 +163,10 @@ static void connectivity_state_set(grpc_chttp2_transport* t, const absl::Status& status, const char* reason); -static void benign_reclaimer_locked(void* arg, grpc_error_handle error); -static void destructive_reclaimer_locked(void* arg, grpc_error_handle error); +static void benign_reclaimer_locked( + grpc_core::RefCountedPtr, grpc_error_handle error); +static void destructive_reclaimer_locked( + grpc_core::RefCountedPtr, grpc_error_handle error); static void post_benign_reclaimer(grpc_chttp2_transport* t); static void post_destructive_reclaimer(grpc_chttp2_transport* t); @@ -167,31 +176,45 @@ static void close_transport_locked(grpc_chttp2_transport* t, static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error_handle error); -static void start_bdp_ping(void* tp, grpc_error_handle error); -static void finish_bdp_ping(void* tp, grpc_error_handle error); -static void start_bdp_ping_locked(void* tp, grpc_error_handle error); -static void finish_bdp_ping_locked(void* tp, grpc_error_handle error); +static void start_bdp_ping(grpc_core::RefCountedPtr, + grpc_error_handle error); +static void finish_bdp_ping(grpc_core::RefCountedPtr, + grpc_error_handle error); +static void start_bdp_ping_locked( + grpc_core::RefCountedPtr, grpc_error_handle error); +static void finish_bdp_ping_locked( + grpc_core::RefCountedPtr, grpc_error_handle error); static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t); static void next_bdp_ping_timer_expired_locked( - void* tp, GRPC_UNUSED grpc_error_handle error); + grpc_core::RefCountedPtr tp, + GRPC_UNUSED grpc_error_handle error); static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error); static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_ack); -static void retry_initiate_ping_locked(void* tp, - GRPC_UNUSED grpc_error_handle error); +static void retry_initiate_ping_locked( + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error); // keepalive-relevant functions -static void init_keepalive_ping(grpc_chttp2_transport* t); -static void init_keepalive_ping_locked(void* arg, - GRPC_UNUSED grpc_error_handle error); -static void start_keepalive_ping(void* arg, grpc_error_handle error); -static void finish_keepalive_ping(void* arg, grpc_error_handle error); -static void start_keepalive_ping_locked(void* arg, grpc_error_handle error); -static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error); -static void keepalive_watchdog_fired(grpc_chttp2_transport* t); +static void init_keepalive_ping( + grpc_core::RefCountedPtr t); +static void init_keepalive_ping_locked( + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error); +static void start_keepalive_ping( + grpc_core::RefCountedPtr t, grpc_error_handle error); +static void finish_keepalive_ping( + grpc_core::RefCountedPtr t, grpc_error_handle error); +static void start_keepalive_ping_locked( + grpc_core::RefCountedPtr t, grpc_error_handle error); +static void finish_keepalive_ping_locked( + grpc_core::RefCountedPtr t, grpc_error_handle error); +static void keepalive_watchdog_fired( + grpc_core::RefCountedPtr t); static void keepalive_watchdog_fired_locked( - void* arg, GRPC_UNUSED grpc_error_handle error); + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error); static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t); namespace { @@ -211,6 +234,24 @@ grpc_core::CopyContextFn g_get_copied_context_fn = nullptr; namespace grpc_core { +namespace { +// Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds +// the passed in reference to \a t until it's moved into Fn. +template , grpc_error_handle)> +grpc_closure* InitTransportClosure(RefCountedPtr t, + grpc_closure* c) { + GRPC_CLOSURE_INIT( + c, + [](void* tp, grpc_error_handle error) { + Fn(RefCountedPtr( + static_cast(tp)), + std::move(error)); + }, + t.release(), nullptr); + return c; +} +} // namespace + namespace { TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr; TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback = @@ -493,17 +534,16 @@ static void read_channel_args(grpc_chttp2_transport* t, } static void init_keepalive_pings_if_enabled_locked( - void* arg, GRPC_UNUSED grpc_error_handle error) { + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error) { GPR_DEBUG_ASSERT(error.ok()); - grpc_chttp2_transport* t = static_cast(arg); if (t->keepalive_time != grpc_core::Duration::Infinity()) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); t->keepalive_ping_timer_handle = - t->event_engine->RunAfter(t->keepalive_time, [t] { + t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - init_keepalive_ping(t); + init_keepalive_ping(std::move(t)); }); } else { // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no @@ -578,8 +618,8 @@ grpc_chttp2_transport::grpc_chttp2_transport( grpc_core::ExecCtx exec_ctx; combiner->Run( - GRPC_CLOSURE_INIT(&init_keepalive_ping_locked, - init_keepalive_pings_if_enabled_locked, this, nullptr), + grpc_core::InitTransportClosure( + Ref(), &init_keepalive_ping_locked), absl::OkStatus()); if (flow_control.bdp_probe()) { @@ -604,15 +644,15 @@ grpc_chttp2_transport::grpc_chttp2_transport( } static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) { - grpc_chttp2_transport* t = static_cast(tp); + grpc_core::RefCountedPtr t( + static_cast(tp)); t->destroying = 1; close_transport_locked( - t, grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"), - grpc_core::StatusIntProperty::kOccurredDuringWrite, - t->write_state)); + t.get(), + grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"), + grpc_core::StatusIntProperty::kOccurredDuringWrite, + t->write_state)); t->memory_owner.Reset(); - // Must be the last line. - GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } static void destroy_transport(grpc_transport* gt) { @@ -646,13 +686,11 @@ static void close_transport_locked(grpc_chttp2_transport* t, "close_transport"); if (t->delayed_ping_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->delayed_ping_timer_handle)) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); t->delayed_ping_timer_handle.reset(); } } if (t->next_bdp_ping_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); t->next_bdp_ping_timer_handle.reset(); } } @@ -660,7 +698,6 @@ static void close_transport_locked(grpc_chttp2_transport* t, case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: if (t->keepalive_ping_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); t->keepalive_ping_timer_handle.reset(); } } @@ -668,13 +705,11 @@ static void close_transport_locked(grpc_chttp2_transport* t, case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: if (t->keepalive_ping_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); t->keepalive_ping_timer_handle.reset(); } } if (t->keepalive_watchdog_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); t->keepalive_watchdog_timer_handle.reset(); } } @@ -720,21 +755,23 @@ void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) { } #endif -grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) { - // We reserve one 'active stream' that's dropped when the stream is - // read-closed. The others are for Chttp2IncomingByteStreams that are - // actively reading - GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream"); -} - grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount, const void* server_data, grpc_core::Arena* arena) - : t(t), - refcount(refcount), - reffer(this), + : t(t->Ref()), + refcount([refcount]() { +// We reserve one 'active stream' that's dropped when the stream is +// read-closed. The others are for Chttp2IncomingByteStreams that are +// actively reading +// We do this here to avoid cache misses. +#ifndef NDEBUG + grpc_stream_ref(refcount, "chttp2"); +#else + grpc_stream_ref(refcount); +#endif + return refcount; + }()), initial_metadata_buffer(arena), trailing_metadata_buffer(arena), flow_control(&t->flow_control) { @@ -754,8 +791,8 @@ grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, } grpc_chttp2_stream::~grpc_chttp2_stream() { - grpc_chttp2_list_remove_stalled_by_stream(t, this); - grpc_chttp2_list_remove_stalled_by_transport(t, this); + grpc_chttp2_list_remove_stalled_by_stream(t.get(), this); + grpc_chttp2_list_remove_stalled_by_transport(t.get(), this); if (t->channelz_socket != nullptr) { if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) { @@ -786,7 +823,6 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { GPR_ASSERT(recv_message_ready == nullptr); GPR_ASSERT(recv_trailing_metadata_finished == nullptr); grpc_slice_buffer_destroy(&flow_controlled_buffer); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus()); } @@ -874,7 +910,6 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, case GRPC_CHTTP2_WRITE_STATE_IDLE: set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, grpc_chttp2_initiate_write_reason_string(reason)); - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); // Note that the 'write_action_begin_locked' closure is being scheduled // on the 'finally_scheduler' of t->combiner. This means that // 'write_action_begin_locked' is called only *after* all the other @@ -892,8 +927,8 @@ void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, // It does not call the endpoint to write the bytes. That is done by the // 'write_action' (which is scheduled by 'write_action_begin_locked') t->combiner->FinallyRun( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, nullptr), + grpc_core::InitTransportClosure( + t->Ref(), &t->write_action_begin_locked), absl::OkStatus()); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: @@ -920,22 +955,22 @@ static const char* begin_writing_desc(bool partial) { } } -static void write_action_begin_locked(void* gt, - grpc_error_handle /*error_ignored*/) { - grpc_chttp2_transport* t = static_cast(gt); +static void write_action_begin_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle /*error_ignored*/) { GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; if (!t->closed_with_error.ok()) { r.writing = false; } else { - r = grpc_chttp2_begin_write(t); + r = grpc_chttp2_begin_write(t.get()); } if (r.writing) { - set_write_state(t, + set_write_state(t.get(), r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE : GRPC_CHTTP2_WRITE_STATE_WRITING, begin_writing_desc(r.partial)); - write_action(t, absl::OkStatus()); + write_action(t.get()); if (t->reading_paused_on_pending_induced_frames) { GPR_ASSERT(t->num_pending_induced_frames == 0); // We had paused reading, because we had many induced frames (SETTINGS @@ -945,18 +980,17 @@ static void write_action_begin_locked(void* gt, GPR_INFO, "transport %p : Resuming reading after being paused due to too " "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames", - t)); + t.get())); t->reading_paused_on_pending_induced_frames = false; - continue_read_action_locked(t); + continue_read_action_locked(std::move(t)); } } else { - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); + set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE, + "begin writing nothing"); } } -static void write_action(void* gt, grpc_error_handle /*error*/) { - grpc_chttp2_transport* t = static_cast(gt); +static void write_action(grpc_chttp2_transport* t) { void* cl = t->cl; if (!t->cl->empty()) { // Transfer the ownership of the context list to the endpoint and create and @@ -980,28 +1014,28 @@ static void write_action(void* gt, grpc_error_handle /*error*/) { if (max_frame_size == 0) { max_frame_size = INT_MAX; } - grpc_endpoint_write( - t->ep, &t->outbuf, - GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t, - grpc_schedule_on_exec_ctx), - cl, max_frame_size); + grpc_endpoint_write(t->ep, &t->outbuf, + grpc_core::InitTransportClosure( + t->Ref(), &t->write_action_end_locked), + cl, max_frame_size); } -static void write_action_end(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); - t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked, - write_action_end_locked, t, nullptr), - error); +static void write_action_end(grpc_core::RefCountedPtr t, + grpc_error_handle error) { + auto* tp = t.get(); + tp->combiner->Run(grpc_core::InitTransportClosure( + std::move(t), &tp->write_action_end_locked), + error); } // Callback from the grpc_endpoint after bytes have been written by calling // sendmsg -static void write_action_end_locked(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); - +static void write_action_end_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { bool closed = false; if (!error.ok()) { - close_transport_locked(t, error); + close_transport_locked(t.get(), error); closed = true; } @@ -1009,7 +1043,7 @@ static void write_action_end_locked(void* tp, grpc_error_handle error) { t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT; closed = true; if (t->stream_map.empty()) { - close_transport_locked(t, GRPC_ERROR_CREATE("goaway sent")); + close_transport_locked(t.get(), GRPC_ERROR_CREATE("goaway sent")); } } @@ -1017,11 +1051,11 @@ static void write_action_end_locked(void* tp, grpc_error_handle error) { case GRPC_CHTTP2_WRITE_STATE_IDLE: GPR_UNREACHABLE_CODE(break); case GRPC_CHTTP2_WRITE_STATE_WRITING: - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); + set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: - set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); - GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); + set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_WRITING, + "continue writing"); // If the transport is closed, we will retry writing on the endpoint // and next write may contain part of the currently serialized frames. // So, we should only call the run_after_write callbacks when the next @@ -1031,14 +1065,13 @@ static void write_action_end_locked(void* tp, grpc_error_handle error) { grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write); } t->combiner->FinallyRun( - GRPC_CLOSURE_INIT(&t->write_action_begin_locked, - write_action_begin_locked, t, nullptr), + grpc_core::InitTransportClosure( + t, &t->write_action_begin_locked), absl::OkStatus()); break; } - grpc_chttp2_end_write(t, error); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); + grpc_chttp2_end_write(t.get(), error); } // Dirties an HTTP2 setting to be sent out next time a writing path occurs. @@ -1111,7 +1144,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, s->trailing_metadata_buffer.Set( grpc_core::GrpcStreamNetworkState(), grpc_core::GrpcStreamNetworkState::kNotSeenByServer); - grpc_chttp2_cancel_stream(s->t, s, s->t->goaway_error); + grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error); } } absl::Status status = grpc_error_to_absl_status(t->goaway_error); @@ -1300,7 +1333,7 @@ static void perform_stream_op_locked(void* stream_op, grpc_chttp2_stream* s = static_cast(op->handler_private.extra_arg); grpc_transport_stream_op_batch_payload* op_payload = op->payload; - grpc_chttp2_transport* t = s->t; + grpc_chttp2_transport* t = s->t.get(); s->context = op->payload->context; s->traced = op->is_traced; @@ -1608,56 +1641,61 @@ static void send_ping_locked(grpc_chttp2_transport* t, // Specialized form of send_ping_locked for keepalive ping. If there is already // a ping in progress, the keepalive ping would piggyback onto that ping, // instead of waiting for that ping to complete and then starting a new ping. -static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { +static void send_keepalive_ping_locked( + grpc_core::RefCountedPtr t) { if (!t->closed_with_error.ok()) { - t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, nullptr), - t->closed_with_error); t->combiner->Run( - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, nullptr), + grpc_core::InitTransportClosure( + t->Ref(), &t->start_keepalive_ping_locked), + t->closed_with_error); + t->combiner->Run( + grpc_core::InitTransportClosure( + t->Ref(), &t->finish_keepalive_ping_locked), t->closed_with_error); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { // There is a ping in flight. Add yourself to the inflight closure list. - t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, nullptr), - t->closed_with_error); + t->combiner->Run( + grpc_core::InitTransportClosure( + t->Ref(), &t->start_keepalive_ping_locked), + t->closed_with_error); grpc_closure_list_append( &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping, t, grpc_schedule_on_exec_ctx), + grpc_core::InitTransportClosure( + t->Ref(), &t->finish_keepalive_ping_locked), absl::OkStatus()); return; } grpc_closure_list_append( &pq->lists[GRPC_CHTTP2_PCL_INITIATE], - GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping, - t, grpc_schedule_on_exec_ctx), + grpc_core::InitTransportClosure( + t->Ref(), &t->start_keepalive_ping_locked), absl::OkStatus()); grpc_closure_list_append( &pq->lists[GRPC_CHTTP2_PCL_NEXT], - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping, - t, grpc_schedule_on_exec_ctx), + grpc_core::InitTransportClosure( + t->Ref(), &t->finish_keepalive_ping_locked), absl::OkStatus()); } -void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) { - t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, - retry_initiate_ping_locked, t, nullptr), - absl::OkStatus()); +void grpc_chttp2_retry_initiate_ping( + grpc_core::RefCountedPtr t) { + auto tp = t.get(); + tp->combiner->Run(grpc_core::InitTransportClosure( + std::move(t), &tp->retry_initiate_ping_locked), + absl::OkStatus()); } -static void retry_initiate_ping_locked(void* tp, - GRPC_UNUSED grpc_error_handle error) { +static void retry_initiate_ping_locked( + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error) { GPR_DEBUG_ASSERT(error.ok()); - grpc_chttp2_transport* t = static_cast(tp); GPR_ASSERT(t->delayed_ping_timer_handle.has_value()); t->delayed_ping_timer_handle.reset(); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); - GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); + grpc_chttp2_initiate_write(t.get(), + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { @@ -1687,16 +1725,11 @@ class GracefulGoaway : public grpc_core::RefCounted { public: static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); } - ~GracefulGoaway() override { - GRPC_CHTTP2_UNREF_TRANSPORT(t_, "graceful goaway"); - } - private: using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle; - explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t) { + explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t->Ref()) { t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY; - GRPC_CHTTP2_REF_TRANSPORT(t_, "graceful goaway"); grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf); send_ping_locked( t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr)); @@ -1724,7 +1757,7 @@ class GracefulGoaway : public grpc_core::RefCounted { gpr_log(GPR_INFO, "transport:%p %s peer:%s Transport already shutting down. " "Graceful GOAWAY abandoned.", - t_, t_->is_client ? "CLIENT" : "SERVER", + t_.get(), t_->is_client ? "CLIENT" : "SERVER", std::string(t_->peer_string.as_string_view()).c_str())); return; } @@ -1733,13 +1766,14 @@ class GracefulGoaway : public grpc_core::RefCounted { gpr_log(GPR_INFO, "transport:%p %s peer:%s Graceful shutdown: Ping received. " "Sending final GOAWAY with stream_id:%d", - t_, t_->is_client ? "CLIENT" : "SERVER", + t_.get(), t_->is_client ? "CLIENT" : "SERVER", std::string(t_->peer_string.as_string_view()).c_str(), t_->last_new_stream_id)); t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED; grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(), &t_->qbuf); - grpc_chttp2_initiate_write(t_, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); + grpc_chttp2_initiate_write(t_.get(), + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); } static void OnPingAck(void* arg, grpc_error_handle /* error */) { @@ -1767,7 +1801,7 @@ class GracefulGoaway : public grpc_core::RefCounted { self->Unref(); } - grpc_chttp2_transport* t_; + const grpc_core::RefCountedPtr t_; grpc_closure on_ping_ack_; TaskHandle timer_handle_ = TaskHandle::kInvalid; grpc_closure on_timer_; @@ -1829,11 +1863,11 @@ void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) { static void perform_transport_op_locked(void* stream_op, grpc_error_handle /*error_ignored*/) { grpc_transport_op* op = static_cast(stream_op); - grpc_chttp2_transport* t = - static_cast(op->handler_private.extra_arg); + grpc_core::RefCountedPtr t( + static_cast(op->handler_private.extra_arg)); if (!op->goaway_error.ok()) { - send_goaway(t, op->goaway_error, /*immediate_disconnect_hint=*/false); + send_goaway(t.get(), op->goaway_error, /*immediate_disconnect_hint=*/false); } if (op->set_accept_stream) { @@ -1850,8 +1884,9 @@ static void perform_transport_op_locked(void* stream_op, } if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { - send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); + send_ping_locked(t.get(), op->send_ping.on_initiate, op->send_ping.on_ack); + grpc_chttp2_initiate_write(t.get(), + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->start_connectivity_watch != nullptr) { @@ -1863,14 +1898,12 @@ static void perform_transport_op_locked(void* stream_op, } if (!op->disconnect_with_error.ok()) { - send_goaway(t, op->disconnect_with_error, + send_goaway(t.get(), op->disconnect_with_error, /*immediate_disconnect_hint=*/true); - close_transport_locked(t, op->disconnect_with_error); + close_transport_locked(t.get(), op->disconnect_with_error); } grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus()); - - GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { @@ -1880,10 +1913,10 @@ static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { grpc_transport_op_string(op).c_str()); } op->handler_private.extra_arg = gt; - GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, - perform_transport_op_locked, op, nullptr), - absl::OkStatus()); + t->Ref().release()->combiner->Run( + GRPC_CLOSURE_INIT(&op->handler_private.closure, + perform_transport_op_locked, op, nullptr), + absl::OkStatus()); } // @@ -2480,16 +2513,17 @@ static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) { return error; } -static void read_action(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); - t->combiner->Run( - GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr), - error); +static void read_action(grpc_core::RefCountedPtr t, + grpc_error_handle error) { + auto* tp = t.get(); + tp->combiner->Run(grpc_core::InitTransportClosure( + std::move(t), &tp->read_action_locked), + error); } -static void read_action_locked(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); - +static void read_action_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { grpc_error_handle err = error; if (!err.ok()) { err = grpc_error_set_int( @@ -2501,10 +2535,10 @@ static void read_action_locked(void* tp, grpc_error_handle error) { size_t i = 0; grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()}; for (; i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) { - errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]); + errors[1] = grpc_chttp2_perform_read(t.get(), t->read_buffer.slices[i]); } if (errors[1] != absl::OkStatus()) { - errors[2] = try_http_parsing(t); + errors[2] = try_http_parsing(t.get()); error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors)); } @@ -2512,10 +2546,11 @@ static void read_action_locked(void* tp, grpc_error_handle error) { if (t->initial_window_update != 0) { if (t->initial_window_update > 0) { grpc_chttp2_stream* s; - while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_mark_stream_writable(t, s); + while (grpc_chttp2_list_pop_stalled_by_stream(t.get(), &s)) { + grpc_chttp2_mark_stream_writable(t.get(), s); grpc_chttp2_initiate_write( - t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); + t.get(), + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } t->initial_window_update = 0; @@ -2534,13 +2569,13 @@ static void read_action_locked(void* tp, grpc_error_handle error) { error = grpc_error_add_child(error, t->goaway_error); } - close_transport_locked(t, error); + close_transport_locked(t.get(), error); t->endpoint_reading = 0; } else if (t->closed_with_error.ok()) { keep_reading = true; // Since we have read a byte, reset the keepalive timer if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { - maybe_reset_keepalive_ping_timer_locked(t); + maybe_reset_keepalive_ping_timer_locked(t.get()); } } grpc_slice_buffer_reset_and_unref(&t->read_buffer); @@ -2552,45 +2587,48 @@ static void read_action_locked(void* tp, grpc_error_handle error) { gpr_log(GPR_INFO, "transport %p : Pausing reading due to too " "many unwritten SETTINGS ACK and RST_STREAM frames", - t)); + t.get())); } else { - continue_read_action_locked(t); + continue_read_action_locked(std::move(t)); } - } else { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } } -static void continue_read_action_locked(grpc_chttp2_transport* t) { +static void continue_read_action_locked( + grpc_core::RefCountedPtr t) { const bool urgent = !t->goaway_error.ok(); - GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t, - grpc_schedule_on_exec_ctx); - grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent, - grpc_chttp2_min_read_progress_size(t)); + auto* tp = t.get(); + grpc_endpoint_read(tp->ep, &tp->read_buffer, + grpc_core::InitTransportClosure( + std::move(t), &tp->read_action_locked), + urgent, grpc_chttp2_min_read_progress_size(tp)); } // t is reffed prior to calling the first time, and once the callback chain // that kicks off finishes, it's unreffed -void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { - t->flow_control.bdp_estimator()->SchedulePing(); - send_ping_locked( - t, - GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t, - grpc_schedule_on_exec_ctx), - GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t, - grpc_schedule_on_exec_ctx)); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING); -} - -static void start_bdp_ping(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); - t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, - start_bdp_ping_locked, t, nullptr), - error); -} - -static void start_bdp_ping_locked(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); +void schedule_bdp_ping_locked( + grpc_core::RefCountedPtr t) { + auto* tp = t.get(); + tp->flow_control.bdp_estimator()->SchedulePing(); + send_ping_locked(tp, + grpc_core::InitTransportClosure( + tp->Ref(), &tp->start_bdp_ping_locked), + grpc_core::InitTransportClosure( + std::move(t), &tp->finish_bdp_ping_locked)); + grpc_chttp2_initiate_write(tp, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING); +} + +static void start_bdp_ping(grpc_core::RefCountedPtr t, + grpc_error_handle error) { + grpc_chttp2_transport* tp = t.get(); + tp->combiner->Run(grpc_core::InitTransportClosure( + std::move(t), &tp->start_bdp_ping_locked), + error); +} + +static void start_bdp_ping_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", std::string(t->peer_string.as_string_view()).c_str(), @@ -2601,71 +2639,69 @@ static void start_bdp_ping_locked(void* tp, grpc_error_handle error) { } // Reset the keepalive ping timer if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { - maybe_reset_keepalive_ping_timer_locked(t); + maybe_reset_keepalive_ping_timer_locked(t.get()); } t->flow_control.bdp_estimator()->StartPing(); t->bdp_ping_started = true; } -static void finish_bdp_ping(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); - t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, - finish_bdp_ping_locked, t, nullptr), - error); +static void finish_bdp_ping(grpc_core::RefCountedPtr t, + grpc_error_handle error) { + grpc_chttp2_transport* tp = t.get(); + tp->combiner->Run(grpc_core::InitTransportClosure( + std::move(t), &tp->finish_bdp_ping_locked), + error); } -static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(tp); +static void finish_bdp_ping_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", std::string(t->peer_string.as_string_view()).c_str(), grpc_core::StatusToString(error).c_str()); } if (!error.ok() || !t->closed_with_error.ok()) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } if (!t->bdp_ping_started) { // start_bdp_ping_locked has not been run yet. Schedule // finish_bdp_ping_locked to be run later. - t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, - finish_bdp_ping_locked, t, nullptr), - error); + finish_bdp_ping(std::move(t), std::move(error)); return; } t->bdp_ping_started = false; grpc_core::Timestamp next_ping = t->flow_control.bdp_estimator()->CompletePing(); - grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t, + grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(), nullptr); GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value()); t->next_bdp_ping_timer_handle = t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - next_bdp_ping_timer_expired(t); + next_bdp_ping_timer_expired(t.get()); }); } static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) { t->combiner->Run( - GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, - next_bdp_ping_timer_expired_locked, t, nullptr), + grpc_core::InitTransportClosure( + t->Ref(), &t->next_bdp_ping_timer_expired_locked), absl::OkStatus()); } static void next_bdp_ping_timer_expired_locked( - void* tp, GRPC_UNUSED grpc_error_handle error) { + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error) { GPR_DEBUG_ASSERT(error.ok()); - grpc_chttp2_transport* t = static_cast(tp); GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value()); t->next_bdp_ping_timer_handle.reset(); if (t->flow_control.bdp_estimator()->accumulator() == 0) { // Block the bdp ping till we receive more data. t->bdp_ping_blocked = true; - GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); } else { - schedule_bdp_ping_locked(t); + schedule_bdp_ping_locked(std::move(t)); } } @@ -2716,16 +2752,18 @@ void grpc_chttp2_config_default_keepalive_args( grpc_core::Chttp2PingRatePolicy::SetDefaults(channel_args); } -static void init_keepalive_ping(grpc_chttp2_transport* t) { - t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, - init_keepalive_ping_locked, t, nullptr), - absl::OkStatus()); +static void init_keepalive_ping( + grpc_core::RefCountedPtr t) { + auto* tp = t.get(); + tp->combiner->Run(grpc_core::InitTransportClosure( + std::move(t), &tp->init_keepalive_ping_locked), + absl::OkStatus()); } -static void init_keepalive_ping_locked(void* arg, - GRPC_UNUSED grpc_error_handle error) { +static void init_keepalive_ping_locked( + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error) { GPR_DEBUG_ASSERT(error.ok()); - grpc_chttp2_transport* t = static_cast(arg); GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); GPR_ASSERT(t->keepalive_ping_timer_handle.has_value()); t->keepalive_ping_timer_handle.reset(); @@ -2734,11 +2772,10 @@ static void init_keepalive_ping_locked(void* arg, } else { if (t->keepalive_permit_without_calls || !t->stream_map.empty()) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); send_keepalive_ping_locked(t); - grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + grpc_chttp2_initiate_write(t.get(), + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); t->keepalive_ping_timer_handle = t->event_engine->RunAfter(t->keepalive_time, [t] { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; @@ -2747,18 +2784,21 @@ static void init_keepalive_ping_locked(void* arg, }); } } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } -static void start_keepalive_ping(void* arg, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(arg); - t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, - start_keepalive_ping_locked, t, nullptr), - error); +static void start_keepalive_ping( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { + auto* tp = t.get(); + tp->combiner->Run( + grpc_core::InitTransportClosure( + std::move(t), &tp->start_keepalive_ping_locked), + error); } -static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(arg); +static void start_keepalive_ping_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { if (!error.ok()) { return; } @@ -2770,25 +2810,28 @@ static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) { gpr_log(GPR_INFO, "%s: Start keepalive ping", std::string(t->peer_string.as_string_view()).c_str()); } - GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); t->keepalive_watchdog_timer_handle = - t->event_engine->RunAfter(t->keepalive_timeout, [t] { + t->event_engine->RunAfter(t->keepalive_timeout, [t]() mutable { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - keepalive_watchdog_fired(t); + keepalive_watchdog_fired(std::move(t)); }); t->keepalive_ping_started = true; } -static void finish_keepalive_ping(void* arg, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(arg); - t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, nullptr), - error); +static void finish_keepalive_ping( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { + auto* tp = t.get(); + tp->combiner->Run( + grpc_core::InitTransportClosure( + std::move(t), &tp->finish_keepalive_ping_locked), + error); } -static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(arg); +static void finish_keepalive_ping_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) || @@ -2799,22 +2842,17 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) { if (!t->keepalive_ping_started) { // start_keepalive_ping_locked has not run yet. Reschedule // finish_keepalive_ping_locked for it to be run later. - t->combiner->Run( - GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, - finish_keepalive_ping_locked, t, nullptr), - error); + finish_keepalive_ping(std::move(t), std::move(error)); return; } t->keepalive_ping_started = false; t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; if (t->keepalive_watchdog_timer_handle.has_value()) { if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); t->keepalive_watchdog_timer_handle.reset(); } } GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value()); - GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); t->keepalive_ping_timer_handle = t->event_engine->RunAfter(t->keepalive_time, [t] { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; @@ -2823,20 +2861,21 @@ static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) { }); } } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } -static void keepalive_watchdog_fired(grpc_chttp2_transport* t) { - t->combiner->Run( - GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, - keepalive_watchdog_fired_locked, t, nullptr), +static void keepalive_watchdog_fired( + grpc_core::RefCountedPtr t) { + auto* tp = t.get(); + tp->combiner->Run( + grpc_core::InitTransportClosure( + std::move(t), &tp->keepalive_watchdog_fired_locked), absl::OkStatus()); } static void keepalive_watchdog_fired_locked( - void* arg, GRPC_UNUSED grpc_error_handle error) { + grpc_core::RefCountedPtr t, + GRPC_UNUSED grpc_error_handle error) { GPR_DEBUG_ASSERT(error.ok()); - grpc_chttp2_transport* t = static_cast(arg); GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value()); t->keepalive_watchdog_timer_handle.reset(); if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { @@ -2844,9 +2883,10 @@ static void keepalive_watchdog_fired_locked( std::string(t->peer_string.as_string_view()).c_str()); t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; close_transport_locked( - t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"), - grpc_core::StatusIntProperty::kRpcStatus, - GRPC_STATUS_UNAVAILABLE)); + t.get(), + grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"), + grpc_core::StatusIntProperty::kRpcStatus, + GRPC_STATUS_UNAVAILABLE)); } else { // If keepalive_state is not PINGING, we consider it as an error. Maybe the // cancellation failed in finish_keepalive_ping_locked. Users have seen @@ -2854,7 +2894,6 @@ static void keepalive_watchdog_fired_locked( gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); } static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) { @@ -2867,11 +2906,11 @@ static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) { gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.", std::string(t->peer_string.as_string_view()).c_str()); } - t->keepalive_ping_timer_handle = - t->event_engine->RunAfter(t->keepalive_time, [t] { + t->keepalive_ping_timer_handle = t->event_engine->RunAfter( + t->keepalive_time, [t = t->Ref()]() mutable { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - init_keepalive_ping(t); + init_keepalive_ping(std::move(t)); }); } } @@ -2914,18 +2953,17 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/, static void post_benign_reclaimer(grpc_chttp2_transport* t) { if (!t->benign_reclaimer_registered) { t->benign_reclaimer_registered = true; - GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); t->memory_owner.PostReclaimer( grpc_core::ReclamationPass::kBenign, - [t](absl::optional sweep) { + [t = t->Ref()]( + absl::optional sweep) mutable { if (sweep.has_value()) { - GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, - benign_reclaimer_locked, t, - grpc_schedule_on_exec_ctx); - t->active_reclamation = std::move(*sweep); - t->combiner->Run(&t->benign_reclaimer_locked, absl::OkStatus()); - } else { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); + auto* tp = t.get(); + tp->active_reclamation = std::move(*sweep); + tp->combiner->Run( + grpc_core::InitTransportClosure( + std::move(t), &tp->benign_reclaimer_locked), + absl::OkStatus()); } }); } @@ -2934,26 +2972,25 @@ static void post_benign_reclaimer(grpc_chttp2_transport* t) { static void post_destructive_reclaimer(grpc_chttp2_transport* t) { if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; - GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); t->memory_owner.PostReclaimer( grpc_core::ReclamationPass::kDestructive, - [t](absl::optional sweep) { + [t = t->Ref()]( + absl::optional sweep) mutable { if (sweep.has_value()) { - GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, - grpc_schedule_on_exec_ctx); - t->active_reclamation = std::move(*sweep); - t->combiner->Run(&t->destructive_reclaimer_locked, - absl::OkStatus()); - } else { - GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); + auto* tp = t.get(); + tp->active_reclamation = std::move(*sweep); + tp->combiner->Run( + grpc_core::InitTransportClosure( + std::move(t), &tp->destructive_reclaimer_locked), + absl::OkStatus()); } }); } } -static void benign_reclaimer_locked(void* arg, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(arg); +static void benign_reclaimer_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { if (error.ok() && t->stream_map.empty()) { // Channel with no active streams: send a goaway to try and make it // disconnect cleanly @@ -2961,7 +2998,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error_handle error) { gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", std::string(t->peer_string.as_string_view()).c_str()); } - send_goaway(t, + send_goaway(t.get(), grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), grpc_core::StatusIntProperty::kHttp2Error, GRPC_HTTP2_ENHANCE_YOUR_CALM), @@ -2977,11 +3014,11 @@ static void benign_reclaimer_locked(void* arg, grpc_error_handle error) { if (error != absl::CancelledError()) { t->active_reclamation.Finish(); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } -static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) { - grpc_chttp2_transport* t = static_cast(arg); +static void destructive_reclaimer_locked( + grpc_core::RefCountedPtr t, + grpc_error_handle error) { t->destructive_reclaimer_registered = false; if (error.ok() && !t->stream_map.empty()) { // As stream_map is a hash map, this selects effectively a random stream. @@ -2991,7 +3028,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) { std::string(t->peer_string.as_string_view()).c_str(), s->id); } grpc_chttp2_cancel_stream( - t, s, + t.get(), s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), grpc_core::StatusIntProperty::kHttp2Error, GRPC_HTTP2_ENHANCE_YOUR_CALM)); @@ -3000,13 +3037,12 @@ static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) { // there are more streams left, we can immediately post a new // reclaimer in case the resource quota needs to free more // memory - post_destructive_reclaimer(t); + post_destructive_reclaimer(t.get()); } } if (error != absl::CancelledError()) { t->active_reclamation.Finish(); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } // @@ -3100,17 +3136,15 @@ grpc_transport* grpc_create_chttp2_transport( void grpc_chttp2_transport_start_reading( grpc_transport* transport, grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) { - grpc_chttp2_transport* t = - reinterpret_cast(transport); - GRPC_CHTTP2_REF_TRANSPORT( - t, "reading_action"); // matches unref inside reading_action + auto t = reinterpret_cast(transport)->Ref(); if (read_buffer != nullptr) { grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } - t->combiner->Run( - grpc_core::NewClosure([t, notify_on_receive_settings, - notify_on_close](grpc_error_handle) { + auto* tp = t.get(); + tp->combiner->Run( + grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings, + notify_on_close](grpc_error_handle) mutable { if (!t->closed_with_error.ok()) { if (notify_on_receive_settings != nullptr) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings, @@ -3120,12 +3154,11 @@ void grpc_chttp2_transport_start_reading( grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close, t->closed_with_error); } - GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); return; } t->notify_on_receive_settings = notify_on_receive_settings; t->notify_on_close = notify_on_close; - read_action_locked(t, absl::OkStatus()); + read_action_locked(std::move(t), absl::OkStatus()); }), absl::OkStatus()); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 807a105ebda..5f67941c67b 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -245,6 +245,19 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_endpoint* ep, bool is_client); ~grpc_chttp2_transport(); + // Make this be able to be contained in RefCountedPtr<> + // Can't yet make this derive from RefCounted because we need to keep + // `grpc_transport base` first. + // TODO(ctiller): Make a transport interface. + void IncrementRefCount() { refs.Ref(); } + void Unref() { + if (refs.Unref()) delete this; + } + grpc_core::RefCountedPtr Ref() { + IncrementRefCount(); + return grpc_core::RefCountedPtr(this); + } + grpc_transport base; // must be first grpc_core::RefCount refs; grpc_endpoint* ep; @@ -483,14 +496,8 @@ struct grpc_chttp2_stream { ~grpc_chttp2_stream(); void* context; - grpc_chttp2_transport* t; + const grpc_core::RefCountedPtr t; grpc_stream_refcount* refcount; - // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor - // before initializing the rest of the stream, to avoid cache misses. This - // field MUST be right after `t` and `refcount`. - struct Reffer { - explicit Reffer(grpc_chttp2_stream* s); - } reffer; grpc_closure destroy_stream; grpc_closure* destroy_stream_arg; @@ -736,36 +743,6 @@ void grpc_chttp2_stream_ref(grpc_chttp2_stream* s); void grpc_chttp2_stream_unref(grpc_chttp2_stream* s); #endif -#ifndef NDEBUG -#define GRPC_CHTTP2_REF_TRANSPORT(t, r) \ - grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__) -#define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \ - grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__) -inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, - const char* reason, const char* file, - int line) { - if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) { - delete t; - } -} -inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, - const char* reason, const char* file, - int line) { - t->refs.Ref(grpc_core::DebugLocation(file, line), reason); -} -#else -#define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t) -#define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t) -inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) { - if (t->refs.Unref()) { - delete t; - } -} -inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { - t->refs.Ref(); -} -#endif - void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id); /// Sends GOAWAY with error code ENHANCE_YOUR_CALM and additional debug data @@ -804,9 +781,11 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, void grpc_chttp2_config_default_keepalive_args( const grpc_core::ChannelArgs& channel_args, bool is_client); -void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t); +void grpc_chttp2_retry_initiate_ping( + grpc_core::RefCountedPtr t); -void schedule_bdp_ping_locked(grpc_chttp2_transport* t); +void schedule_bdp_ping_locked( + grpc_core::RefCountedPtr t); uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t); diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index f5ed79108e9..ff5bf80427c 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -510,8 +510,7 @@ static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) { if (bdp_est) { if (t->bdp_ping_blocked) { t->bdp_ping_blocked = false; - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - schedule_bdp_ping_locked(t); + schedule_bdp_ping_locked(t->Ref()); } bdp_est->AddIncomingBytes(t->incoming_frame_size); } @@ -841,7 +840,7 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) { grpc_chttp2_stream* s = static_cast(sp); - grpc_chttp2_transport* t = s->t; + grpc_chttp2_transport* t = s->t.get(); if (!s->write_closed) { grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing); diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index e949f410246..89aec5539d9 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -173,12 +173,11 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { too_soon.wait.ToString().c_str()); } if (!t->delayed_ping_timer_handle.has_value()) { - GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked"); - t->delayed_ping_timer_handle = - t->event_engine->RunAfter(too_soon.wait, [t] { + t->delayed_ping_timer_handle = t->event_engine->RunAfter( + too_soon.wait, [t = t->Ref()]() mutable { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - grpc_chttp2_retry_initiate_ping(t); + grpc_chttp2_retry_initiate_ping(std::move(t)); }); } });