From 03e2bf22c512a9f6e722d48bff5008c076727cbc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Apr 2024 16:25:46 -0700 Subject: [PATCH] [transport] Centralize ref-counting between transports (#36460) Closes #36460 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36460 from ctiller:transport-refs-2 b43cdc63ace6d425efcd34406a0e67d24d83c842 PiperOrigin-RevId: 629561507 --- .../transport/chaotic_good/client_transport.h | 2 +- .../transport/chaotic_good/server_transport.h | 2 +- .../chttp2/transport/chttp2_transport.cc | 9 +----- .../ext/transport/chttp2/transport/internal.h | 12 ++++---- .../ext/transport/inproc/inproc_transport.cc | 28 ++++++++++++------- src/core/lib/transport/transport.h | 2 +- 6 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index d08929b0f73..f07099e296f 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -85,7 +85,7 @@ class ChaoticGoodClientTransport final : public ClientTransport { grpc_endpoint* GetEndpoint() override { return nullptr; } void Orphan() override { AbortWithError(); - delete this; + Unref(); } void StartCall(CallHandler call_handler) override; diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index acdc88ef9fd..a34ac92b73e 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -96,7 +96,7 @@ class ChaoticGoodServerTransport final : public ServerTransport { void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} void PerformOp(grpc_transport_op*) override; grpc_endpoint* GetEndpoint() override { return nullptr; } - void Orphan() override { delete this; } + void Orphan() override { Unref(); } void SetAcceptor(Acceptor* acceptor) override; void AbortWithError(); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index dff513f47da..1af3a893c6a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -142,8 +142,6 @@ static bool g_default_server_keepalive_permit_without_calls = false; #define MAX_CLIENT_STREAM_ID 0x7fffffffu grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive"); -grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, - "chttp2_refcount"); // forward declarations of various callbacks that we'll build closures around static void write_action_begin_locked( @@ -594,12 +592,7 @@ static void init_keepalive_pings_if_enabled_locked( grpc_chttp2_transport::grpc_chttp2_transport( const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep, bool is_client) - : grpc_core::RefCounted( - GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount) - ? "chttp2_refcount" - : nullptr), - ep(ep), + : ep(ep), peer_string( grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))), memory_owner(channel_args.GetObject() diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 4714d4c7370..058928a596d 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -223,17 +223,19 @@ typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, } grpc_chttp2_keepalive_state; -struct grpc_chttp2_transport final - : public grpc_core::FilterStackTransport, - public grpc_core::RefCounted, - public grpc_core::KeepsGrpcInitialized { +struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport, + public grpc_core::KeepsGrpcInitialized { grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep, bool is_client); ~grpc_chttp2_transport() override; void Orphan() override; + grpc_core::RefCountedPtr Ref() { + return grpc_core::FilterStackTransport::RefAsSubclass< + grpc_chttp2_transport>(); + } + size_t SizeOfStream() const override; bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override; void PerformStreamOp(grpc_stream* gs, diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 731ea4c8309..6069283efb4 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -33,8 +33,9 @@ namespace grpc_core { namespace { -class InprocServerTransport final : public RefCounted, - public ServerTransport { +class InprocClientTransport; + +class InprocServerTransport final : public ServerTransport { public: void SetAcceptor(Acceptor* acceptor) override { acceptor_ = acceptor; @@ -95,6 +96,8 @@ class InprocServerTransport final : public RefCounted, return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena()); } + OrphanablePtr MakeClientTransport(); + private: enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected }; @@ -109,6 +112,10 @@ class InprocServerTransport final : public RefCounted, class InprocClientTransport final : public ClientTransport { public: + explicit InprocClientTransport( + RefCountedPtr server_transport) + : server_transport_(std::move(server_transport)) {} + void StartCall(CallHandler call_handler) override { call_handler.SpawnGuarded( "pull_initial_metadata", @@ -125,10 +132,6 @@ class InprocClientTransport final : public ClientTransport { void Orphan() override { delete this; } - OrphanablePtr GetServerTransport() { - return OrphanablePtr(server_transport_->Ref().release()); - } - FilterStackTransport* filter_stack_transport() override { return nullptr; } ClientTransport* client_transport() override { return this; } ServerTransport* server_transport() override { return nullptr; } @@ -144,8 +147,7 @@ class InprocClientTransport final : public ClientTransport { absl::UnavailableError("Client transport closed")); } - RefCountedPtr server_transport_ = - MakeRefCounted(); + const RefCountedPtr server_transport_; }; bool UsePromiseBasedTransport() { @@ -155,6 +157,12 @@ bool UsePromiseBasedTransport() { return true; } +OrphanablePtr +InprocServerTransport::MakeClientTransport() { + return MakeOrphanable( + RefAsSubclass()); +} + OrphanablePtr MakeLameChannel(absl::string_view why, absl::Status error) { gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(), @@ -196,8 +204,8 @@ OrphanablePtr MakeInprocChannel(Server* server, std::pair, OrphanablePtr> MakeInProcessTransportPair() { - auto client_transport = MakeOrphanable(); - auto server_transport = client_transport->GetServerTransport(); + auto server_transport = MakeOrphanable(); + auto client_transport = server_transport->MakeClientTransport(); return std::make_pair(std::move(client_transport), std::move(server_transport)); } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index fe7f72ef1ed..65f57fdd475 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -502,7 +502,7 @@ class FilterStackTransport; class ClientTransport; class ServerTransport; -class Transport : public Orphanable { +class Transport : public InternallyRefCounted { public: struct RawPointerChannelArgTag {}; static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }