From d47779bc56f5e15b459a8e26fb642b7b6bb4699a Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 18 Jul 2018 11:46:06 -0700 Subject: [PATCH] Add child refs support to grpclb --- .../client_channel/client_channel_channelz.cc | 4 +-- .../client_channel/lb_policy/grpclb/grpclb.cc | 26 +++++++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index d43e9ea67a5..4c9c9a6bd66 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -85,12 +85,12 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) { grpc_json* array_parent = grpc_json_create_child( nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false); json_iterator = nullptr; - for (size_t i = 0; i < child_subchannels.size(); ++i) { + for (size_t i = 0; i < child_channels.size(); ++i) { json_iterator = grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, GRPC_JSON_OBJECT, false); grpc_json_add_number_string_child(json_iterator, nullptr, "channelId", - child_subchannels[i]); + child_channels[i]); } } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 85534412cf8..959c7441a37 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -135,9 +135,8 @@ class GrpcLb : public LoadBalancingPolicy { void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; - // TODO(ncteisen): implement this in a follow up PR void FillChildRefsForChannelz(ChildRefsList* child_subchannels, - ChildRefsList* child_channels) override {} + ChildRefsList* child_channels) override; private: /// Linked list of pending pick requests. It stores all information needed to @@ -301,6 +300,9 @@ class GrpcLb : public LoadBalancingPolicy { // The channel for communicating with the LB server. grpc_channel* lb_channel_ = nullptr; + // Mutex to protect the channel to the LB server. This is used when + // processing a channelz request. + gpr_mu lb_channel_mu_; grpc_connectivity_state lb_channel_connectivity_; grpc_closure lb_channel_on_connectivity_changed_; // Are we already watching the LB channel's connectivity? @@ -1040,6 +1042,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { // Initialization. + gpr_mu_init(&lb_channel_mu_); grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this, @@ -1078,6 +1081,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, GrpcLb::~GrpcLb() { GPR_ASSERT(pending_picks_ == nullptr); GPR_ASSERT(pending_pings_ == nullptr); + gpr_mu_destroy(&lb_channel_mu_); gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); grpc_connectivity_state_destroy(&state_tracker_); @@ -1107,8 +1111,10 @@ void GrpcLb::ShutdownLocked() { // OnBalancerChannelConnectivityChangedLocked(), and we need to be // alive when that callback is invoked. if (lb_channel_ != nullptr) { + gpr_mu_lock(&lb_channel_mu_); grpc_channel_destroy(lb_channel_); lb_channel_ = nullptr; + gpr_mu_unlock(&lb_channel_mu_); } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "grpclb_shutdown"); @@ -1279,6 +1285,20 @@ void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { } } +void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, + ChildRefsList* child_channels) { + // delegate to the RoundRobin to fill the children subchannels. + rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); + mu_guard guard(&lb_channel_mu_); + if (lb_channel_ != nullptr) { + grpc_core::channelz::ChannelNode* channel_node = + grpc_channel_get_channelz_node(lb_channel_); + if (channel_node != nullptr) { + child_channels->push_back(channel_node->channel_uuid()); + } + } +} + grpc_connectivity_state GrpcLb::CheckConnectivityLocked( grpc_error** connectivity_error) { return grpc_connectivity_state_get(&state_tracker_, connectivity_error); @@ -1322,9 +1342,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { if (lb_channel_ == nullptr) { char* uri_str; gpr_asprintf(&uri_str, "fake:///%s", server_name_); + gpr_mu_lock(&lb_channel_mu_); lb_channel_ = grpc_client_channel_factory_create_channel( client_channel_factory(), uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args); + gpr_mu_unlock(&lb_channel_mu_); GPR_ASSERT(lb_channel_ != nullptr); gpr_free(uri_str); }