diff --git a/BUILD b/BUILD index 06475de0881..6534f189793 100644 --- a/BUILD +++ b/BUILD @@ -1531,11 +1531,11 @@ grpc_cc_library( "//src/core:arena_promise", "//src/core:atomic_utils", "//src/core:bitset", + "//src/core:call_factory", "//src/core:call_filters", "//src/core:call_final_info", "//src/core:call_spine", "//src/core:cancel_callback", - "//src/core:channel", "//src/core:channel_args", "//src/core:channel_args_endpoint_config", "//src/core:channel_args_preconditioning", diff --git a/src/core/BUILD b/src/core/BUILD index 2c8e42bb9a9..a4cda5d5097 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6737,19 +6737,21 @@ grpc_cc_library( ) grpc_cc_library( - name = "channel", + name = "call_factory", srcs = [ - "lib/transport/channel.cc", + "lib/transport/call_factory.cc", ], hdrs = [ - "lib/transport/channel.h", + "lib/transport/call_factory.h", ], deps = [ "arena", "call_size_estimator", + "call_spine", "channel_args", "ref_counted", "resource_quota", + "//:gpr_platform", "//:stats", ], ) diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index 26eaac7be1f..87a6965643c 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -49,7 +49,7 @@ namespace grpc_core { namespace { -bool IsLameChannel(GrpcChannel* channel) { +bool IsLameChannel(Channel* channel) { grpc_channel_element* elem = grpc_channel_stack_last_element(channel->channel_stack()); return elem->filter == &LameClientFilter::kFilter; @@ -65,7 +65,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( GRPC_API_TRACE( "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2, (c_channel, try_to_connect)); - grpc_core::GrpcChannel* channel = grpc_core::GrpcChannel::FromC(c_channel); + grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel); // Forward through to the underlying client channel. grpc_core::ClientChannel* client_channel = grpc_core::ClientChannel::GetFromChannel(channel); @@ -82,7 +82,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( } int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) { - grpc_core::GrpcChannel* channel = grpc_core::GrpcChannel::FromC(c_channel); + grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel); grpc_core::ClientChannel* client_channel = grpc_core::ClientChannel::GetFromChannel(channel); if (client_channel == nullptr) { @@ -98,7 +98,7 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) { int grpc_channel_support_connectivity_watcher(grpc_channel* channel) { return grpc_core::ClientChannel::GetFromChannel( - grpc_core::GrpcChannel::FromC(channel)) != nullptr; + grpc_core::Channel::FromC(channel)) != nullptr; } namespace grpc_core { @@ -109,7 +109,7 @@ class StateWatcher : public DualRefCounted { StateWatcher(grpc_channel* c_channel, grpc_completion_queue* cq, void* tag, grpc_connectivity_state last_observed_state, gpr_timespec deadline) - : channel_(GrpcChannel::FromC(c_channel)->RefAsSubclass()), + : channel_(Channel::FromC(c_channel)->RefAsSubclass()), cq_(cq), tag_(tag), state_(last_observed_state) { @@ -225,7 +225,7 @@ class StateWatcher : public DualRefCounted { self->WeakUnref(); } - RefCountedPtr channel_; + RefCountedPtr channel_; grpc_completion_queue* cq_; void* tag_; diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index a229109b4e1..29b5d7ec4d1 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1185,7 +1185,7 @@ class ClientChannel::ClientChannelControlHelper // ClientChannel implementation // -ClientChannel* ClientChannel::GetFromChannel(GrpcChannel* channel) { +ClientChannel* ClientChannel::GetFromChannel(Channel* channel) { grpc_channel_element* elem = grpc_channel_stack_last_element(channel->channel_stack()); if (elem->filter != &kFilterVtableWithPromises && diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 36fb68aae84..f54c5db4669 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -117,7 +117,7 @@ class ClientChannel { // Returns the ClientChannel object from channel, or null if channel // is not a client channel. - static ClientChannel* GetFromChannel(GrpcChannel* channel); + static ClientChannel* GetFromChannel(Channel* channel); static ArenaPromise MakeCallPromise( grpc_channel_element* elem, CallArgs call_args, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ed4df777397..1ae17ff0620 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1593,7 +1593,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { // goes into state TRANSIENT_FAILURE before the timer fires, we go into // fallback mode even if the fallback timeout has not elapsed. ClientChannel* client_channel = - ClientChannel::GetFromChannel(GrpcChannel::FromC(lb_channel_)); + ClientChannel::GetFromChannel(Channel::FromC(lb_channel_)); GPR_ASSERT(client_channel != nullptr); // Ref held by callback. watcher_ = @@ -1660,7 +1660,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() { void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { ClientChannel* client_channel = - ClientChannel::GetFromChannel(GrpcChannel::FromC(lb_channel_)); + ClientChannel::GetFromChannel(Channel::FromC(lb_channel_)); GPR_ASSERT(client_channel != nullptr); client_channel->RemoveConnectivityWatcher(watcher_); } diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index 791cbc8eb9f..382c579488e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -1558,7 +1558,7 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr lb_policy) } // Start connectivity watch. ClientChannel* client_channel = - ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_)); + ClientChannel::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher")); client_channel->AddConnectivityWatcher( @@ -1584,7 +1584,7 @@ void RlsLb::RlsChannel::Orphan() { // Stop connectivity watch. if (watcher_ != nullptr) { ClientChannel* client_channel = - ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_)); + ClientChannel::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); client_channel->RemoveConnectivityWatcher(watcher_); watcher_ = nullptr; diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 6566a4a2c57..37e490458d1 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -307,8 +307,8 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory { } }; -absl::StatusOr> CreateChannel( - const char* target, const ChannelArgs& args) { +absl::StatusOr> CreateChannel(const char* target, + const ChannelArgs& args) { if (target == nullptr) { gpr_log(GPR_ERROR, "cannot create channel with NULL target name"); return absl::InvalidArgumentError("channel target is NULL"); @@ -317,9 +317,9 @@ absl::StatusOr> CreateChannel( std::string canonical_target = CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded( target); - return GrpcChannel::Create(target, - args.Set(GRPC_ARG_SERVER_URI, canonical_target), - GRPC_CLIENT_CHANNEL, nullptr); + return Channel::Create(target, + args.Set(GRPC_ARG_SERVER_URI, canonical_target), + GRPC_CLIENT_CHANNEL, nullptr); } } // namespace @@ -410,7 +410,7 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd, grpc_core::Transport* transport = grpc_create_chttp2_transport(final_args, client, true); GPR_ASSERT(transport); - auto channel = grpc_core::GrpcChannel::Create( + auto channel = grpc_core::Channel::Create( target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); if (channel.ok()) { grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index b678c097c81..936068b0aa3 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -155,8 +155,8 @@ bool UsePromiseBasedTransport() { return true; } -RefCountedPtr MakeLameChannel(absl::string_view why, - absl::Status error) { +RefCountedPtr MakeLameChannel(absl::string_view why, + absl::Status error) { gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(), std::string(error.message()).c_str()); intptr_t integer; @@ -164,13 +164,12 @@ RefCountedPtr MakeLameChannel(absl::string_view why, if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) { status = static_cast(integer); } - return RefCountedPtr( - GrpcChannel::FromC(grpc_lame_client_channel_create( - nullptr, status, std::string(why).c_str()))); + return RefCountedPtr(Channel::FromC(grpc_lame_client_channel_create( + nullptr, status, std::string(why).c_str()))); } -RefCountedPtr MakeInprocChannel(Server* server, - ChannelArgs client_channel_args) { +RefCountedPtr MakeInprocChannel(Server* server, + ChannelArgs client_channel_args) { auto transports = MakeInProcessTransportPair(); auto client_transport = std::move(transports.first); auto server_transport = std::move(transports.second); @@ -184,7 +183,7 @@ RefCountedPtr MakeInprocChannel(Server* server, return MakeLameChannel("Failed to create server channel", std::move(error)); } std::ignore = server_transport.release(); // consumed by SetupTransport - auto channel = GrpcChannel::Create( + auto channel = Channel::Create( "inproc", client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"), GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release()); diff --git a/src/core/ext/transport/inproc/legacy_inproc_transport.cc b/src/core/ext/transport/inproc/legacy_inproc_transport.cc index a62536b426b..66519608870 100644 --- a/src/core/ext/transport/inproc/legacy_inproc_transport.cc +++ b/src/core/ext/transport/inproc/legacy_inproc_transport.cc @@ -1264,7 +1264,7 @@ grpc_channel* grpc_legacy_inproc_channel_create(grpc_server* server, server_transport, nullptr, server_args, nullptr); grpc_channel* channel = nullptr; if (error.ok()) { - auto new_channel = grpc_core::GrpcChannel::Create( + auto new_channel = grpc_core::Channel::Create( "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport); if (!new_channel.ok()) { GPR_ASSERT(!channel); diff --git a/src/core/ext/xds/xds_transport_grpc.cc b/src/core/ext/xds/xds_transport_grpc.cc index 74d577ee97b..1deff78f2a3 100644 --- a/src/core/ext/xds/xds_transport_grpc.cc +++ b/src/core/ext/xds/xds_transport_grpc.cc @@ -282,7 +282,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport( *status = absl::UnavailableError("xds client has a lame channel"); } else { ClientChannel* client_channel = - ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_)); + ClientChannel::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); watcher_ = new StateWatcher(std::move(on_connectivity_failure)); client_channel->AddConnectivityWatcher( @@ -298,7 +298,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() { void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() { if (!IsLameChannel(channel_)) { ClientChannel* client_channel = - ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_)); + ClientChannel::GetFromChannel(Channel::FromC(channel_)); GPR_ASSERT(client_channel != nullptr); client_channel->RemoveConnectivityWatcher(watcher_); } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 23b8e262a1e..c216493d2bf 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -184,7 +184,7 @@ class Call : public CppImplOf { }; Call(Arena* arena, bool is_client, Timestamp send_deadline, - RefCountedPtr channel) + RefCountedPtr channel) : channel_(std::move(channel)), arena_(arena), send_deadline_(send_deadline), @@ -198,7 +198,7 @@ class Call : public CppImplOf { ParentCall* GetOrCreateParentCall(); ParentCall* parent_call(); - GrpcChannel* channel() const { + Channel* channel() const { GPR_DEBUG_ASSERT(channel_ != nullptr); return channel_.get(); } @@ -248,7 +248,7 @@ class Call : public CppImplOf { gpr_cycle_counter start_time() const { return start_time_; } private: - RefCountedPtr channel_; + RefCountedPtr channel_; Arena* const arena_; std::atomic parent_call_{nullptr}; ChildCall* child_ = nullptr; @@ -689,7 +689,7 @@ class FilterStackCall final : public Call { FilterStackCall(Arena* arena, const grpc_call_create_args& args) : Call(arena, args.server_transport_data == nullptr, args.send_deadline, - args.channel->RefAsSubclass()), + args.channel->RefAsSubclass()), cq_(args.cq), stream_op_payload_(context_) {} @@ -802,7 +802,7 @@ class FilterStackCall final : public Call { grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, grpc_call** out_call) { - GrpcChannel* channel = args->channel.get(); + Channel* channel = args->channel.get(); auto add_init_error = [](grpc_error_handle* composite, grpc_error_handle new_err) { @@ -1994,7 +1994,7 @@ class BasicPromiseBasedCall : public Call, BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs, const grpc_call_create_args& args) : Call(arena, args.server_transport_data == nullptr, args.send_deadline, - args.channel->RefAsSubclass()), + args.channel->RefAsSubclass()), Party(arena, initial_external_refs != 0 ? 1 : 0), external_refs_(initial_external_refs), cq_(args.cq) { @@ -3704,7 +3704,7 @@ class ServerCallSpine final : public CallSpineInterface, public ServerCallContext, public BasicPromiseBasedCall { public: - ServerCallSpine(Server* server, GrpcChannel* channel, Arena* arena); + ServerCallSpine(Server* server, Channel* channel, Arena* arena); // CallSpineInterface Pipe& client_initial_metadata() override { @@ -3788,12 +3788,11 @@ class ServerCallSpine final : public CallSpineInterface, ClientMetadataHandle client_initial_metadata_stored_; }; -ServerCallSpine::ServerCallSpine(Server* server, GrpcChannel* channel, - Arena* arena) +ServerCallSpine::ServerCallSpine(Server* server, Channel* channel, Arena* arena) : BasicPromiseBasedCall( arena, 1, [channel, server]() -> grpc_call_create_args { grpc_call_create_args args; - args.channel = channel->RefAsSubclass(); + args.channel = channel->RefAsSubclass(); args.server = server; args.parent = nullptr; args.propagation_mask = 0; @@ -4055,7 +4054,7 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops, } RefCountedPtr MakeServerCall(Server* server, - GrpcChannel* channel, + Channel* channel, Arena* arena) { return RefCountedPtr( arena->New(server, channel, arena)); diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 72476606dae..520cf13505c 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -57,7 +57,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success, void* user_data); typedef struct grpc_call_create_args { - grpc_core::RefCountedPtr channel; + grpc_core::RefCountedPtr channel; grpc_core::Server* server; grpc_call* parent; @@ -160,7 +160,7 @@ template <> struct ContextType {}; RefCountedPtr MakeServerCall(Server* server, - GrpcChannel* channel, + Channel* channel, Arena* arena); } // namespace grpc_core diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 97242716c1b..545ae8a9d70 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -58,23 +58,36 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/init_internally.h" +#include "src/core/lib/transport/call_factory.h" #include "src/core/lib/transport/transport.h" // IWYU pragma: no_include namespace grpc_core { -GrpcChannel::GrpcChannel(bool is_client, bool is_promising, std::string target, - const ChannelArgs& channel_args, - grpc_compression_options compression_options, - RefCountedPtr channel_stack) - : Channel(channel_args), - is_client_(is_client), +namespace { + +class NotReallyACallFactory final : public CallFactory { + public: + using CallFactory::CallFactory; + CallInitiator CreateCall(ClientMetadataHandle md, Arena* arena) { + Crash("NotReallyACallFactory::CreateCall should never be called"); + } +}; + +} // namespace + +Channel::Channel(bool is_client, bool is_promising, std::string target, + const ChannelArgs& channel_args, + grpc_compression_options compression_options, + RefCountedPtr channel_stack) + : is_client_(is_client), is_promising_(is_promising), compression_options_(compression_options), channelz_node_(channel_args.GetObjectRef()), target_(std::move(target)), - channel_stack_(std::move(channel_stack)) { + channel_stack_(std::move(channel_stack)), + call_factory_(MakeRefCounted(channel_args)) { // We need to make sure that grpc_shutdown() does not shut things down // until after the channel is destroyed. However, the channel may not // actually be destroyed by the time grpc_channel_destroy() returns, @@ -103,7 +116,7 @@ GrpcChannel::GrpcChannel(bool is_client, bool is_promising, std::string target, }; } -absl::StatusOr> GrpcChannel::CreateWithBuilder( +absl::StatusOr> Channel::CreateWithBuilder( ChannelStackBuilder* builder) { auto channel_args = builder->channel_args(); if (builder->channel_stack_type() == GRPC_SERVER_CHANNEL) { @@ -147,7 +160,7 @@ absl::StatusOr> GrpcChannel::CreateWithBuilder( *enabled_algorithms_bitset | 1 /* always support no compression */; } - return RefCountedPtr(new GrpcChannel( + return RefCountedPtr(new Channel( grpc_channel_stack_type_is_client(builder->channel_stack_type()), builder->IsPromising(), std::string(builder->target()), channel_args, compression_options, std::move(*r))); @@ -169,7 +182,7 @@ const grpc_arg_pointer_vtable channelz_node_arg_vtable = { channelz_node_copy, channelz_node_destroy, channelz_node_cmp}; } // namespace -absl::StatusOr> GrpcChannel::Create( +absl::StatusOr> Channel::Create( const char* target, ChannelArgs args, grpc_channel_stack_type channel_stack_type, Transport* optional_transport) { if (!args.GetString(GRPC_ARG_DEFAULT_AUTHORITY).has_value()) { @@ -230,7 +243,7 @@ absl::StatusOr> GrpcChannel::Create( char* grpc_channel_get_target(grpc_channel* channel) { GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel)); - auto target = grpc_core::GrpcChannel::FromC(channel)->target(); + auto target = grpc_core::Channel::FromC(channel)->target(); char* buffer = static_cast(gpr_zalloc(target.size() + 1)); memcpy(buffer, target.data(), target.size()); return buffer; @@ -241,7 +254,7 @@ void grpc_channel_get_info(grpc_channel* channel, grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_channel_element* elem = grpc_channel_stack_element( - grpc_core::GrpcChannel::FromC(channel)->channel_stack(), 0); + grpc_core::Channel::FromC(channel)->channel_stack(), 0); elem->filter->get_channel_info(elem, channel_info); } @@ -253,7 +266,7 @@ void grpc_channel_reset_connect_backoff(grpc_channel* channel) { grpc_transport_op* op = grpc_make_transport_op(nullptr); op->reset_connect_backoff = true; grpc_channel_element* elem = grpc_channel_stack_element( - grpc_core::GrpcChannel::FromC(channel)->channel_stack(), 0); + grpc_core::Channel::FromC(channel)->channel_stack(), 0); elem->filter->start_transport_op(elem, op); } @@ -262,8 +275,8 @@ static grpc_call* grpc_channel_create_call_internal( grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative, grpc_core::Slice path, absl::optional authority, grpc_core::Timestamp deadline, bool registered_method) { - auto channel = grpc_core::GrpcChannel::FromC(c_channel) - ->RefAsSubclass(); + auto channel = + grpc_core::Channel::FromC(c_channel)->RefAsSubclass(); GPR_ASSERT(channel->is_client()); GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr)); @@ -348,13 +361,12 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method, GPR_ASSERT(!reserved); grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; - return grpc_core::GrpcChannel::FromC(channel)->RegisterCall(method, host); + return grpc_core::Channel::FromC(channel)->RegisterCall(method, host); } namespace grpc_core { -RegisteredCall* GrpcChannel::RegisterCall(const char* method, - const char* host) { +RegisteredCall* Channel::RegisterCall(const char* method, const char* host) { MutexLock lock(®istration_table_.mu); auto key = std::make_pair(std::string(host != nullptr ? host : ""), std::string(method != nullptr ? method : "")); @@ -402,8 +414,8 @@ grpc_call* grpc_channel_create_registered_call( } void grpc_channel_destroy_internal(grpc_channel* c_channel) { - grpc_core::RefCountedPtr channel( - grpc_core::GrpcChannel::FromC(c_channel)); + grpc_core::RefCountedPtr channel( + grpc_core::Channel::FromC(c_channel)); grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_channel_element* elem; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (c_channel)); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 6f0e3edb4a6..4796331893e 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -54,7 +54,7 @@ #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/surface/channel_stack_type.h" -#include "src/core/lib/transport/channel.h" +#include "src/core/lib/transport/call_factory.h" #include "src/core/lib/transport/transport.h" /// The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so @@ -102,15 +102,15 @@ struct CallRegistrationTable { ABSL_GUARDED_BY(mu); }; -class GrpcChannel : public Channel, - public CppImplOf { +class Channel : public RefCounted, + public CppImplOf { public: - static absl::StatusOr> Create( + static absl::StatusOr> Create( const char* target, ChannelArgs args, grpc_channel_stack_type channel_stack_type, Transport* optional_transport); - static absl::StatusOr> CreateWithBuilder( + static absl::StatusOr> CreateWithBuilder( ChannelStackBuilder* builder); grpc_channel_stack* channel_stack() const { return channel_stack_.get(); } @@ -121,6 +121,9 @@ class GrpcChannel : public Channel, channelz::ChannelNode* channelz_node() const { return channelz_node_.get(); } + Arena* CreateArena() { return call_factory_->CreateArena(); } + void DestroyArena(Arena* arena) { return call_factory_->DestroyArena(arena); } + absl::string_view target() const { return target_; } bool is_client() const { return is_client_; } bool is_promising() const { return is_promising_; } @@ -136,10 +139,10 @@ class GrpcChannel : public Channel, } private: - GrpcChannel(bool is_client, bool is_promising, std::string target, - const ChannelArgs& channel_args, - grpc_compression_options compression_options, - RefCountedPtr channel_stack); + Channel(bool is_client, bool is_promising, std::string target, + const ChannelArgs& channel_args, + grpc_compression_options compression_options, + RefCountedPtr channel_stack); const bool is_client_; const bool is_promising_; @@ -148,32 +151,33 @@ class GrpcChannel : public Channel, RefCountedPtr channelz_node_; std::string target_; const RefCountedPtr channel_stack_; + const RefCountedPtr call_factory_; }; } // namespace grpc_core inline grpc_compression_options grpc_channel_compression_options( const grpc_channel* channel) { - return grpc_core::GrpcChannel::FromC(channel)->compression_options(); + return grpc_core::Channel::FromC(channel)->compression_options(); } inline grpc_channel_stack* grpc_channel_get_channel_stack( grpc_channel* channel) { - return grpc_core::GrpcChannel::FromC(channel)->channel_stack(); + return grpc_core::Channel::FromC(channel)->channel_stack(); } inline grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node( grpc_channel* channel) { - return grpc_core::GrpcChannel::FromC(channel)->channelz_node(); + return grpc_core::Channel::FromC(channel)->channelz_node(); } inline void grpc_channel_internal_ref(grpc_channel* channel, const char* reason) { - grpc_core::GrpcChannel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release(); + grpc_core::Channel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release(); } inline void grpc_channel_internal_unref(grpc_channel* channel, const char* reason) { - grpc_core::GrpcChannel::FromC(channel)->Unref(DEBUG_LOCATION, reason); + grpc_core::Channel::FromC(channel)->Unref(DEBUG_LOCATION, reason); } // Return the channel's compression options. diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 270f3afebf0..723ec6b703c 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -157,8 +157,8 @@ grpc_channel* grpc_lame_client_channel_create(const char* target, new absl::Status(static_cast(error_code), error_message), &grpc_core::kLameFilterErrorArgVtable)); - auto channel = grpc_core::GrpcChannel::Create( - target, std::move(args), GRPC_CLIENT_LAME_CHANNEL, nullptr); + auto channel = grpc_core::Channel::Create(target, std::move(args), + GRPC_CLIENT_LAME_CHANNEL, nullptr); GPR_ASSERT(channel.ok()); return channel->release()->c_ptr(); } diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 548f7765233..1c75c0afa91 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -734,14 +734,14 @@ class ChannelBroadcaster { // when the actual setup and shutdown broadcast take place. // Copies over the channels from the locked server. - void FillChannelsLocked(std::vector> channels) { + void FillChannelsLocked(std::vector> channels) { GPR_DEBUG_ASSERT(channels_.empty()); channels_ = std::move(channels); } // Broadcasts a shutdown on each channel. void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) { - for (const RefCountedPtr& channel : channels_) { + for (const RefCountedPtr& channel : channels_) { SendShutdown(channel->c_ptr(), send_goaway, force_disconnect); } channels_.clear(); // just for safety against double broadcast @@ -778,7 +778,7 @@ class ChannelBroadcaster { elem->filter->start_transport_op(elem, op); } - std::vector> channels_; + std::vector> channels_; }; } // namespace @@ -911,8 +911,8 @@ grpc_error_handle Server::SetupTransport( const ChannelArgs& args, const RefCountedPtr& socket_node) { // Create channel. - absl::StatusOr> channel = - GrpcChannel::Create(nullptr, args, GRPC_SERVER_CHANNEL, transport); + absl::StatusOr> channel = + Channel::Create(nullptr, args, GRPC_SERVER_CHANNEL, transport); if (!channel.ok()) { return absl_status_to_grpc_error(channel.status()); } @@ -1053,11 +1053,11 @@ void Server::KillPendingWorkLocked(grpc_error_handle error) { } } -std::vector> Server::GetChannelsLocked() const { - std::vector> channels; +std::vector> Server::GetChannelsLocked() const { + std::vector> channels; channels.reserve(channels_.size()); for (const ChannelData* chand : channels_) { - channels.push_back(chand->channel()->RefAsSubclass()); + channels.push_back(chand->channel()->RefAsSubclass()); } return channels; } @@ -1309,7 +1309,7 @@ absl::StatusOr Server::ChannelData::CreateCall( } void Server::ChannelData::InitTransport(RefCountedPtr server, - RefCountedPtr channel, + RefCountedPtr channel, size_t cq_idx, Transport* transport, intptr_t channelz_socket_uuid) { server_ = std::move(server); diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 26564eaa0e1..4bb6fce3fae 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -224,7 +224,7 @@ class Server : public InternallyRefCounted, ~ChannelData(); void InitTransport(RefCountedPtr server, - RefCountedPtr channel, size_t cq_idx, + RefCountedPtr channel, size_t cq_idx, Transport* transport, intptr_t channelz_socket_uuid); RefCountedPtr server() const { return server_; } @@ -257,7 +257,7 @@ class Server : public InternallyRefCounted, static void FinishDestroy(void* arg, grpc_error_handle error); RefCountedPtr server_; - RefCountedPtr channel_; + RefCountedPtr channel_; // The index into Server::cqs_ of the CQ used as a starting point for // where to publish new incoming calls. size_t cq_idx_; @@ -406,7 +406,7 @@ class Server : public InternallyRefCounted, size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag, grpc_byte_buffer** optional_payload, RegisteredMethod* rm); - std::vector> GetChannelsLocked() const; + std::vector> GetChannelsLocked() const; // Take a shutdown ref for a request (increment by 2) and return if shutdown // has not been called. diff --git a/src/core/lib/transport/channel.cc b/src/core/lib/transport/call_factory.cc similarity index 87% rename from src/core/lib/transport/channel.cc rename to src/core/lib/transport/call_factory.cc index 57d4adf8dd9..bcc01cc87ff 100644 --- a/src/core/lib/transport/channel.cc +++ b/src/core/lib/transport/call_factory.cc @@ -14,26 +14,26 @@ #include -#include "src/core/lib/transport/channel.h" +#include "src/core/lib/transport/call_factory.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/resource_quota/resource_quota.h" namespace grpc_core { -Channel::Channel(const ChannelArgs& args) +CallFactory::CallFactory(const ChannelArgs& args) : call_size_estimator_(1024), allocator_(args.GetObject() ->memory_quota() ->CreateMemoryOwner()) {} -Arena* Channel::CreateArena() { +Arena* CallFactory::CreateArena() { const size_t initial_size = call_size_estimator_.CallSizeEstimate(); global_stats().IncrementCallInitialSize(initial_size); return Arena::Create(initial_size, &allocator_); } -void Channel::DestroyArena(Arena* arena) { +void CallFactory::DestroyArena(Arena* arena) { call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes()); arena->Destroy(); } diff --git a/src/core/lib/transport/channel.h b/src/core/lib/transport/call_factory.h similarity index 83% rename from src/core/lib/transport/channel.h rename to src/core/lib/transport/call_factory.h index 52012538324..c5dd0322774 100644 --- a/src/core/lib/transport/channel.h +++ b/src/core/lib/transport/call_factory.h @@ -21,16 +21,18 @@ #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/transport/call_size_estimator.h" +#include "src/core/lib/transport/call_spine.h" namespace grpc_core { -class Channel : public RefCounted { +class CallFactory : public RefCounted { public: + explicit CallFactory(const ChannelArgs& args); + Arena* CreateArena(); void DestroyArena(Arena* arena); - protected: - explicit Channel(const ChannelArgs& args); + virtual CallInitiator CreateCall(ClientMetadataHandle md, Arena* arena) = 0; private: CallSizeEstimator call_size_estimator_;