[transport] Centralize ref-counting between transports (#36460)

Closes #36460

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36460 from ctiller:transport-refs-2 b43cdc63ac
PiperOrigin-RevId: 629561507
pull/36492/head
Craig Tiller 7 months ago committed by Copybara-Service
parent 6e981d7460
commit 03e2bf22c5
  1. 2
      src/core/ext/transport/chaotic_good/client_transport.h
  2. 2
      src/core/ext/transport/chaotic_good/server_transport.h
  3. 9
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 12
      src/core/ext/transport/chttp2/transport/internal.h
  5. 28
      src/core/ext/transport/inproc/inproc_transport.cc
  6. 2
      src/core/lib/transport/transport.h

@ -85,7 +85,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override {
AbortWithError();
delete this;
Unref();
}
void StartCall(CallHandler call_handler) override;

@ -96,7 +96,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override;
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { delete this; }
void Orphan() override { Unref(); }
void SetAcceptor(Acceptor* acceptor) override;
void AbortWithError();

@ -142,8 +142,6 @@ static bool g_default_server_keepalive_permit_without_calls = false;
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
"chttp2_refcount");
// forward declarations of various callbacks that we'll build closures around
static void write_action_begin_locked(
@ -594,12 +592,7 @@ static void init_keepalive_pings_if_enabled_locked(
grpc_chttp2_transport::grpc_chttp2_transport(
const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
bool is_client)
: grpc_core::RefCounted<grpc_chttp2_transport,
grpc_core::NonPolymorphicRefCount>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
? "chttp2_refcount"
: nullptr),
ep(ep),
: ep(ep),
peer_string(
grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()

@ -223,17 +223,19 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
struct grpc_chttp2_transport final
: public grpc_core::FilterStackTransport,
public grpc_core::RefCounted<grpc_chttp2_transport,
grpc_core::NonPolymorphicRefCount>,
public grpc_core::KeepsGrpcInitialized {
struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
grpc_endpoint* ep, bool is_client);
~grpc_chttp2_transport() override;
void Orphan() override;
grpc_core::RefCountedPtr<grpc_chttp2_transport> Ref() {
return grpc_core::FilterStackTransport::RefAsSubclass<
grpc_chttp2_transport>();
}
size_t SizeOfStream() const override;
bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override;
void PerformStreamOp(grpc_stream* gs,

@ -33,8 +33,9 @@
namespace grpc_core {
namespace {
class InprocServerTransport final : public RefCounted<InprocServerTransport>,
public ServerTransport {
class InprocClientTransport;
class InprocServerTransport final : public ServerTransport {
public:
void SetAcceptor(Acceptor* acceptor) override {
acceptor_ = acceptor;
@ -95,6 +96,8 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
}
OrphanablePtr<InprocClientTransport> MakeClientTransport();
private:
enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
@ -109,6 +112,10 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
class InprocClientTransport final : public ClientTransport {
public:
explicit InprocClientTransport(
RefCountedPtr<InprocServerTransport> server_transport)
: server_transport_(std::move(server_transport)) {}
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
"pull_initial_metadata",
@ -125,10 +132,6 @@ class InprocClientTransport final : public ClientTransport {
void Orphan() override { delete this; }
OrphanablePtr<Transport> GetServerTransport() {
return OrphanablePtr<Transport>(server_transport_->Ref().release());
}
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
ServerTransport* server_transport() override { return nullptr; }
@ -144,8 +147,7 @@ class InprocClientTransport final : public ClientTransport {
absl::UnavailableError("Client transport closed"));
}
RefCountedPtr<InprocServerTransport> server_transport_ =
MakeRefCounted<InprocServerTransport>();
const RefCountedPtr<InprocServerTransport> server_transport_;
};
bool UsePromiseBasedTransport() {
@ -155,6 +157,12 @@ bool UsePromiseBasedTransport() {
return true;
}
OrphanablePtr<InprocClientTransport>
InprocServerTransport::MakeClientTransport() {
return MakeOrphanable<InprocClientTransport>(
RefAsSubclass<InprocServerTransport>());
}
OrphanablePtr<Channel> MakeLameChannel(absl::string_view why,
absl::Status error) {
gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
@ -196,8 +204,8 @@ OrphanablePtr<Channel> MakeInprocChannel(Server* server,
std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair() {
auto client_transport = MakeOrphanable<InprocClientTransport>();
auto server_transport = client_transport->GetServerTransport();
auto server_transport = MakeOrphanable<InprocServerTransport>();
auto client_transport = server_transport->MakeClientTransport();
return std::make_pair(std::move(client_transport),
std::move(server_transport));
}

@ -502,7 +502,7 @@ class FilterStackTransport;
class ClientTransport;
class ServerTransport;
class Transport : public Orphanable {
class Transport : public InternallyRefCounted<Transport> {
public:
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }

Loading…
Cancel
Save