[call v3] add missing channelz updates in subchannel (#37086)

Closes #37086

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37086 from markdroth:call_v3_subchannel_channelz e5cde30f8c
PiperOrigin-RevId: 648499878
pull/37133/head
Mark D. Roth 10 months ago committed by Copybara-Service
parent 350d26844a
commit 46c36e8aaf
  1. 66
      src/core/client_channel/subchannel.cc
  2. 14
      src/core/client_channel/subchannel.h

@ -97,14 +97,11 @@ using ::grpc_event_engine::experimental::EventEngine;
// ConnectedSubchannel
//
ConnectedSubchannel::ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
ConnectedSubchannel::ConnectedSubchannel(const ChannelArgs& args)
: RefCounted<ConnectedSubchannel>(
GRPC_TRACE_FLAG_ENABLED(subchannel_refcount) ? "ConnectedSubchannel"
: nullptr),
args_(args),
channelz_subchannel_(std::move(channelz_subchannel)) {}
args_(args) {}
//
// LegacyConnectedSubchannel
@ -114,14 +111,19 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
public:
LegacyConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
RefCountedPtr<channelz::SubchannelNode> channelz_node)
: ConnectedSubchannel(args),
channelz_node_(std::move(channelz_node)),
channel_stack_(std::move(channel_stack)) {}
~LegacyConnectedSubchannel() override {
channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
}
channelz::SubchannelNode* channelz_node() const {
return channelz_node_.get();
}
void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
@ -162,6 +164,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
}
private:
RefCountedPtr<channelz::SubchannelNode> channelz_node_;
RefCountedPtr<grpc_channel_stack> channel_stack_;
};
@ -191,9 +194,8 @@ class NewConnectedSubchannel : public ConnectedSubchannel {
NewConnectedSubchannel(
RefCountedPtr<UnstartedCallDestination> call_destination,
RefCountedPtr<TransportCallDestination> transport,
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
const ChannelArgs& args)
: ConnectedSubchannel(args),
call_destination_(std::move(call_destination)),
transport_(std::move(transport)) {}
@ -240,7 +242,8 @@ RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
}
SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
: connected_subchannel_(std::move(args.connected_subchannel)),
: connected_subchannel_(args.connected_subchannel
.TakeAsSubclass<LegacyConnectedSubchannel>()),
deadline_(args.deadline) {
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
const grpc_call_element_args call_args = {
@ -259,7 +262,7 @@ SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
return;
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
auto* channelz_node = connected_subchannel_->channelz_subchannel();
auto* channelz_node = connected_subchannel_->channelz_node();
if (channelz_node != nullptr) {
channelz_node->RecordCallStarted();
}
@ -327,13 +330,9 @@ void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
grpc_transport_stream_op_batch* batch) {
// only intercept payloads with recv trailing.
if (!batch->recv_trailing_metadata) {
return;
}
if (!batch->recv_trailing_metadata) return;
// only add interceptor is channelz is enabled.
if (connected_subchannel_->channelz_subchannel() == nullptr) {
return;
}
if (connected_subchannel_->channelz_node() == nullptr) return;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
this, grpc_schedule_on_exec_ctx);
// save some state needed for the interception callback.
@ -366,13 +365,13 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg,
CHECK_NE(call->recv_trailing_metadata_, nullptr);
grpc_status_code status = GRPC_STATUS_OK;
GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
channelz::SubchannelNode* channelz_subchannel =
call->connected_subchannel_->channelz_subchannel();
CHECK_NE(channelz_subchannel, nullptr);
channelz::SubchannelNode* channelz_node =
call->connected_subchannel_->channelz_node();
CHECK_NE(channelz_node, nullptr);
if (status == GRPC_STATUS_OK) {
channelz_subchannel->RecordCallSucceeded();
channelz_node->RecordCallSucceeded();
} else {
channelz_subchannel->RecordCallFailed();
channelz_node->RecordCallFailed();
}
Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
}
@ -860,6 +859,24 @@ bool Subchannel::PublishTransportLocked() {
->client_transport());
InterceptionChainBuilder builder(
connecting_result_.channel_args.SetObject(transport.get()));
if (channelz_node_ != nullptr) {
// TODO(ctiller): If/when we have a good way to access the subchannel
// from a filter (maybe GetContext<Subchannel>?), consider replacing
// these two hooks with a filter so that we can avoid storing two
// separate refs to the channelz node in each connection.
builder.AddOnClientInitialMetadata(
[channelz_node = channelz_node_](ClientMetadata&) {
channelz_node->RecordCallStarted();
});
builder.AddOnServerTrailingMetadata(
[channelz_node = channelz_node_](ServerMetadata& metadata) {
if (IsStatusOk(metadata)) {
channelz_node->RecordCallSucceeded();
} else {
channelz_node->RecordCallFailed();
}
});
}
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_SUBCHANNEL, builder);
auto transport_destination =
@ -874,8 +891,7 @@ bool Subchannel::PublishTransportLocked() {
return false;
}
connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
std::move(*call_destination), std::move(transport_destination), args_,
channelz_node_);
std::move(*call_destination), std::move(transport_destination), args_);
}
connecting_result_.Reset();
// Publish.

@ -66,9 +66,6 @@ class SubchannelCall;
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
public:
const ChannelArgs& args() const { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
return channelz_subchannel_.get();
}
virtual void StartWatch(
grpc_pollset_set* interested_parties,
@ -85,17 +82,14 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;
protected:
ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
explicit ConnectedSubchannel(const ChannelArgs& args);
private:
ChannelArgs args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
};
class LegacyConnectedSubchannel;
// Implements the interface of RefCounted<>.
class SubchannelCall final {
public:
@ -150,7 +144,7 @@ class SubchannelCall final {
static void Destroy(void* arg, grpc_error_handle error);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
RefCountedPtr<LegacyConnectedSubchannel> connected_subchannel_;
grpc_closure* after_call_stack_destroy_ = nullptr;
// State needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_;

Loading…
Cancel
Save