Remove channelz from LB policy API.

pull/19066/head
Mark D. Roth 6 years ago
parent 10f7ab2b4d
commit cfb31818ef
  1. 105
      src/core/ext/filters/client_channel/client_channel.cc
  2. 8
      src/core/ext/filters/client_channel/client_channel.h
  3. 83
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  4. 30
      src/core/ext/filters/client_channel/client_channel_channelz.h
  5. 8
      src/core/ext/filters/client_channel/client_channel_plugin.cc
  6. 28
      src/core/ext/filters/client_channel/lb_policy.h
  7. 101
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  8. 59
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  9. 58
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  10. 13
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  11. 170
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  12. 83
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  13. 8
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  14. 10
      src/core/ext/filters/client_channel/subchannel_interface.h
  15. 155
      src/core/lib/channel/channelz.cc
  16. 76
      src/core/lib/channel/channelz.h
  17. 133
      src/core/lib/surface/channel.cc
  18. 4
      src/core/lib/surface/channel.h
  19. 11
      test/core/channel/channel_trace_test.cc
  20. 3
      test/core/channel/channelz_test.cc
  21. 10
      test/core/util/test_lb_policies.cc
  22. 3
      test/cpp/end2end/channelz_service_test.cc

@ -118,18 +118,6 @@ class ChannelData {
static void GetChannelInfo(grpc_channel_element* elem,
const grpc_channel_info* info);
void set_channelz_node(channelz::ClientChannelNode* node) {
channelz_node_ = node;
resolving_lb_policy_->set_channelz_node(node->Ref());
}
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
if (resolving_lb_policy_ != nullptr) {
resolving_lb_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
bool enable_retries() const { return enable_retries_; }
size_t per_rpc_retry_buffer_size() const {
@ -249,8 +237,7 @@ class ChannelData {
ClientChannelFactory* client_channel_factory_;
UniquePtr<char> server_name_;
RefCountedPtr<ServiceConfig> default_service_config_;
// Initialized shortly after construction.
channelz::ClientChannelNode* channelz_node_ = nullptr;
channelz::ChannelNode* channelz_node_;
//
// Fields used in the data plane. Guarded by data_plane_combiner.
@ -269,12 +256,13 @@ class ChannelData {
grpc_combiner* combiner_;
grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false;
Map<Subchannel*, int> subchannel_refcount_map_;
//
// Fields accessed from both data plane and control plane combiners.
@ -735,6 +723,7 @@ class ChannelData::ConnectivityStateAndPickerSetter {
// Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
if (chand->channelz_node_ != nullptr) {
chand->channelz_node_->SetConnectivityState(state);
chand->channelz_node_->AddTraceEvent(
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string(
@ -971,12 +960,39 @@ void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
// control plane combiner.
class ChannelData::GrpcSubchannel : public SubchannelInterface {
public:
GrpcSubchannel(Subchannel* subchannel,
GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
UniquePtr<char> health_check_service_name)
: subchannel_(subchannel),
health_check_service_name_(std::move(health_check_service_name)) {}
: chand_(chand),
subchannel_(subchannel),
health_check_service_name_(std::move(health_check_service_name)) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
intptr_t subchannel_uuid = subchannel_node->uuid();
auto it = chand_->subchannel_refcount_map_.find(subchannel_);
if (it == chand_->subchannel_refcount_map_.end()) {
chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
}
++it->second;
}
}
~GrpcSubchannel() { GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB"); }
~GrpcSubchannel() {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
intptr_t subchannel_uuid = subchannel_node->uuid();
auto it = chand_->subchannel_refcount_map_.find(subchannel_);
GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
--it->second;
if (it->second == 0) {
chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
chand_->subchannel_refcount_map_.erase(it);
}
}
GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
}
grpc_connectivity_state CheckConnectivityState(
RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
@ -1005,13 +1021,10 @@ class ChannelData::GrpcSubchannel : public SubchannelInterface {
void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
channelz::SubchannelNode* channelz_node() override {
return subchannel_->channelz_node();
}
void ResetBackoff() override { subchannel_->ResetBackoff(); }
private:
ChannelData* chand_;
Subchannel* subchannel_;
UniquePtr<char> health_check_service_name_;
};
@ -1041,7 +1054,10 @@ class ChannelData::ClientChannelControlHelper
health_check_service_name.reset(
gpr_strdup(chand_->health_check_service_name_.get()));
}
static const char* args_to_remove[] = {GRPC_ARG_INHIBIT_HEALTH_CHECKING};
static const char* args_to_remove[] = {
GRPC_ARG_INHIBIT_HEALTH_CHECKING,
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
};
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get());
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
@ -1050,7 +1066,7 @@ class ChannelData::ClientChannelControlHelper
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) return nullptr;
return MakeRefCounted<GrpcSubchannel>(subchannel,
return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
std::move(health_check_service_name));
}
@ -1082,7 +1098,22 @@ class ChannelData::ClientChannelControlHelper
// No-op -- we should never get this from ResolvingLoadBalancingPolicy.
void RequestReresolution() override {}
void AddTraceEvent(TraceSeverity severity, const char* message) override {
if (chand_->channelz_node_ != nullptr) {
chand_->channelz_node_->AddTraceEvent(
ConvertSeverityEnum(severity),
grpc_slice_from_copied_string(message));
}
}
private:
static channelz::ChannelTrace::Severity ConvertSeverityEnum(
TraceSeverity severity) {
if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
return channelz::ChannelTrace::Error;
}
ChannelData* chand_;
};
@ -1125,6 +1156,15 @@ RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
return GlobalSubchannelPool::instance();
}
channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
}
return nullptr;
}
ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
: deadline_checking_enabled_(
grpc_deadline_checking_enabled(args->channel_args)),
@ -1134,6 +1174,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
owning_stack_(args->channel_stack),
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
channelz_node_(GetChannelzNode(args->channel_args)),
data_plane_combiner_(grpc_combiner_create()),
combiner_(grpc_combiner_create()),
interested_parties_(grpc_pollset_set_create()),
@ -3532,20 +3573,6 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
void grpc_client_channel_set_channelz_node(
grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->set_channelz_node(node);
}
void grpc_client_channel_populate_child_refs(
grpc_channel_element* elem,
grpc_core::channelz::ChildRefsList* child_subchannels,
grpc_core::channelz::ChildRefsList* child_channels) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->FillChildRefsForChannelz(child_subchannels, child_channels);
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);

@ -40,14 +40,6 @@ extern grpc_core::TraceFlag grpc_client_channel_trace;
extern const grpc_channel_filter grpc_client_channel_filter;
void grpc_client_channel_set_channelz_node(
grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node);
void grpc_client_channel_populate_child_refs(
grpc_channel_element* elem,
grpc_core::channelz::ChildRefsList* child_subchannels,
grpc_core::channelz::ChildRefsList* child_channels);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect);

