pull/36460/head
Craig Tiller 11 months ago
parent 5e408f1eb6
commit 45b3e8e814
  1. 2
      src/core/ext/transport/chaotic_good/client_transport.h
  2. 2
      src/core/ext/transport/chaotic_good/server_transport.h
  3. 9
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 95
      src/core/ext/transport/inproc/inproc_transport.cc

@ -85,7 +85,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
grpc_endpoint* GetEndpoint() override { return nullptr; } grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { void Orphan() override {
AbortWithError(); AbortWithError();
delete this; Unref();
} }
void StartCall(CallHandler call_handler) override; void StartCall(CallHandler call_handler) override;

@ -96,7 +96,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override; void PerformOp(grpc_transport_op*) override;
grpc_endpoint* GetEndpoint() override { return nullptr; } grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { delete this; } void Orphan() override { Unref(); }
void SetAcceptor(Acceptor* acceptor) override; void SetAcceptor(Acceptor* acceptor) override;
void AbortWithError(); void AbortWithError();

@ -142,8 +142,6 @@ static bool g_default_server_keepalive_permit_without_calls = false;
#define MAX_CLIENT_STREAM_ID 0x7fffffffu #define MAX_CLIENT_STREAM_ID 0x7fffffffu
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive"); grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
"chttp2_refcount");
// forward declarations of various callbacks that we'll build closures around // forward declarations of various callbacks that we'll build closures around
static void write_action_begin_locked( static void write_action_begin_locked(
@ -594,12 +592,7 @@ static void init_keepalive_pings_if_enabled_locked(
grpc_chttp2_transport::grpc_chttp2_transport( grpc_chttp2_transport::grpc_chttp2_transport(
const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep, const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
bool is_client) bool is_client)
: grpc_core::RefCounted<grpc_chttp2_transport, : ep(ep),
grpc_core::NonPolymorphicRefCount>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
? "chttp2_refcount"
: nullptr),
ep(ep),
peer_string( peer_string(
grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))), grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>() memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()

@ -33,45 +33,7 @@
namespace grpc_core { namespace grpc_core {
namespace { namespace {
class InprocClientTransport final : public ClientTransport { class InprocClientTransport;
public:
explicit InprocClientTransport(
RefCountedPtr<InprocServerTransport> server_transport)
: server_transport_(std::move(server_transport)) {}
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
"pull_initial_metadata",
TrySeq(call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator =
server_transport->AcceptCall(std::move(md));
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator));
return absl::OkStatus();
}));
}
void Orphan() override { delete this; }
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
ServerTransport* server_transport() override { return nullptr; }
absl::string_view GetTransportName() const override { return "inproc"; }
void SetPollset(grpc_stream*, grpc_pollset*) override {}
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
grpc_endpoint* GetEndpoint() override { return nullptr; }
private:
~InprocClientTransport() override {
server_transport_->Disconnect(
absl::UnavailableError("Client transport closed"));
}
const RefCountedPtr<InprocServerTransport> server_transport_;
};
class InprocServerTransport final : public ServerTransport { class InprocServerTransport final : public ServerTransport {
public: public:
@ -134,10 +96,7 @@ class InprocServerTransport final : public ServerTransport {
return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena()); return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
} }
OrphanablePtr<ClientTransport> MakeClientTransport() { OrphanablePtr<InprocClientTransport> MakeClientTransport();
return MakeOrphanable<InprocClientTransport>(
RefAsSubclass<InprocServerTransport>());
}
private: private:
enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected }; enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
@ -151,6 +110,46 @@ class InprocServerTransport final : public ServerTransport {
"inproc_server_transport", GRPC_CHANNEL_CONNECTING}; "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
}; };
class InprocClientTransport final : public ClientTransport {
public:
explicit InprocClientTransport(
RefCountedPtr<InprocServerTransport> server_transport)
: server_transport_(std::move(server_transport)) {}
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
"pull_initial_metadata",
TrySeq(call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator =
server_transport->AcceptCall(std::move(md));
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator));
return absl::OkStatus();
}));
}
void Orphan() override { delete this; }
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
ServerTransport* server_transport() override { return nullptr; }
absl::string_view GetTransportName() const override { return "inproc"; }
void SetPollset(grpc_stream*, grpc_pollset*) override {}
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
grpc_endpoint* GetEndpoint() override { return nullptr; }
private:
~InprocClientTransport() override {
server_transport_->Disconnect(
absl::UnavailableError("Client transport closed"));
}
const RefCountedPtr<InprocServerTransport> server_transport_;
};
bool UsePromiseBasedTransport() { bool UsePromiseBasedTransport() {
if (!IsPromiseBasedInprocTransportEnabled()) return false; if (!IsPromiseBasedInprocTransportEnabled()) return false;
GPR_ASSERT(IsPromiseBasedClientCallEnabled()); GPR_ASSERT(IsPromiseBasedClientCallEnabled());
@ -158,6 +157,12 @@ bool UsePromiseBasedTransport() {
return true; return true;
} }
OrphanablePtr<InprocClientTransport>
InprocServerTransport::MakeClientTransport() {
return MakeOrphanable<InprocClientTransport>(
RefAsSubclass<InprocServerTransport>());
}
OrphanablePtr<Channel> MakeLameChannel(absl::string_view why, OrphanablePtr<Channel> MakeLameChannel(absl::string_view why,
absl::Status error) { absl::Status error) {
gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(), gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
@ -199,8 +204,8 @@ OrphanablePtr<Channel> MakeInprocChannel(Server* server,
std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>> std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair() { MakeInProcessTransportPair() {
auto client_transport = MakeOrphanable<InprocClientTransport>(); auto server_transport = MakeOrphanable<InprocServerTransport>();
auto server_transport = client_transport->GetServerTransport(); auto client_transport = server_transport->MakeClientTransport();
return std::make_pair(std::move(client_transport), return std::make_pair(std::move(client_transport),
std::move(server_transport)); std::move(server_transport));
} }

Loading…
Cancel
Save