pull/35766/head
Craig Tiller 1 year ago
parent 84ee58da46
commit 8022fd082b
  1. 2
      BUILD
  2. 8
      src/core/BUILD
  3. 12
      src/core/ext/filters/client_channel/channel_connectivity.cc
  4. 2
      src/core/ext/filters/client_channel/client_channel.cc
  5. 2
      src/core/ext/filters/client_channel/client_channel.h
  6. 4
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 4
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  8. 12
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  9. 15
      src/core/ext/transport/inproc/inproc_transport.cc
  10. 2
      src/core/ext/transport/inproc/legacy_inproc_transport.cc
  11. 4
      src/core/ext/xds/xds_transport_grpc.cc
  12. 21
      src/core/lib/surface/call.cc
  13. 4
      src/core/lib/surface/call.h
  14. 52
      src/core/lib/surface/channel.cc
  15. 32
      src/core/lib/surface/channel.h
  16. 4
      src/core/lib/surface/lame_client.cc
  17. 18
      src/core/lib/surface/server.cc
  18. 6
      src/core/lib/surface/server.h
  19. 8
      src/core/lib/transport/call_factory.cc
  20. 8
      src/core/lib/transport/call_factory.h

@ -1531,11 +1531,11 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:call_factory",
"//src/core:call_filters",
"//src/core:call_final_info",
"//src/core:call_spine",
"//src/core:cancel_callback",
"//src/core:channel",
"//src/core:channel_args",
"//src/core:channel_args_endpoint_config",
"//src/core:channel_args_preconditioning",

@ -6737,19 +6737,21 @@ grpc_cc_library(
)
grpc_cc_library(
name = "channel",
name = "call_factory",
srcs = [
"lib/transport/channel.cc",
"lib/transport/call_factory.cc",
],
hdrs = [
"lib/transport/channel.h",
"lib/transport/call_factory.h",
],
deps = [
"arena",
"call_size_estimator",
"call_spine",
"channel_args",
"ref_counted",
"resource_quota",
"//:gpr_platform",
"//:stats",
],
)