@ -29,89 +29,6 @@
namespace grpc_core {
namespace channelz {
namespace {
void* client_channel_channelz_copy(void* p) { return p; }
void client_channel_channelz_destroy(void* p) {}
int client_channel_channelz_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
} // namespace
static const grpc_arg_pointer_vtable client_channel_channelz_vtable = {
client_channel_channelz_copy, client_channel_channelz_destroy,
client_channel_channelz_cmp};
ClientChannelNode::ClientChannelNode(grpc_channel* channel,
size_t channel_tracer_max_nodes,
bool is_top_level_channel)
: ChannelNode(channel, channel_tracer_max_nodes, is_top_level_channel) {
client_channel_ =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
GPR_ASSERT(client_channel_->filter == &grpc_client_channel_filter);
grpc_client_channel_set_channelz_node(client_channel_, this);
}
void ClientChannelNode::PopulateConnectivityState(grpc_json* json) {
grpc_connectivity_state state;
if (ChannelIsDestroyed()) {
state = GRPC_CHANNEL_SHUTDOWN;
} else {
state =
grpc_client_channel_check_connectivity_state(client_channel_, false);
}
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_create_child(nullptr, json, "state",
grpc_connectivity_state_name(state), GRPC_JSON_STRING,
false);
}
void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
ChildRefsList child_subchannels;
ChildRefsList child_channels;
grpc_json* json_iterator = nullptr;
grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels,
&child_channels);
if (!child_subchannels.empty()) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false);
for (size_t i = 0; i < child_subchannels.size(); ++i) {
json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "subchannelId",
child_subchannels[i]);
}
}
if (!child_channels.empty()) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
for (size_t i = 0; i < child_channels.size(); ++i) {
json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
child_channels[i]);
}
}
}
grpc_arg ClientChannelNode::CreateChannelArg() {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC),
reinterpret_cast<void*>(MakeClientChannelNode),
&client_channel_channelz_vtable);
}
RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel) {
return MakeRefCounted<ClientChannelNode>(channel, channel_tracer_max_nodes,
is_top_level_channel);
}
SubchannelNode::SubchannelNode(Subchannel* subchannel,
size_t channel_tracer_max_nodes)

@ -32,32 +32,6 @@ class Subchannel;
namespace channelz {
// Subtype of ChannelNode that overrides and provides client_channel specific
// functionality like querying for connectivity_state and subchannel data.
class ClientChannelNode : public ChannelNode {
public:
static RefCountedPtr<ChannelNode> MakeClientChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
virtual ~ClientChannelNode() {}
// Overriding template methods from ChannelNode to render information that
// only ClientChannelNode knows about.
void PopulateConnectivityState(grpc_json* json) override;
void PopulateChildRefs(grpc_json* json) override;
// Helper to create a channel arg to ensure this type of ChannelNode is
// created.
static grpc_arg CreateChannelArg();
private:
grpc_channel_element* client_channel_;
};
// Handles channelz bookkeeping for sockets
class SubchannelNode : public BaseNode {
public:
SubchannelNode(Subchannel* subchannel, size_t channel_tracer_max_nodes);
@ -85,12 +59,12 @@ class SubchannelNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
void PopulateConnectivityState(grpc_json* json);
Subchannel* subchannel_;
UniquePtr<char> target_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
void PopulateConnectivityState(grpc_json* json);
};
} // namespace channelz

@ -38,14 +38,6 @@
#include "src/core/lib/surface/channel_init.h"
static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
grpc_arg args_to_add[] = {
grpc_core::channelz::ClientChannelNode::CreateChannelArg()};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
grpc_channel_args_destroy(new_args);
return grpc_channel_stack_builder_append_filter(
builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
}

@ -21,7 +21,6 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
@ -31,6 +30,7 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc_core {
@ -201,6 +201,12 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Requests that the resolver re-resolve.
virtual void RequestReresolution() GRPC_ABSTRACT;
/// Adds a trace message associated with the channel.
/// Does NOT take ownership of \a message.
enum TraceSeverity { TRACE_INFO, TRACE_WARNING, TRACE_ERROR };
virtual void AddTraceEvent(TraceSeverity severity,
const char* message) GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
};
@ -274,20 +280,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Resets connection backoff.
virtual void ResetBackoffLocked() GRPC_ABSTRACT;
/// Populates child_subchannels and child_channels with the uuids of this
/// LB policy's referenced children.
///
/// This is not invoked from the client_channel's combiner. The
/// implementation is responsible for providing its own synchronization.
virtual void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) GRPC_ABSTRACT;
void set_channelz_node(
RefCountedPtr<channelz::ClientChannelNode> channelz_node) {
channelz_node_ = std::move(channelz_node);
}
grpc_pollset_set* interested_parties() const { return interested_parties_; }
void Orphan() override;
@ -333,10 +325,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
return channel_control_helper_.get();
}
channelz::ClientChannelNode* channelz_node() const {
return channelz_node_.get();
}
/// Shuts down the policy.
virtual void ShutdownLocked() GRPC_ABSTRACT;
@ -349,8 +337,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_pollset_set* interested_parties_;
/// Channel control helper.
UniquePtr<ChannelControlHelper> channel_control_helper_;
/// Channelz node.
RefCountedPtr<channelz::ClientChannelNode> channelz_node_;
};
} // namespace grpc_core

