Add child refs support to grpclb

reviewable/pr16051/r1
ncteisen 6 years ago
parent af50d795de
commit d47779bc56
  1. 4
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  2. 26
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -85,12 +85,12 @@ void ClientChannelNode::PopulateChildRefs(grpc_json* json) {
grpc_json* array_parent = grpc_json_create_child( grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false); nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false);
json_iterator = nullptr; json_iterator = nullptr;
for (size_t i = 0; i < child_subchannels.size(); ++i) { for (size_t i = 0; i < child_channels.size(); ++i) {
json_iterator = json_iterator =
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr,
GRPC_JSON_OBJECT, false); GRPC_JSON_OBJECT, false);
grpc_json_add_number_string_child(json_iterator, nullptr, "channelId", grpc_json_add_number_string_child(json_iterator, nullptr, "channelId",
child_subchannels[i]); child_channels[i]);
} }
} }
} }

@ -135,9 +135,8 @@ class GrpcLb : public LoadBalancingPolicy {
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
// TODO(ncteisen): implement this in a follow up PR
void FillChildRefsForChannelz(ChildRefsList* child_subchannels, void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override {} ChildRefsList* child_channels) override;
private: private:
/// Linked list of pending pick requests. It stores all information needed to /// Linked list of pending pick requests. It stores all information needed to
@ -301,6 +300,9 @@ class GrpcLb : public LoadBalancingPolicy {
// The channel for communicating with the LB server. // The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr; grpc_channel* lb_channel_ = nullptr;
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
gpr_mu lb_channel_mu_;
grpc_connectivity_state lb_channel_connectivity_; grpc_connectivity_state lb_channel_connectivity_;
grpc_closure lb_channel_on_connectivity_changed_; grpc_closure lb_channel_on_connectivity_changed_;
// Are we already watching the LB channel's connectivity? // Are we already watching the LB channel's connectivity?
@ -1040,6 +1042,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) { 1000)) {
// Initialization. // Initialization.
gpr_mu_init(&lb_channel_mu_);
grpc_subchannel_index_ref(); grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked, this, &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
@ -1078,6 +1081,7 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb::~GrpcLb() { GrpcLb::~GrpcLb() {
GPR_ASSERT(pending_picks_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr);
GPR_ASSERT(pending_pings_ == nullptr); GPR_ASSERT(pending_pings_ == nullptr);
gpr_mu_destroy(&lb_channel_mu_);
gpr_free((void*)server_name_); gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_); grpc_channel_args_destroy(args_);
grpc_connectivity_state_destroy(&state_tracker_); grpc_connectivity_state_destroy(&state_tracker_);
@ -1107,8 +1111,10 @@ void GrpcLb::ShutdownLocked() {
// OnBalancerChannelConnectivityChangedLocked(), and we need to be // OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked. // alive when that callback is invoked.
if (lb_channel_ != nullptr) { if (lb_channel_ != nullptr) {
gpr_mu_lock(&lb_channel_mu_);
grpc_channel_destroy(lb_channel_); grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr; lb_channel_ = nullptr;
gpr_mu_unlock(&lb_channel_mu_);
} }
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "grpclb_shutdown"); GRPC_ERROR_REF(error), "grpclb_shutdown");
@ -1279,6 +1285,20 @@ void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
} }
} }
void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) {
// delegate to the RoundRobin to fill the children subchannels.
rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
mu_guard guard(&lb_channel_mu_);
if (lb_channel_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node =
grpc_channel_get_channelz_node(lb_channel_);
if (channel_node != nullptr) {
child_channels->push_back(channel_node->channel_uuid());
}
}
}
grpc_connectivity_state GrpcLb::CheckConnectivityLocked( grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
grpc_error** connectivity_error) { grpc_error** connectivity_error) {
return grpc_connectivity_state_get(&state_tracker_, connectivity_error); return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
@ -1322,9 +1342,11 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
if (lb_channel_ == nullptr) { if (lb_channel_ == nullptr) {
char* uri_str; char* uri_str;
gpr_asprintf(&uri_str, "fake:///%s", server_name_); gpr_asprintf(&uri_str, "fake:///%s", server_name_);
gpr_mu_lock(&lb_channel_mu_);
lb_channel_ = grpc_client_channel_factory_create_channel( lb_channel_ = grpc_client_channel_factory_create_channel(
client_channel_factory(), uri_str, client_channel_factory(), uri_str,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args); GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
gpr_mu_unlock(&lb_channel_mu_);
GPR_ASSERT(lb_channel_ != nullptr); GPR_ASSERT(lb_channel_ != nullptr);
gpr_free(uri_str); gpr_free(uri_str);
} }

Loading…
Cancel
Save