@ -49,7 +49,7 @@
namespace grpc_core {
namespace {
bool IsLameChannel(GrpcChannel* channel) {
bool IsLameChannel(Channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(channel->channel_stack());
return elem->filter == &LameClientFilter::kFilter;
@ -65,7 +65,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
GRPC_API_TRACE(
"grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
(c_channel, try_to_connect));
grpc_core::GrpcChannel* channel = grpc_core::GrpcChannel::FromC(c_channel);
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
// Forward through to the underlying client channel.
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
@ -82,7 +82,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
}
int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) {
grpc_core::GrpcChannel* channel = grpc_core::GrpcChannel::FromC(c_channel);
grpc_core::Channel* channel = grpc_core::Channel::FromC(c_channel);
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (client_channel == nullptr) {
@ -98,7 +98,7 @@ int grpc_channel_num_external_connectivity_watchers(grpc_channel* c_channel) {
int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
return grpc_core::ClientChannel::GetFromChannel(
grpc_core::GrpcChannel::FromC(channel)) != nullptr;
grpc_core::Channel::FromC(channel)) != nullptr;
}
namespace grpc_core {
@ -109,7 +109,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
StateWatcher(grpc_channel* c_channel, grpc_completion_queue* cq, void* tag,
grpc_connectivity_state last_observed_state,
gpr_timespec deadline)
: channel_(GrpcChannel::FromC(c_channel)->RefAsSubclass<GrpcChannel>()),
: channel_(Channel::FromC(c_channel)->RefAsSubclass<Channel>()),
cq_(cq),
tag_(tag),
state_(last_observed_state) {
@ -225,7 +225,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
self->WeakUnref();
}
RefCountedPtr<GrpcChannel> channel_;
RefCountedPtr<Channel> channel_;
grpc_completion_queue* cq_;
void* tag_;

@ -1185,7 +1185,7 @@ class ClientChannel::ClientChannelControlHelper
// ClientChannel implementation
//
ClientChannel* ClientChannel::GetFromChannel(GrpcChannel* channel) {
ClientChannel* ClientChannel::GetFromChannel(Channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(channel->channel_stack());
if (elem->filter != &kFilterVtableWithPromises &&

@ -117,7 +117,7 @@ class ClientChannel {
// Returns the ClientChannel object from channel, or null if channel
// is not a client channel.
static ClientChannel* GetFromChannel(GrpcChannel* channel);
static ClientChannel* GetFromChannel(Channel* channel);
static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args,

@ -1593,7 +1593,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
ClientChannel* client_channel =
ClientChannel::GetFromChannel(GrpcChannel::FromC(lb_channel_));
ClientChannel::GetFromChannel(Channel::FromC(lb_channel_));
GPR_ASSERT(client_channel != nullptr);
// Ref held by callback.
watcher_ =
@ -1660,7 +1660,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() {
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
ClientChannel* client_channel =
ClientChannel::GetFromChannel(GrpcChannel::FromC(lb_channel_));
ClientChannel::GetFromChannel(Channel::FromC(lb_channel_));
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
}

@ -1558,7 +1558,7 @@ RlsLb::RlsChannel::RlsChannel(RefCountedPtr<RlsLb> lb_policy)
}
// Start connectivity watch.
ClientChannel* client_channel =
ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_));
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
client_channel->AddConnectivityWatcher(
@ -1584,7 +1584,7 @@ void RlsLb::RlsChannel::Orphan() {
// Stop connectivity watch.
if (watcher_ != nullptr) {
ClientChannel* client_channel =
ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_));
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
watcher_ = nullptr;

@ -307,8 +307,8 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
}
};
absl::StatusOr<RefCountedPtr<GrpcChannel>> CreateChannel(
const char* target, const ChannelArgs& args) {
absl::StatusOr<RefCountedPtr<Channel>> CreateChannel(const char* target,
const ChannelArgs& args) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
return absl::InvalidArgumentError("channel target is NULL");
@ -317,9 +317,9 @@ absl::StatusOr<RefCountedPtr<GrpcChannel>> CreateChannel(
std::string canonical_target =
CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded(
target);
return GrpcChannel::Create(target,
args.Set(GRPC_ARG_SERVER_URI, canonical_target),
GRPC_CLIENT_CHANNEL, nullptr);
return Channel::Create(target,
args.Set(GRPC_ARG_SERVER_URI, canonical_target),
GRPC_CLIENT_CHANNEL, nullptr);
}
} // namespace
@ -410,7 +410,7 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
grpc_core::Transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
GPR_ASSERT(transport);
auto channel = grpc_core::GrpcChannel::Create(
auto channel = grpc_core::Channel::Create(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);

@ -155,8 +155,8 @@ bool UsePromiseBasedTransport() {
return true;
}
RefCountedPtr<GrpcChannel> MakeLameChannel(absl::string_view why,
absl::Status error) {
RefCountedPtr<Channel> MakeLameChannel(absl::string_view why,
absl::Status error) {
gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
std::string(error.message()).c_str());
intptr_t integer;
@ -164,13 +164,12 @@ RefCountedPtr<GrpcChannel> MakeLameChannel(absl::string_view why,
if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
return RefCountedPtr<GrpcChannel>(
GrpcChannel::FromC(grpc_lame_client_channel_create(
nullptr, status, std::string(why).c_str())));
return RefCountedPtr<Channel>(Channel::FromC(grpc_lame_client_channel_create(
nullptr, status, std::string(why).c_str())));
}
RefCountedPtr<GrpcChannel> MakeInprocChannel(Server* server,
ChannelArgs client_channel_args) {
RefCountedPtr<Channel> MakeInprocChannel(Server* server,
ChannelArgs client_channel_args) {
auto transports = MakeInProcessTransportPair();
auto client_transport = std::move(transports.first);
auto server_transport = std::move(transports.second);
@ -184,7 +183,7 @@ RefCountedPtr<GrpcChannel> MakeInprocChannel(Server* server,
return MakeLameChannel("Failed to create server channel", std::move(error));
}
std::ignore = server_transport.release(); // consumed by SetupTransport
auto channel = GrpcChannel::Create(
auto channel = Channel::Create(
"inproc",
client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"),
GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());

@ -1264,7 +1264,7 @@ grpc_channel* grpc_legacy_inproc_channel_create(grpc_server* server,
server_transport, nullptr, server_args, nullptr);
grpc_channel* channel = nullptr;
if (error.ok()) {
auto new_channel = grpc_core::GrpcChannel::Create(
auto new_channel = grpc_core::Channel::Create(
"inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
if (!new_channel.ok()) {
GPR_ASSERT(!channel);

@ -282,7 +282,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
*status = absl::UnavailableError("xds client has a lame channel");
} else {
ClientChannel* client_channel =
ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_));
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(std::move(on_connectivity_failure));
client_channel->AddConnectivityWatcher(
@ -298,7 +298,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() {
if (!IsLameChannel(channel_)) {
ClientChannel* client_channel =
ClientChannel::GetFromChannel(GrpcChannel::FromC(channel_));
ClientChannel::GetFromChannel(Channel::FromC(channel_));
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
}

@ -184,7 +184,7 @@ class Call : public CppImplOf<Call, grpc_call> {
};
Call(Arena* arena, bool is_client, Timestamp send_deadline,
RefCountedPtr<GrpcChannel> channel)
RefCountedPtr<Channel> channel)
: channel_(std::move(channel)),
arena_(arena),
send_deadline_(send_deadline),
@ -198,7 +198,7 @@ class Call : public CppImplOf<Call, grpc_call> {
ParentCall* GetOrCreateParentCall();
ParentCall* parent_call();
GrpcChannel* channel() const {
Channel* channel() const {
GPR_DEBUG_ASSERT(channel_ != nullptr);
return channel_.get();
}
@ -248,7 +248,7 @@ class Call : public CppImplOf<Call, grpc_call> {
gpr_cycle_counter start_time() const { return start_time_; }
private:
RefCountedPtr<GrpcChannel> channel_;
RefCountedPtr<Channel> channel_;
Arena* const arena_;
std::atomic<ParentCall*> parent_call_{nullptr};
ChildCall* child_ = nullptr;
@ -689,7 +689,7 @@ class FilterStackCall final : public Call {
FilterStackCall(Arena* arena, const grpc_call_create_args& args)
: Call(arena, args.server_transport_data == nullptr, args.send_deadline,
args.channel->RefAsSubclass<GrpcChannel>()),
args.channel->RefAsSubclass<Channel>()),
cq_(args.cq),
stream_op_payload_(context_) {}
@ -802,7 +802,7 @@ class FilterStackCall final : public Call {
grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
grpc_call** out_call) {
GrpcChannel* channel = args->channel.get();
Channel* channel = args->channel.get();
auto add_init_error = [](grpc_error_handle* composite,
grpc_error_handle new_err) {
@ -1994,7 +1994,7 @@ class BasicPromiseBasedCall : public Call,
BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
const grpc_call_create_args& args)
: Call(arena, args.server_transport_data == nullptr, args.send_deadline,
args.channel->RefAsSubclass<GrpcChannel>()),
args.channel->RefAsSubclass<Channel>()),
Party(arena, initial_external_refs != 0 ? 1 : 0),
external_refs_(initial_external_refs),
cq_(args.cq) {
@ -3704,7 +3704,7 @@ class ServerCallSpine final : public CallSpineInterface,
public ServerCallContext,
public BasicPromiseBasedCall {
public:
ServerCallSpine(Server* server, GrpcChannel* channel, Arena* arena);
ServerCallSpine(Server* server, Channel* channel, Arena* arena);
// CallSpineInterface
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
@ -3788,12 +3788,11 @@ class ServerCallSpine final : public CallSpineInterface,
ClientMetadataHandle client_initial_metadata_stored_;
};
ServerCallSpine::ServerCallSpine(Server* server, GrpcChannel* channel,
Arena* arena)
ServerCallSpine::ServerCallSpine(Server* server, Channel* channel, Arena* arena)
: BasicPromiseBasedCall(
arena, 1, [channel, server]() -> grpc_call_create_args {
grpc_call_create_args args;
args.channel = channel->RefAsSubclass<GrpcChannel>();
args.channel = channel->RefAsSubclass<Channel>();
args.server = server;
args.parent = nullptr;
args.propagation_mask = 0;
@ -4055,7 +4054,7 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
}
RefCountedPtr<CallSpineInterface> MakeServerCall(Server* server,
GrpcChannel* channel,
Channel* channel,
Arena* arena) {
return RefCountedPtr<ServerCallSpine>(
arena->New<ServerCallSpine>(server, channel, arena));

@ -57,7 +57,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
void* user_data);
typedef struct grpc_call_create_args {
grpc_core::RefCountedPtr<grpc_core::GrpcChannel> channel;
grpc_core::RefCountedPtr<grpc_core::Channel> channel;
grpc_core::Server* server;
grpc_call* parent;
@ -160,7 +160,7 @@ template <>
struct ContextType<CallContext> {};
RefCountedPtr<CallSpineInterface> MakeServerCall(Server* server,
GrpcChannel* channel,
Channel* channel,
Arena* arena);
} // namespace grpc_core

@ -58,23 +58,36 @@
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/init_internally.h"
#include "src/core/lib/transport/call_factory.h"
#include "src/core/lib/transport/transport.h"
// IWYU pragma: no_include <type_traits>
namespace grpc_core {
GrpcChannel::GrpcChannel(bool is_client, bool is_promising, std::string target,
const ChannelArgs& channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack)
: Channel(channel_args),
is_client_(is_client),
namespace {
class NotReallyACallFactory final : public CallFactory {
public:
using CallFactory::CallFactory;
CallInitiator CreateCall(ClientMetadataHandle md, Arena* arena) {
Crash("NotReallyACallFactory::CreateCall should never be called");
}
};
} // namespace
Channel::Channel(bool is_client, bool is_promising, std::string target,
const ChannelArgs& channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack)
: is_client_(is_client),
is_promising_(is_promising),
compression_options_(compression_options),
channelz_node_(channel_args.GetObjectRef<channelz::ChannelNode>()),
target_(std::move(target)),
channel_stack_(std::move(channel_stack)) {
channel_stack_(std::move(channel_stack)),
call_factory_(MakeRefCounted<NotReallyACallFactory>(channel_args)) {
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,
@ -103,7 +116,7 @@ GrpcChannel::GrpcChannel(bool is_client, bool is_promising, std::string target,
};
}
absl::StatusOr<RefCountedPtr<GrpcChannel>> GrpcChannel::CreateWithBuilder(
absl::StatusOr<RefCountedPtr<Channel>> Channel::CreateWithBuilder(
ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (builder->channel_stack_type() == GRPC_SERVER_CHANNEL) {
@ -147,7 +160,7 @@ absl::StatusOr<RefCountedPtr<GrpcChannel>> GrpcChannel::CreateWithBuilder(
*enabled_algorithms_bitset | 1 /* always support no compression */;
}
return RefCountedPtr<GrpcChannel>(new GrpcChannel(
return RefCountedPtr<Channel>(new Channel(
grpc_channel_stack_type_is_client(builder->channel_stack_type()),
builder->IsPromising(), std::string(builder->target()), channel_args,
compression_options, std::move(*r)));
@ -169,7 +182,7 @@ const grpc_arg_pointer_vtable channelz_node_arg_vtable = {
channelz_node_copy, channelz_node_destroy, channelz_node_cmp};
} // namespace
absl::StatusOr<RefCountedPtr<GrpcChannel>> GrpcChannel::Create(
absl::StatusOr<RefCountedPtr<Channel>> Channel::Create(
const char* target, ChannelArgs args,
grpc_channel_stack_type channel_stack_type, Transport* optional_transport) {
if (!args.GetString(GRPC_ARG_DEFAULT_AUTHORITY).has_value()) {
@ -230,7 +243,7 @@ absl::StatusOr<RefCountedPtr<GrpcChannel>> GrpcChannel::Create(
char* grpc_channel_get_target(grpc_channel* channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
auto target = grpc_core::GrpcChannel::FromC(channel)->target();
auto target = grpc_core::Channel::FromC(channel)->target();
char* buffer = static_cast<char*>(gpr_zalloc(target.size() + 1));
memcpy(buffer, target.data(), target.size());
return buffer;
@ -241,7 +254,7 @@ void grpc_channel_get_info(grpc_channel* channel,
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_channel_element* elem = grpc_channel_stack_element(
grpc_core::GrpcChannel::FromC(channel)->channel_stack(), 0);
grpc_core::Channel::FromC(channel)->channel_stack(), 0);
elem->filter->get_channel_info(elem, channel_info);
}
@ -253,7 +266,7 @@ void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->reset_connect_backoff = true;
grpc_channel_element* elem = grpc_channel_stack_element(
grpc_core::GrpcChannel::FromC(channel)->channel_stack(), 0);
grpc_core::Channel::FromC(channel)->channel_stack(), 0);
elem->filter->start_transport_op(elem, op);
}
@ -262,8 +275,8 @@ static grpc_call* grpc_channel_create_call_internal(
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
grpc_core::Slice path, absl::optional<grpc_core::Slice> authority,
grpc_core::Timestamp deadline, bool registered_method) {
auto channel = grpc_core::GrpcChannel::FromC(c_channel)
->RefAsSubclass<grpc_core::GrpcChannel>();
auto channel =
grpc_core::Channel::FromC(c_channel)->RefAsSubclass<grpc_core::Channel>();
GPR_ASSERT(channel->is_client());
GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr));
@ -348,13 +361,12 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method,
GPR_ASSERT(!reserved);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
return grpc_core::GrpcChannel::FromC(channel)->RegisterCall(method, host);
return grpc_core::Channel::FromC(channel)->RegisterCall(method, host);
}
namespace grpc_core {
RegisteredCall* GrpcChannel::RegisterCall(const char* method,
const char* host) {
RegisteredCall* Channel::RegisterCall(const char* method, const char* host) {
MutexLock lock(&registration_table_.mu);
auto key = std::make_pair(std::string(host != nullptr ? host : ""),
std::string(method != nullptr ? method : ""));
@ -402,8 +414,8 @@ grpc_call* grpc_channel_create_registered_call(
}
void grpc_channel_destroy_internal(grpc_channel* c_channel) {
grpc_core::RefCountedPtr<grpc_core::GrpcChannel> channel(
grpc_core::GrpcChannel::FromC(c_channel));
grpc_core::RefCountedPtr<grpc_core::Channel> channel(
grpc_core::Channel::FromC(c_channel));
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (c_channel));

@ -54,7 +54,7 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/channel.h"
#include "src/core/lib/transport/call_factory.h"
#include "src/core/lib/transport/transport.h"
/// The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so
@ -102,15 +102,15 @@ struct CallRegistrationTable {
ABSL_GUARDED_BY(mu);
};
class GrpcChannel : public Channel,
public CppImplOf<GrpcChannel, grpc_channel> {
class Channel : public RefCounted<Channel>,
public CppImplOf<Channel, grpc_channel> {
public:
static absl::StatusOr<RefCountedPtr<GrpcChannel>> Create(
static absl::StatusOr<RefCountedPtr<Channel>> Create(
const char* target, ChannelArgs args,
grpc_channel_stack_type channel_stack_type,
Transport* optional_transport);
static absl::StatusOr<RefCountedPtr<GrpcChannel>> CreateWithBuilder(
static absl::StatusOr<RefCountedPtr<Channel>> CreateWithBuilder(
ChannelStackBuilder* builder);
grpc_channel_stack* channel_stack() const { return channel_stack_.get(); }
@ -121,6 +121,9 @@ class GrpcChannel : public Channel,
channelz::ChannelNode* channelz_node() const { return channelz_node_.get(); }
Arena* CreateArena() { return call_factory_->CreateArena(); }
void DestroyArena(Arena* arena) { return call_factory_->DestroyArena(arena); }
absl::string_view target() const { return target_; }
bool is_client() const { return is_client_; }
bool is_promising() const { return is_promising_; }
@ -136,10 +139,10 @@ class GrpcChannel : public Channel,
}
private:
GrpcChannel(bool is_client, bool is_promising, std::string target,
const ChannelArgs& channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack);
Channel(bool is_client, bool is_promising, std::string target,
const ChannelArgs& channel_args,
grpc_compression_options compression_options,
RefCountedPtr<grpc_channel_stack> channel_stack);
const bool is_client_;
const bool is_promising_;
@ -148,32 +151,33 @@ class GrpcChannel : public Channel,
RefCountedPtr<channelz::ChannelNode> channelz_node_;
std::string target_;
const RefCountedPtr<grpc_channel_stack> channel_stack_;
const RefCountedPtr<CallFactory> call_factory_;
};
} // namespace grpc_core
inline grpc_compression_options grpc_channel_compression_options(
const grpc_channel* channel) {
return grpc_core::GrpcChannel::FromC(channel)->compression_options();
return grpc_core::Channel::FromC(channel)->compression_options();
}
inline grpc_channel_stack* grpc_channel_get_channel_stack(
grpc_channel* channel) {
return grpc_core::GrpcChannel::FromC(channel)->channel_stack();
return grpc_core::Channel::FromC(channel)->channel_stack();
}
inline grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* channel) {
return grpc_core::GrpcChannel::FromC(channel)->channelz_node();
return grpc_core::Channel::FromC(channel)->channelz_node();
}
inline void grpc_channel_internal_ref(grpc_channel* channel,
const char* reason) {
grpc_core::GrpcChannel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release();
grpc_core::Channel::FromC(channel)->Ref(DEBUG_LOCATION, reason).release();
}
inline void grpc_channel_internal_unref(grpc_channel* channel,
const char* reason) {
grpc_core::GrpcChannel::FromC(channel)->Unref(DEBUG_LOCATION, reason);
grpc_core::Channel::FromC(channel)->Unref(DEBUG_LOCATION, reason);
}
// Return the channel's compression options.

@ -157,8 +157,8 @@ grpc_channel* grpc_lame_client_channel_create(const char* target,
new absl::Status(static_cast<absl::StatusCode>(error_code),
error_message),
&grpc_core::kLameFilterErrorArgVtable));
auto channel = grpc_core::GrpcChannel::Create(
target, std::move(args), GRPC_CLIENT_LAME_CHANNEL, nullptr);
auto channel = grpc_core::Channel::Create(target, std::move(args),
GRPC_CLIENT_LAME_CHANNEL, nullptr);
GPR_ASSERT(channel.ok());
return channel->release()->c_ptr();
}

@ -734,14 +734,14 @@ class ChannelBroadcaster {
// when the actual setup and shutdown broadcast take place.
// Copies over the channels from the locked server.
void FillChannelsLocked(std::vector<RefCountedPtr<GrpcChannel>> channels) {
void FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels) {
GPR_DEBUG_ASSERT(channels_.empty());
channels_ = std::move(channels);
}
// Broadcasts a shutdown on each channel.
void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) {
for (const RefCountedPtr<GrpcChannel>& channel : channels_) {
for (const RefCountedPtr<Channel>& channel : channels_) {
SendShutdown(channel->c_ptr(), send_goaway, force_disconnect);
}
channels_.clear(); // just for safety against double broadcast
@ -778,7 +778,7 @@ class ChannelBroadcaster {
elem->filter->start_transport_op(elem, op);
}
std::vector<RefCountedPtr<GrpcChannel>> channels_;
std::vector<RefCountedPtr<Channel>> channels_;
};
} // namespace
@ -911,8 +911,8 @@ grpc_error_handle Server::SetupTransport(
const ChannelArgs& args,
const RefCountedPtr<channelz::SocketNode>& socket_node) {
// Create channel.
absl::StatusOr<RefCountedPtr<GrpcChannel>> channel =
GrpcChannel::Create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
absl::StatusOr<RefCountedPtr<Channel>> channel =
Channel::Create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
if (!channel.ok()) {
return absl_status_to_grpc_error(channel.status());
}
@ -1053,11 +1053,11 @@ void Server::KillPendingWorkLocked(grpc_error_handle error) {
}
}
std::vector<RefCountedPtr<GrpcChannel>> Server::GetChannelsLocked() const {
std::vector<RefCountedPtr<GrpcChannel>> channels;
std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {
std::vector<RefCountedPtr<Channel>> channels;
channels.reserve(channels_.size());
for (const ChannelData* chand : channels_) {
channels.push_back(chand->channel()->RefAsSubclass<GrpcChannel>());
channels.push_back(chand->channel()->RefAsSubclass<Channel>());
}
return channels;
}
@ -1309,7 +1309,7 @@ absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall(
}
void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
RefCountedPtr<GrpcChannel> channel,
RefCountedPtr<Channel> channel,
size_t cq_idx, Transport* transport,
intptr_t channelz_socket_uuid) {
server_ = std::move(server);

@ -224,7 +224,7 @@ class Server : public InternallyRefCounted<Server>,
~ChannelData();
void InitTransport(RefCountedPtr<Server> server,
RefCountedPtr<GrpcChannel> channel, size_t cq_idx,
RefCountedPtr<Channel> channel, size_t cq_idx,
Transport* transport, intptr_t channelz_socket_uuid);
RefCountedPtr<Server> server() const { return server_; }
@ -257,7 +257,7 @@ class Server : public InternallyRefCounted<Server>,
static void FinishDestroy(void* arg, grpc_error_handle error);
RefCountedPtr<Server> server_;
RefCountedPtr<GrpcChannel> channel_;
RefCountedPtr<Channel> channel_;
// The index into Server::cqs_ of the CQ used as a starting point for
// where to publish new incoming calls.
size_t cq_idx_;
@ -406,7 +406,7 @@ class Server : public InternallyRefCounted<Server>,
size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
std::vector<RefCountedPtr<GrpcChannel>> GetChannelsLocked() const;
std::vector<RefCountedPtr<Channel>> GetChannelsLocked() const;
// Take a shutdown ref for a request (increment by 2) and return if shutdown
// has not been called.

@ -14,26 +14,26 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/channel.h"
#include "src/core/lib/transport/call_factory.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/resource_quota/resource_quota.h"
namespace grpc_core {
Channel::Channel(const ChannelArgs& args)
CallFactory::CallFactory(const ChannelArgs& args)
: call_size_estimator_(1024),
allocator_(args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()) {}
Arena* Channel::CreateArena() {
Arena* CallFactory::CreateArena() {
const size_t initial_size = call_size_estimator_.CallSizeEstimate();
global_stats().IncrementCallInitialSize(initial_size);
return Arena::Create(initial_size, &allocator_);
}
void Channel::DestroyArena(Arena* arena) {
void CallFactory::DestroyArena(Arena* arena) {
call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
arena->Destroy();
}

@ -21,16 +21,18 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/call_spine.h"
namespace grpc_core {
class Channel : public RefCounted<Channel> {
class CallFactory : public RefCounted<CallFactory> {
public:
explicit CallFactory(const ChannelArgs& args);
Arena* CreateArena();
void DestroyArena(Arena* arena);
protected:
explicit Channel(const ChannelArgs& args);
virtual CallInitiator CreateCall(ClientMetadataHandle md, Arena* arena) = 0;
private:
CallSizeEstimator call_size_estimator_;
Loading…
Cancel
Save