@ -141,9 +141,6 @@ class GrpcLb : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override;
private:
/// Contains a call to the LB server and all the data related to the call.
@ -300,6 +297,7 @@ class GrpcLb : public LoadBalancingPolicy {
void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, const char* message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
@ -349,8 +347,6 @@ class GrpcLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
// Uuid of the lb channel. Used for channelz.
gpr_atm lb_channel_uuid_ = 0;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
@ -386,9 +382,6 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure lb_channel_on_connectivity_changed_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu child_policy_mu_;
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// When switching child policies, the new policy will be stored here
@ -655,7 +648,6 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
grpc_pollset_set_del_pollset_set(
parent_->child_policy_->interested_parties(),
parent_->interested_parties());
MutexLock lock(&parent_->child_policy_mu_);
parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
@ -735,6 +727,15 @@ void GrpcLb::Helper::RequestReresolution() {
}
}
void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity,
const char* message) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return;
}
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// GrpcLb::BalancerCallState
//
@ -1244,25 +1245,34 @@ grpc_channel_args* BuildBalancerChannelArgs(
// treated as a stand-alone channel and not inherit this argument from the
// args of the parent channel.
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
// Don't want to pass down channelz node from parent; the balancer
// channel will get its own.
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
};
// Channel args to add.
const grpc_arg args_to_add[] = {
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
InlinedVector<grpc_arg, 3> args_to_add;
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
args_to_add.emplace_back(
grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
response_generator),
// A channel arg indicating the target is a grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
// A channel arg indicating this is an internal channels, aka it is
// owned by components in Core, not by the user application.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
};
response_generator));
// A channel arg indicating the target is a grpclb load balancer.
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1));
// The parent channel's channelz uuid.
channelz::ChannelNode* channelz_node = nullptr;
const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
arg->value.pointer.p != nullptr) {
channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
args_to_add.emplace_back(
channelz::MakeParentUuidArg(channelz_node->uuid()));
}
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
args_to_add.size());
// Make any necessary modifications for security.
return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args);
}
@ -1288,7 +1298,6 @@ GrpcLb::GrpcLb(Args args)
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
grpc_combiner_scheduler(args.combiner));
gpr_mu_init(&child_policy_mu_);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1314,7 +1323,6 @@ GrpcLb::GrpcLb(Args args)
GrpcLb::~GrpcLb() {
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
gpr_mu_destroy(&child_policy_mu_);
}
void GrpcLb::ShutdownLocked() {
@ -1335,11 +1343,8 @@ void GrpcLb::ShutdownLocked() {
grpc_pollset_set_del_pollset_set(
pending_child_policy_->interested_parties(), interested_parties());
}
{
MutexLock lock(&child_policy_mu_);
child_policy_.reset();
pending_child_policy_.reset();
}
child_policy_.reset();
pending_child_policy_.reset();
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
@ -1347,7 +1352,6 @@ void GrpcLb::ShutdownLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
gpr_atm_no_barrier_store(&lb_channel_uuid_, 0);
}
}
@ -1367,29 +1371,6 @@ void GrpcLb::ResetBackoffLocked() {
}
}
void GrpcLb::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
{
// Delegate to the child policy to fill the children subchannels.
// This must be done holding child_policy_mu_, since this method
// does not run in the combiner.
MutexLock lock(&child_policy_mu_);
if (child_policy_ != nullptr) {
child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_);
if (uuid != 0) {
child_channels->push_back(uuid);
}
}
void GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
auto* grpclb_config =
@ -1472,11 +1453,6 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
lb_channel_ =
channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
GPR_ASSERT(lb_channel_ != nullptr);
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
if (channel_node != nullptr) {
gpr_atm_no_barrier_store(&lb_channel_uuid_, channel_node->uuid());
}
gpr_free(uri_str);
}
// Propagate updates to the LB channel (pick_first) through the fake
@ -1764,15 +1740,10 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
auto new_policy =
CreateChildPolicyLocked(child_policy_name, update_args.args);
// Swap the policy into place.
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
{
MutexLock lock(&child_policy_mu_);
lb_policy = std::move(new_policy);
}
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.

@ -53,8 +53,6 @@ class PickFirst : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* ignored) override;
private:
~PickFirst();
@ -128,22 +126,8 @@ class PickFirst : public LoadBalancingPolicy {
RefCountedPtr<ConnectedSubchannelInterface> connected_subchannel_;
};
// Helper class to ensure that any function that modifies the child refs
// data structures will update the channelz snapshot data structures before
// returning.
class AutoChildRefsUpdater {
public:
explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
private:
PickFirst* pf_;
};
void ShutdownLocked() override;
void UpdateChildRefsLocked();
// All our subchannels.
OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
// Latest pending subchannel list.
@ -154,12 +138,6 @@ class PickFirst : public LoadBalancingPolicy {
bool idle_ = false;
// Are we shut down?
bool shutdown_ = false;
/// Lock and data used to capture snapshots of this channels child
/// channels and subchannels. This data is consumed by channelz.
Mutex child_refs_mu_;
channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_;
};
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
@ -177,7 +155,6 @@ PickFirst::~PickFirst() {
}
void PickFirst::ShutdownLocked() {
AutoChildRefsUpdater guard(this);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
}
@ -212,42 +189,7 @@ void PickFirst::ResetBackoffLocked() {
}
}
void PickFirst::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels_to_fill,
channelz::ChildRefsList* ignored) {
MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
// performance when channelz requests are made.
bool found = false;
for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
found = true;
break;
}
}
if (!found) {
child_subchannels_to_fill->push_back(child_subchannels_[i]);
}
}
}
void PickFirst::UpdateChildRefsLocked() {
channelz::ChildRefsList cs;
if (subchannel_list_ != nullptr) {
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
void PickFirst::UpdateLocked(UpdateArgs args) {
AutoChildRefsUpdater guard(this);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
@ -348,7 +290,6 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
AutoChildRefsUpdater guard(p);
// The notification must be for a subchannel in either the current or
// latest pending subchannel lists.
GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||

@ -63,8 +63,6 @@ class RoundRobin : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* ignored) override;
private:
~RoundRobin();
@ -160,22 +158,8 @@ class RoundRobin : public LoadBalancingPolicy {
InlinedVector<RefCountedPtr<ConnectedSubchannelInterface>, 10> subchannels_;
};
// Helper class to ensure that any function that modifies the child refs
// data structures will update the channelz snapshot data structures before
// returning.
class AutoChildRefsUpdater {
public:
explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
private:
RoundRobin* rr_;
};
void ShutdownLocked() override;
void UpdateChildRefsLocked();
/** list of subchannels */
OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
/** Latest version of the subchannel list.
@ -186,11 +170,6 @@ class RoundRobin : public LoadBalancingPolicy {
OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
/** are we shutting down? */
bool shutdown_ = false;
/// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz.
Mutex child_refs_mu_;
channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_;
};
//
@ -255,7 +234,6 @@ RoundRobin::~RoundRobin() {
}
void RoundRobin::ShutdownLocked() {
AutoChildRefsUpdater guard(this);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
}
@ -271,40 +249,6 @@ void RoundRobin::ResetBackoffLocked() {
}
}
void RoundRobin::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels_to_fill,
channelz::ChildRefsList* ignored) {
MutexLock lock(&child_refs_mu_);
for (size_t i = 0; i < child_subchannels_.size(); ++i) {
// TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
// have to implement lightweight set. For now, we don't care about
// performance when channelz requests are made.
bool found = false;
for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
found = true;
break;
}
}
if (!found) {
child_subchannels_to_fill->push_back(child_subchannels_[i]);
}
}
}
void RoundRobin::UpdateChildRefsLocked() {
channelz::ChildRefsList cs;
if (subchannel_list_ != nullptr) {
subchannel_list_->PopulateChildRefsList(&cs);
}
if (latest_pending_subchannel_list_ != nullptr) {
latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
}
// atomically update the data that channelz will actually be looking at.
MutexLock lock(&child_refs_mu_);
child_subchannels_ = std::move(cs);
}
void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously, since any
@ -396,7 +340,6 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
AutoChildRefsUpdater guard(p);
if (num_ready_ > 0) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
@ -468,7 +411,6 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
}
void RoundRobin::UpdateLocked(UpdateArgs args) {
AutoChildRefsUpdater guard(this);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses.size());

@ -223,19 +223,6 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
// Populates refs_list with the uuids of this SubchannelLists's subchannels.
void PopulateChildRefsList(channelz::ChildRefsList* refs_list) {
for (size_t i = 0; i < subchannels_.size(); ++i) {
if (subchannels_[i].subchannel() != nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
subchannels_[i].subchannel()->channelz_node();
if (subchannel_node != nullptr) {
refs_list->push_back(subchannel_node->uuid());
}
}
}
}
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }

