[chttp2] Use RefCountedPtr for grpc_chttp2_transport (#33746)

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/33909/head
Craig Tiller 1 year ago committed by GitHub
parent 0e9553cf4e
commit 6b2de0fa4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      include/grpc/event_engine/memory_allocator.h
  2. 19
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  3. 619
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 57
      src/core/ext/transport/chttp2/transport/internal.h
  5. 5
      src/core/ext/transport/chttp2/transport/parsing.cc
  6. 7
      src/core/ext/transport/chttp2/transport/writing.cc

@ -56,8 +56,8 @@ class MemoryAllocator {
/// The object will not be usable after this call unless it's a valid
/// allocator is moved into it.
void Reset() {
if (allocator_ != nullptr) allocator_->Shutdown();
allocator_.reset();
auto a = std::move(allocator_);
if (a != nullptr) a->Shutdown();
}
/// Reserve bytes from the quota.

@ -210,7 +210,8 @@ class Chttp2ServerListener : public Server::ListenerInterface {
OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
// Set by HandshakingState when handshaking is done and a valid transport
// is created.
grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
RefCountedPtr<grpc_chttp2_transport> transport_ ABSL_GUARDED_BY(&mu_) =
nullptr;
grpc_closure on_close_;
absl::optional<EventEngine::TaskHandle> drain_grace_timer_handle_
ABSL_GUARDED_BY(&mu_);
@ -419,7 +420,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout() {
{
MutexLock lock(&connection_->mu_);
if (timer_handle_.has_value()) {
transport = connection_->transport_;
transport = connection_->transport_.get();
timer_handle_.reset();
}
}
@ -490,9 +491,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self->connection_->transport_ =
reinterpret_cast<grpc_chttp2_transport*>(transport);
GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
"ActiveConnection"); // Held by connection_
reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
self, grpc_schedule_on_exec_ctx);
@ -572,11 +571,7 @@ Chttp2ServerListener::ActiveConnection::ActiveConnection(
grpc_schedule_on_exec_ctx);
}
Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
if (transport_ != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
}
}
Chttp2ServerListener::ActiveConnection::~ActiveConnection() {}
void Chttp2ServerListener::ActiveConnection::Orphan() {
OrphanablePtr<HandshakingState> handshaking_state;
@ -595,7 +590,7 @@ void Chttp2ServerListener::ActiveConnection::SendGoAway() {
{
MutexLock lock(&mu_);
if (transport_ != nullptr && !shutdown_) {
transport = transport_;
transport = transport_.get();
drain_grace_timer_handle_ = event_engine_->RunAfter(
std::max(Duration::Zero(),
listener_->args_
@ -667,7 +662,7 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
{
MutexLock lock(&mu_);
if (drain_grace_timer_handle_.has_value()) {
transport = transport_;
transport = transport_.get();
drain_grace_timer_handle_.reset();
}
}

File diff suppressed because it is too large Load Diff

@ -245,6 +245,19 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_endpoint* ep, bool is_client);
~grpc_chttp2_transport();
// Make this be able to be contained in RefCountedPtr<>
// Can't yet make this derive from RefCounted because we need to keep
// `grpc_transport base` first.
// TODO(ctiller): Make a transport interface.
void IncrementRefCount() { refs.Ref(); }
void Unref() {
if (refs.Unref()) delete this;
}
grpc_core::RefCountedPtr<grpc_chttp2_transport> Ref() {
IncrementRefCount();
return grpc_core::RefCountedPtr<grpc_chttp2_transport>(this);
}
grpc_transport base; // must be first
grpc_core::RefCount refs;
grpc_endpoint* ep;
@ -483,14 +496,8 @@ struct grpc_chttp2_stream {
~grpc_chttp2_stream();
void* context;
grpc_chttp2_transport* t;
const grpc_core::RefCountedPtr<grpc_chttp2_transport> t;
grpc_stream_refcount* refcount;
// Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
// before initializing the rest of the stream, to avoid cache misses. This
// field MUST be right after `t` and `refcount`.
struct Reffer {
explicit Reffer(grpc_chttp2_stream* s);
} reffer;
grpc_closure destroy_stream;
grpc_closure* destroy_stream_arg;
@ -736,36 +743,6 @@ void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
#endif
#ifndef NDEBUG
#define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
#define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
const char* reason, const char* file,
int line) {
if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
delete t;
}
}
inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
const char* reason, const char* file,
int line) {
t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
}
#else
#define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
#define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
if (t->refs.Unref()) {
delete t;
}
}
inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
t->refs.Ref();
}
#endif
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
/// Sends GOAWAY with error code ENHANCE_YOUR_CALM and additional debug data
@ -804,9 +781,11 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
void grpc_chttp2_config_default_keepalive_args(
const grpc_core::ChannelArgs& channel_args, bool is_client);
void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t);
void grpc_chttp2_retry_initiate_ping(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
void schedule_bdp_ping_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t);

@ -510,8 +510,7 @@ static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) {
if (bdp_est) {
if (t->bdp_ping_blocked) {
t->bdp_ping_blocked = false;
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
schedule_bdp_ping_locked(t);
schedule_bdp_ping_locked(t->Ref());
}
bdp_est->AddIncomingBytes(t->incoming_frame_size);
}
@ -841,7 +840,7 @@ static const maybe_complete_func_type maybe_complete_funcs[] = {
static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
grpc_chttp2_transport* t = s->t;
grpc_chttp2_transport* t = s->t.get();
if (!s->write_closed) {
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);

@ -173,12 +173,11 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
too_soon.wait.ToString().c_str());
}
if (!t->delayed_ping_timer_handle.has_value()) {
GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
t->delayed_ping_timer_handle =
t->event_engine->RunAfter(too_soon.wait, [t] {
t->delayed_ping_timer_handle = t->event_engine->RunAfter(
too_soon.wait, [t = t->Ref()]() mutable {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_chttp2_retry_initiate_ping(t);
grpc_chttp2_retry_initiate_ping(std::move(t));
});
}
});

Loading…
Cancel
Save