@ -157,9 +157,6 @@ class XdsLb : public LoadBalancingPolicy {
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override;
private:
struct LocalityServerlistEntry;
@ -343,6 +340,7 @@ class XdsLb : public LoadBalancingPolicy {
void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, const char* message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
@ -398,8 +396,6 @@ class XdsLb : public LoadBalancingPolicy {
const grpc_channel_args* args);
void ShutdownLocked();
void ResetBackoffLocked();
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels);
void Orphan() override;
private:
@ -415,6 +411,8 @@ class XdsLb : public LoadBalancingPolicy {
void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity,
const char* message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
@ -432,9 +430,6 @@ class XdsLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
Mutex child_policy_mu_;
RefCountedPtr<XdsLb> parent_;
RefCountedPtr<PickerRef> picker_ref_;
grpc_connectivity_state connectivity_state_;
@ -446,17 +441,12 @@ class XdsLb : public LoadBalancingPolicy {
const grpc_channel_args* args, XdsLb* parent);
void ShutdownLocked();
void ResetBackoffLocked();
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels);
private:
void PruneLocalities(const LocalityList& locality_list);
Map<RefCountedPtr<LocalityName>, OrphanablePtr<LocalityEntry>,
LocalityName::Less>
map_;
// Lock held while filling child refs for all localities
// inside the map
Mutex child_refs_mu_;
};
struct LocalityServerlistEntry {
@ -511,10 +501,6 @@ class XdsLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server.
OrphanablePtr<BalancerChannelState> lb_chand_;
OrphanablePtr<BalancerChannelState> pending_lb_chand_;
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
// TODO(juanlishen): Replace this with atomic.
Mutex lb_chand_mu_;
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0;
@ -538,9 +524,6 @@ class XdsLb : public LoadBalancingPolicy {
// The policy to use for the fallback backends.
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_;
// Lock held when modifying the value of fallback_policy_ or
// pending_fallback_policy_.
Mutex fallback_policy_mu_;
// Non-null iff we are in fallback mode.
OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
@ -648,7 +631,6 @@ void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state,
grpc_pollset_set_del_pollset_set(
parent_->fallback_policy_->interested_parties(),
parent_->interested_parties());
MutexLock lock(&parent_->fallback_policy_mu_);
parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_);
} else if (!CalledByCurrentFallback()) {
// This request is from an outdated fallback policy, so ignore it.
@ -673,6 +655,15 @@ void XdsLb::FallbackHelper::RequestReresolution() {
parent_->channel_control_helper()->RequestReresolution();
}
void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
const char* message) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return;
}
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// serverlist parsing code
//
@ -1365,21 +1356,29 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
// treated as a stand-alone channel and not inherit this argument from the
// args of the parent channel.
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
// Don't want to pass down channelz node from parent; the balancer
// channel will get its own.
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
};
// Channel args to add.
const grpc_arg args_to_add[] = {
// A channel arg indicating the target is a xds load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1),
// A channel arg indicating this is an internal channels, aka it is
// owned by components in Core, not by the user application.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
};
InlinedVector<grpc_arg, 2> args_to_add;
// A channel arg indicating the target is a xds load balancer.
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1));
// The parent channel's channelz uuid.
channelz::ChannelNode* channelz_node = nullptr;
const grpc_arg* arg =
grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
arg->value.pointer.p != nullptr) {
channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
args_to_add.emplace_back(
channelz::MakeParentUuidArg(channelz_node->uuid()));
}
// Construct channel args.
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
args_to_add.size());
// Make any necessary modifications for security.
return grpc_lb_policy_xds_modify_lb_channel_args(new_args);
}
@ -1434,18 +1433,12 @@ void XdsLb::ShutdownLocked() {
grpc_pollset_set_del_pollset_set(
pending_fallback_policy_->interested_parties(), interested_parties());
}
{
MutexLock lock(&fallback_policy_mu_);
fallback_policy_.reset();
pending_fallback_policy_.reset();
}
fallback_policy_.reset();
pending_fallback_policy_.reset();
// We reset the LB channels here instead of in our destructor because they
// hold refs to XdsLb.
{
MutexLock lock(&lb_chand_mu_);
lb_chand_.reset();
pending_lb_chand_.reset();
}
lb_chand_.reset();
pending_lb_chand_.reset();
}
//
@ -1468,40 +1461,6 @@ void XdsLb::ResetBackoffLocked() {
}
}
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
// Delegate to the locality_map_ to fill the children subchannels.
locality_map_.FillChildRefsForChannelz(child_subchannels, child_channels);
{
// This must be done holding fallback_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock(&fallback_policy_mu_);
if (fallback_policy_ != nullptr) {
fallback_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
if (pending_fallback_policy_ != nullptr) {
pending_fallback_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
MutexLock lock(&lb_chand_mu_);
if (lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_chand_->channel());
if (channel_node != nullptr) {
child_channels->push_back(channel_node->uuid());
}
}
if (pending_lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(pending_lb_chand_->channel());
if (channel_node != nullptr) {
child_channels->push_back(channel_node->uuid());
}
}
}
void XdsLb::ProcessAddressesAndChannelArgsLocked(
const ServerAddressList& addresses, const grpc_channel_args& args) {
// Update fallback address list.
@ -1691,14 +1650,10 @@ void XdsLb::UpdateFallbackPolicyLocked() {
fallback_policy_ == nullptr ? "" : "pending ",
fallback_policy_name);
}
auto new_policy =
CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_
: pending_fallback_policy_;
{
MutexLock lock(&fallback_policy_mu_);
lb_policy = std::move(new_policy);
}
lb_policy =
CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
@ -1769,7 +1724,6 @@ void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) {
}
}
if (!found) { // Remove entries not present in the locality list
MutexLock lock(&child_refs_mu_);
iter = map_.erase(iter);
} else
iter++;
@ -1786,7 +1740,6 @@ void XdsLb::LocalityMap::UpdateLocked(
if (iter == map_.end()) {
OrphanablePtr<LocalityEntry> new_entry = MakeOrphanable<LocalityEntry>(
parent->Ref(), locality_serverlist[i]->locality_weight);
MutexLock lock(&child_refs_mu_);
iter = map_.emplace(locality_serverlist[i]->locality_name,
std::move(new_entry))
.first;
@ -1799,10 +1752,7 @@ void XdsLb::LocalityMap::UpdateLocked(
PruneLocalities(locality_serverlist);
}
void XdsLb::LocalityMap::ShutdownLocked() {
MutexLock lock(&child_refs_mu_);
map_.clear();
}
void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); }
void XdsLb::LocalityMap::ResetBackoffLocked() {
for (auto& p : map_) {
@ -1810,15 +1760,6 @@ void XdsLb::LocalityMap::ResetBackoffLocked() {
}
}
void XdsLb::LocalityMap::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
MutexLock lock(&child_refs_mu_);
for (auto& p : map_) {
p.second->FillChildRefsForChannelz(child_subchannels, child_channels);
}
}
//
// XdsLb::LocalityMap::LocalityEntry
//
@ -1954,14 +1895,9 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
gpr_log(GPR_INFO, "[xdslb %p] Creating new %schild policy %s", this,
child_policy_ == nullptr ? "" : "pending ", child_policy_name);
}
auto new_policy =
CreateChildPolicyLocked(child_policy_name, update_args.args);
auto& lb_policy =
child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
{
MutexLock lock(&child_policy_mu_);
lb_policy = std::move(new_policy);
}
lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
@ -1991,11 +1927,8 @@ void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() {
pending_child_policy_->interested_parties(),
parent_->interested_parties());
}
{
MutexLock lock(&child_policy_mu_);
child_policy_.reset();
pending_child_policy_.reset();
}
child_policy_.reset();
pending_child_policy_.reset();
}
void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() {
@ -2005,17 +1938,6 @@ void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() {
}
}
void XdsLb::LocalityMap::LocalityEntry::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
MutexLock lock(&child_policy_mu_);
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
if (pending_child_policy_ != nullptr) {
pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
void XdsLb::LocalityMap::LocalityEntry::Orphan() {
ShutdownLocked();
Unref();
@ -2070,7 +1992,6 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
grpc_pollset_set_del_pollset_set(
entry_->child_policy_->interested_parties(),
entry_->parent_->interested_parties());
MutexLock lock(&entry_->child_policy_mu_);
entry_->child_policy_ = std::move(entry_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
@ -2180,6 +2101,15 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {
}
}
void XdsLb::LocalityMap::LocalityEntry::Helper::AddTraceEvent(
TraceSeverity severity, const char* message) {
if (entry_->parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return;
}
entry_->parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// factory
//

@ -137,7 +137,6 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
grpc_pollset_set_del_pollset_set(
parent_->lb_policy_->interested_parties(),
parent_->interested_parties());
MutexLock lock(&parent_->lb_policy_mu_);
parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
@ -214,7 +213,6 @@ ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
process_resolver_result_(process_resolver_result),
process_resolver_result_user_data_(process_resolver_result_user_data) {
GPR_ASSERT(process_resolver_result != nullptr);
gpr_mu_init(&lb_policy_mu_);
*error = Init(*args.args);
}
@ -234,13 +232,11 @@ grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
GPR_ASSERT(resolver_ == nullptr);
GPR_ASSERT(lb_policy_ == nullptr);
gpr_mu_destroy(&lb_policy_mu_);
}
void ResolvingLoadBalancingPolicy::ShutdownLocked() {
if (resolver_ != nullptr) {
resolver_.reset();
MutexLock lock(&lb_policy_mu_);
if (lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
@ -282,22 +278,6 @@ void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
}
void ResolvingLoadBalancingPolicy::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
// Delegate to the lb_policy_ to fill the children subchannels.
// This must be done holding lb_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock(&lb_policy_mu_);
if (lb_policy_ != nullptr) {
lb_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
if (pending_lb_policy_ != nullptr) {
pending_lb_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
@ -403,13 +383,9 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
}
auto new_policy =
CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
{
MutexLock lock(&lb_policy_mu_);
lb_policy = std::move(new_policy);
}
lb_policy =
CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
@ -451,11 +427,9 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
lb_policy_name, std::move(lb_policy_args));
if (GPR_UNLIKELY(lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
if (channelz_node() != nullptr) {
char* str;
gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
}
char* str;
gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
return nullptr;
}
helper->set_child(lb_policy.get());
@ -463,16 +437,9 @@ ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
this, lb_policy_name, lb_policy.get());
}
if (channelz_node() != nullptr) {
char* str;
gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
}
// Propagate channelz node.
auto* channelz = channelz_node();
if (channelz != nullptr) {
lb_policy->set_channelz_node(channelz->Ref());
}
char* str;
gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
trace_strings->push_back(str);
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
@ -502,11 +469,10 @@ void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
is_first = false;
gpr_strvec_add(&v, (*trace_strings)[i]);
}
char* flat;
size_t flat_len = 0;
flat = gpr_strvec_flatten(&v, &flat_len);
channelz_node()->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
grpc_slice_new(flat, flat_len, gpr_free));
size_t len = 0;
UniquePtr<char> message(gpr_strvec_flatten(&v, &len));
channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
message.get());
gpr_strvec_destroy(&v);
}
}
@ -560,21 +526,18 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
std::move(result), &trace_strings);
}
// Add channel trace event.
if (channelz_node() != nullptr) {
if (service_config_changed) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back(gpr_strdup("Service config changed"));
}
if (service_config_error_string != nullptr) {
trace_strings.push_back(service_config_error_string);
service_config_error_string = nullptr;
}
MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
&trace_strings);
ConcatenateAndAddChannelTraceLocked(&trace_strings);
if (service_config_changed) {
// TODO(ncteisen): might be worth somehow including a snippet of the
// config in the trace, at the risk of bloating the trace logs.
trace_strings.push_back(gpr_strdup("Service config changed"));
}
if (service_config_error_string != nullptr) {
trace_strings.push_back(service_config_error_string);
service_config_error_string = nullptr;
}
gpr_free(service_config_error_string);
MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
&trace_strings);
ConcatenateAndAddChannelTraceLocked(&trace_strings);
}
} // namespace grpc_core

@ -21,7 +21,6 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/resolver.h"
@ -91,10 +90,6 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override;
private:
using TraceStringVector = InlinedVector<char*, 3>;
@ -137,9 +132,6 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_lb_policy_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu lb_policy_mu_;
};
} // namespace grpc_core

@ -94,11 +94,15 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
GRPC_ABSTRACT;
// Attempt to connect to the backend. Has no effect if already connected.
// If the subchannel is currently in backoff delay due to a previously
// failed attempt, the new connection attempt will not start until the
// backoff delay has elapsed.
virtual void AttemptToConnect() GRPC_ABSTRACT;
// TODO(roth): These methods should be removed from this interface to
// bettter hide grpc-specific functionality from the LB policy API.
virtual channelz::SubchannelNode* channelz_node() GRPC_ABSTRACT;
// Resets the subchannel's connection backoff state. If AttemptToConnect()
// has been called since the subchannel entered TRANSIENT_FAILURE state,
// starts a new connection attempt immediately; otherwise, a new connection
// attempt will be started as soon as AttemptToConnect() is called.
virtual void ResetBackoff() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS

@ -40,12 +40,51 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
namespace channelz {
//
// channel arg code
//
namespace {
void* parent_uuid_copy(void* p) { return p; }
void parent_uuid_destroy(void* p) {}
int parent_uuid_cmp(void* p1, void* p2) { return GPR_ICMP(p1, p2); }
const grpc_arg_pointer_vtable parent_uuid_vtable = {
parent_uuid_copy, parent_uuid_destroy, parent_uuid_cmp};
} // namespace
grpc_arg MakeParentUuidArg(intptr_t parent_uuid) {
// We would ideally like to store the uuid in an integer argument.
// Unfortunately, that won't work, because intptr_t (the type used for
// uuids) doesn't fit in an int (the type used for integer args).
// So instead, we use a hack to store it as a pointer, because
// intptr_t should be the same size as void*.
static_assert(sizeof(intptr_t) <= sizeof(void*),
"can't fit intptr_t inside of void*");
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_PARENT_UUID),
reinterpret_cast<void*>(parent_uuid), &parent_uuid_vtable);
}
intptr_t GetParentUuidFromArgs(const grpc_channel_args& args) {
const grpc_arg* arg =
grpc_channel_args_find(&args, GRPC_ARG_CHANNELZ_PARENT_UUID);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) return 0;
return reinterpret_cast<intptr_t>(arg->value.pointer.p);
}
//
// BaseNode
//
BaseNode::BaseNode(EntityType type) : type_(type), uuid_(-1) {
// The registry will set uuid_ under its lock.
ChannelzRegistry::Register(this);
@ -61,6 +100,10 @@ char* BaseNode::RenderJsonString() {
return json_str;
}
//
// CallCountingHelper
//
CallCountingHelper::CallCountingHelper() {
num_cores_ = GPR_MAX(1, gpr_cpu_num_cores());
per_cpu_counter_data_storage_ = static_cast<AtomicCounterData*>(
@ -137,15 +180,17 @@ void CallCountingHelper::PopulateCallCounts(grpc_json* json) {
}
}
ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel)
: BaseNode(is_top_level_channel ? EntityType::kTopLevelChannel
: EntityType::kInternalChannel),
channel_(channel),
target_(UniquePtr<char>(grpc_channel_get_target(channel_))),
trace_(channel_tracer_max_nodes) {}
//
// ChannelNode
//
ChannelNode::~ChannelNode() {}
ChannelNode::ChannelNode(UniquePtr<char> target,
size_t channel_tracer_max_nodes, intptr_t parent_uuid)
: BaseNode(parent_uuid == 0 ? EntityType::kTopLevelChannel
: EntityType::kInternalChannel),
target_(std::move(target)),
trace_(channel_tracer_max_nodes),
parent_uuid_(parent_uuid) {}
grpc_json* ChannelNode::RenderJson() {
// We need to track these three json objects to build our object
@ -167,9 +212,19 @@ grpc_json* ChannelNode::RenderJson() {
GRPC_JSON_OBJECT, false);
json = data;
json_iterator = nullptr;
// template method. Child classes may override this to add their specific
// functionality.
PopulateConnectivityState(json);
// connectivity state
// If low-order bit is on, then the field is set.
int state_field = connectivity_state_.Load(MemoryOrder::RELAXED);
if ((state_field & 1) != 0) {
grpc_connectivity_state state =
static_cast<grpc_connectivity_state>(state_field >> 1);
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_create_child(nullptr, json, "state",
grpc_connectivity_state_name(state),
GRPC_JSON_STRING, false);
json = data;
}
// populate the target.
GPR_ASSERT(target_.get() != nullptr);
grpc_json_create_child(nullptr, json, "target", target_.get(),
@ -189,13 +244,64 @@ grpc_json* ChannelNode::RenderJson() {
return top_level_json;
}
RefCountedPtr<ChannelNode> ChannelNode::MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel) {
return MakeRefCounted<grpc_core::channelz::ChannelNode>(
channel, channel_tracer_max_nodes, is_top_level_channel);
void ChannelNode::PopulateChildRefs(grpc_json* json) {
MutexLock lock(&child_mu_);
grpc_json* json_iterator = nullptr;
if (!child_subchannels_.empty()) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false);
for (const auto& p : child_subchannels_) {
json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "subchannelId",
p.first);
}
}
if (!child_channels_.empty()) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr;
for (const auto& p : child_channels_) {
json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
p.first);
}
}
}
void ChannelNode::SetConnectivityState(grpc_connectivity_state state) {
// Store with low-order bit set to indicate that the field is set.
int state_field = (state << 1) + 1;
connectivity_state_.Store(state_field, MemoryOrder::RELAXED);
}
void ChannelNode::AddChildChannel(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_channels_.insert(MakePair(child_uuid, true));
}
void ChannelNode::RemoveChildChannel(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_channels_.erase(child_uuid);
}
void ChannelNode::AddChildSubchannel(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_subchannels_.insert(MakePair(child_uuid, true));
}
void ChannelNode::RemoveChildSubchannel(intptr_t child_uuid) {
MutexLock lock(&child_mu_);
child_subchannels_.erase(child_uuid);
}
//
// ServerNode
//
ServerNode::ServerNode(grpc_server* server, size_t channel_tracer_max_nodes)
: BaseNode(EntityType::kServer),
server_(server),
@ -281,8 +387,14 @@ grpc_json* ServerNode::RenderJson() {
return top_level_json;
}
static void PopulateSocketAddressJson(grpc_json* json, const char* name,
const char* addr_str) {
//
// SocketNode
//
namespace {
void PopulateSocketAddressJson(grpc_json* json, const char* name,
const char* addr_str) {
if (addr_str == nullptr) return;
grpc_json* json_iterator = nullptr;
json_iterator = grpc_json_create_child(json_iterator, json, name, nullptr,
@ -312,7 +424,6 @@ static void PopulateSocketAddressJson(grpc_json* json, const char* name,
b64_host, GRPC_JSON_STRING, true);
gpr_free(host);
gpr_free(port);
} else if (uri != nullptr && strcmp(uri->scheme, "unix") == 0) {
json_iterator = grpc_json_create_child(json_iterator, json, "uds_address",
nullptr, GRPC_JSON_OBJECT, false);
@ -332,6 +443,8 @@ static void PopulateSocketAddressJson(grpc_json* json, const char* name,
grpc_uri_destroy(uri);
}
} // namespace
SocketNode::SocketNode(UniquePtr<char> local, UniquePtr<char> remote)
: BaseNode(EntityType::kSocket),
local_(std::move(local)),
@ -448,6 +561,10 @@ grpc_json* SocketNode::RenderJson() {
return top_level_json;
}
//
// ListenSocketNode
//
ListenSocketNode::ListenSocketNode(UniquePtr<char> local_addr)
: BaseNode(EntityType::kSocket), local_addr_(std::move(local_addr)) {}

@ -26,19 +26,19 @@
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
// Channel arg key for client channel factory.
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC \
"grpc.channelz_channel_node_creation_func"
// Channel arg key for channelz node.
#define GRPC_ARG_CHANNELZ_CHANNEL_NODE "grpc.channelz_channel_node"
// Channel arg key to signal that the channel is an internal channel.
#define GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL \
"grpc.channelz_channel_is_internal_channel"
// Channel arg key to encode the channelz uuid of the channel's parent.
#define GRPC_ARG_CHANNELZ_PARENT_UUID "grpc.channelz_parent_uuid"
/** This is the default value for whether or not to enable channelz. If
* GRPC_ARG_ENABLE_CHANNELZ is set, it will override this default value. */
@ -54,6 +54,10 @@ namespace grpc_core {
namespace channelz {
// Helpers for getting and setting GRPC_ARG_CHANNELZ_PARENT_UUID.
grpc_arg MakeParentUuidArg(intptr_t parent_uuid);
intptr_t GetParentUuidFromArgs(const grpc_channel_args& args);
// TODO(ncteisen), this only contains the uuids of the children for now,
// since that is all that is strictly needed. In a future enhancement we will
// add human readable names as in the channelz.proto
@ -147,38 +151,13 @@ class CallCountingHelper {
// Handles channelz bookkeeping for channels
class ChannelNode : public BaseNode {
public:
static RefCountedPtr<ChannelNode> MakeChannelNode(
grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
ChannelNode(UniquePtr<char> target, size_t channel_tracer_max_nodes,
intptr_t parent_uuid);
ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes,
bool is_top_level_channel);
~ChannelNode() override;
intptr_t parent_uuid() const { return parent_uuid_; }
grpc_json* RenderJson() override;
// template methods. RenderJSON uses these methods to render its JSON
// representation. These are virtual so that children classes may provide
// their specific mechanism for populating these parts of the channelz
// object.
//
// ChannelNode does not have a notion of connectivity state or child refs,
// so it leaves these implementations blank.
//
// This is utilizing the template method design pattern.
//
// TODO(ncteisen): remove these template methods in favor of manual traversal
// and mutation of the grpc_json object.
virtual void PopulateConnectivityState(grpc_json* json) {}
virtual void PopulateChildRefs(grpc_json* json) {}
void MarkChannelDestroyed() {
GPR_ASSERT(channel_ != nullptr);
channel_ = nullptr;
}
bool ChannelIsDestroyed() { return channel_ == nullptr; }
// proxy methods to composed classes.
void AddTraceEvent(ChannelTrace::Severity severity, const grpc_slice& data) {
trace_.AddTraceEvent(severity, data);
@ -193,13 +172,35 @@ class ChannelNode : public BaseNode {
void RecordCallFailed() { call_counter_.RecordCallFailed(); }
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
void SetConnectivityState(grpc_connectivity_state state);
void AddChildChannel(intptr_t child_uuid);
void RemoveChildChannel(intptr_t child_uuid);
void AddChildSubchannel(intptr_t child_uuid);
void RemoveChildSubchannel(intptr_t child_uuid);
private:
void PopulateChildRefs(grpc_json* json);
// to allow the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
CallCountingHelper call_counter_;
ChannelTrace trace_;
const intptr_t parent_uuid_;
// Least significant bit indicates whether the value is set. Remaining
// bits are a grpc_connectivity_state value.
Atomic<int> connectivity_state_{0};
Mutex child_mu_; // Guards child maps below.
// TODO(roth): We don't actually use the values here, only the keys, so
// these should be sets instead of maps, but we don't currently have a set
// implementation. Change this if/when we have one.
Map<intptr_t, bool> child_channels_;
Map<intptr_t, bool> child_subchannels_;
};
// Handles channelz bookkeeping for servers
@ -285,11 +286,6 @@ class ListenSocketNode : public BaseNode {
UniquePtr<char> local_addr_;
};
// Creation functions
typedef RefCountedPtr<ChannelNode> (*ChannelNodeCreationFunc)(grpc_channel*,
size_t, bool);
} // namespace channelz
} // namespace grpc_core

@ -33,6 +33,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
@ -86,18 +87,9 @@ grpc_channel* grpc_channel_create_with_builder(
grpc_channel_args_destroy(args);
return channel;
}
channel->target = target;
channel->resource_user = resource_user;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
size_t channel_tracer_max_memory =
GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT;
bool internal_channel = false;
// this creates the default ChannelNode. Different types of channels may
// override this to ensure a correct ChannelNode is created.
grpc_core::channelz::ChannelNodeCreationFunc channel_node_create_func =
grpc_core::channelz::ChannelNode::MakeChannelNode;
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = nullptr;
@ -129,40 +121,16 @@ grpc_channel* grpc_channel_create_with_builder(
channel->compression_options.enabled_algorithms_bitset =
static_cast<uint32_t>(args->args[i].value.integer) |
0x1; /* always support no compression */
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)) {
const grpc_integer_options options = {
GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
channel_tracer_max_memory =
(size_t)grpc_channel_arg_get_integer(&args->args[i], options);
} else if (0 == strcmp(args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
// channelz will not be enabled by default until all concerns in
// https://github.com/grpc/grpc/issues/15986 are addressed.
channelz_enabled = grpc_channel_arg_get_bool(
&args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC)) {
} else if (0 == strcmp(args->args[i].key, GRPC_ARG_CHANNELZ_CHANNEL_NODE)) {
GPR_ASSERT(args->args[i].type == GRPC_ARG_POINTER);
GPR_ASSERT(args->args[i].value.pointer.p != nullptr);
channel_node_create_func =
reinterpret_cast<grpc_core::channelz::ChannelNodeCreationFunc>(
args->args[i].value.pointer.p);
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL)) {
internal_channel = grpc_channel_arg_get_bool(&args->args[i], false);
channel->channelz_node = static_cast<grpc_core::channelz::ChannelNode*>(
args->args[i].value.pointer.p)
->Ref();
}
}
grpc_channel_args_destroy(args);
// we only need to do the channelz bookkeeping for clients here. The channelz
// bookkeeping for server channels occurs in src/core/lib/surface/server.cc
if (channelz_enabled && channel->is_client) {
channel->channelz_channel = channel_node_create_func(
channel, channel_tracer_max_memory, !internal_channel);
channel->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
}
return channel;
}
@ -197,6 +165,71 @@ static grpc_channel_args* build_channel_args(
return grpc_channel_args_copy_and_add(input_args, new_args, num_new_args);
}
namespace {
void* channelz_node_copy(void* p) {
grpc_core::channelz::ChannelNode* node =
static_cast<grpc_core::channelz::ChannelNode*>(p);
node->Ref().release();
return p;
}
void channelz_node_destroy(void* p) {
grpc_core::channelz::ChannelNode* node =
static_cast<grpc_core::channelz::ChannelNode*>(p);
node->Unref();
}
int channelz_node_cmp(void* p1, void* p2) { return GPR_ICMP(p1, p2); }
const grpc_arg_pointer_vtable channelz_node_arg_vtable = {
channelz_node_copy, channelz_node_destroy, channelz_node_cmp};
void CreateChannelzNode(grpc_channel_stack_builder* builder) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
// Check whether channelz is enabled.
const bool channelz_enabled = grpc_channel_arg_get_bool(
grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ),
GRPC_ENABLE_CHANNELZ_DEFAULT);
if (!channelz_enabled) return;
// Get parameters needed to create the channelz node.
const size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(
grpc_channel_args_find(args,
GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE),
{GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
const intptr_t channelz_parent_uuid =
grpc_core::channelz::GetParentUuidFromArgs(*args);
// Create the channelz node.
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node =
grpc_core::MakeRefCounted<grpc_core::channelz::ChannelNode>(
grpc_core::UniquePtr<char>(
gpr_strdup(grpc_channel_stack_builder_get_target(builder))),
channel_tracer_max_memory, channelz_parent_uuid);
channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel created"));
// Update parent channel node, if any.
if (channelz_parent_uuid > 0) {
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> parent_node =
grpc_core::channelz::ChannelzRegistry::Get(channelz_parent_uuid);
if (parent_node != nullptr) {
grpc_core::channelz::ChannelNode* parent =
static_cast<grpc_core::channelz::ChannelNode*>(parent_node.get());
parent->AddChildChannel(channelz_node->uuid());
}
}
// Add channelz node to channel args.
// We remove the arg for the parent uuid, since we no longer need it.
grpc_arg new_arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE), channelz_node.get(),
&channelz_node_arg_vtable);
const char* args_to_remove[] = {GRPC_ARG_CHANNELZ_PARENT_UUID};
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
grpc_channel_stack_builder_set_channel_arguments(builder, new_args);
grpc_channel_args_destroy(new_args);
}
} // namespace
grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type,
@ -219,9 +252,12 @@ grpc_channel* grpc_channel_create(const char* target,
}
return nullptr;
}
grpc_channel* channel =
grpc_channel_create_with_builder(builder, channel_stack_type);
return channel;
// We only need to do this for clients here. For servers, this will be
// done in src/core/lib/surface/server.cc.
if (grpc_channel_stack_type_is_client(channel_stack_type)) {
CreateChannelzNode(builder);
}
return grpc_channel_create_with_builder(builder, channel_stack_type);
}
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
@ -401,12 +437,21 @@ grpc_call* grpc_channel_create_registered_call(
static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = static_cast<grpc_channel*>(arg);
if (channel->channelz_channel != nullptr) {
channel->channelz_channel->AddTraceEvent(
if (channel->channelz_node != nullptr) {
if (channel->channelz_node->parent_uuid() > 0) {
grpc_core::RefCountedPtr<grpc_core::channelz::BaseNode> parent_node =
grpc_core::channelz::ChannelzRegistry::Get(
channel->channelz_node->parent_uuid());
if (parent_node != nullptr) {
grpc_core::channelz::ChannelNode* parent =
static_cast<grpc_core::channelz::ChannelNode*>(parent_node.get());
parent->RemoveChildChannel(channel->channelz_node->uuid());
}
}
channel->channelz_node->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Channel destroyed"));
channel->channelz_channel->MarkChannelDestroyed();
channel->channelz_channel.reset();
channel->channelz_node.reset();
}
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {

@ -88,7 +88,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call* registered_calls;
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_channel;
grpc_core::RefCountedPtr<grpc_core::channelz::ChannelNode> channelz_node;
char* target;
};
@ -106,7 +106,7 @@ inline grpc_channel_stack* grpc_channel_get_channel_stack(
inline grpc_core::channelz::ChannelNode* grpc_channel_get_channelz_node(
grpc_channel* channel) {
return channel->channelz_channel.get();
return channel->channelz_node.get();
}
#ifndef NDEBUG

@ -23,6 +23,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
@ -174,7 +175,7 @@ TEST(ChannelTracerTest, ComplexTest) {
AddSimpleTrace(&tracer);
ChannelFixture channel1(kEventListMemoryLimit);
RefCountedPtr<ChannelNode> sc1 = MakeRefCounted<ChannelNode>(
channel1.channel(), kEventListMemoryLimit, true);
UniquePtr<char>(gpr_strdup("fake_target")), kEventListMemoryLimit, 0);
ChannelNodePeer sc1_peer(sc1.get());
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
@ -193,7 +194,7 @@ TEST(ChannelTracerTest, ComplexTest) {
ValidateChannelTrace(&tracer, 5);
ChannelFixture channel2(kEventListMemoryLimit);
RefCountedPtr<ChannelNode> sc2 = MakeRefCounted<ChannelNode>(
channel2.channel(), kEventListMemoryLimit, true);
UniquePtr<char>(gpr_strdup("fake_target")), kEventListMemoryLimit, 0);
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("LB channel two created"), sc2);
@ -222,7 +223,7 @@ TEST(ChannelTracerTest, TestNesting) {
ValidateChannelTrace(&tracer, 2);
ChannelFixture channel1(kEventListMemoryLimit);
RefCountedPtr<ChannelNode> sc1 = MakeRefCounted<ChannelNode>(
channel1.channel(), kEventListMemoryLimit, true);
UniquePtr<char>(gpr_strdup("fake_target")), kEventListMemoryLimit, 0);
ChannelNodePeer sc1_peer(sc1.get());
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
@ -231,7 +232,7 @@ TEST(ChannelTracerTest, TestNesting) {
AddSimpleTrace(sc1_peer.trace());
ChannelFixture channel2(kEventListMemoryLimit);
RefCountedPtr<ChannelNode> conn1 = MakeRefCounted<ChannelNode>(
channel2.channel(), kEventListMemoryLimit, true);
UniquePtr<char>(gpr_strdup("fake_target")), kEventListMemoryLimit, 0);
ChannelNodePeer conn1_peer(conn1.get());
// nesting one level deeper.
sc1_peer.trace()->AddTraceEventWithReference(
@ -245,7 +246,7 @@ TEST(ChannelTracerTest, TestNesting) {
ValidateChannelTrace(conn1_peer.trace(), 1);
ChannelFixture channel3(kEventListMemoryLimit);
RefCountedPtr<ChannelNode> sc2 = MakeRefCounted<ChannelNode>(
channel3.channel(), kEventListMemoryLimit, true);
UniquePtr<char>(gpr_strdup("fake_target")), kEventListMemoryLimit, 0);
tracer.AddTraceEventWithReference(
ChannelTrace::Severity::Info,
grpc_slice_from_static_string("subchannel two created"), sc2);

@ -486,8 +486,7 @@ TEST_F(ChannelzRegistryBasedTest, InternalChannelTest) {
(void)channels; // suppress unused variable error
// create an internal channel
grpc_arg client_a[2];
client_a[0] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), true);
client_a[0] = grpc_core::channelz::MakeParentUuidArg(1);
client_a[1] = grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ENABLE_CHANNELZ), true);
grpc_channel_args client_args = {GPR_ARRAY_SIZE(client_a), client_a};

@ -72,12 +72,6 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) override {
delegate_->FillChildRefsForChannelz(child_subchannels, child_channels);
}
private:
void ShutdownLocked() override { delegate_.reset(); }
@ -164,6 +158,10 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
parent_->channel_control_helper()->RequestReresolution();
}
void AddTraceEvent(TraceSeverity severity, const char* message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
private:
RefCountedPtr<InterceptRecvTrailingMetadataLoadBalancingPolicy> parent_;
InterceptRecvTrailingMetadataCallback cb_;

@ -163,13 +163,12 @@ class ChannelzServerTest : public ::testing::Test {
}
std::unique_ptr<grpc::testing::EchoTestService::Stub> NewEchoStub() {
static int salt = 0;
string target = "dns:localhost:" + to_string(proxy_port_);
ChannelArguments args;
// disable channelz. We only want to focus on proxy to backend outbound.
args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
// This ensures that gRPC will not do connection sharing.
args.SetInt("salt", salt++);
args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, true);
std::shared_ptr<Channel> channel =
::grpc::CreateCustomChannel(target, InsecureChannelCredentials(), args);
return grpc::testing::EchoTestService::NewStub(channel);

Loading…
Cancel
Save