From 7311a01760cfda4f72f409b493d0ecc0e09ae69f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 5 Oct 2020 08:36:46 -0700 Subject: [PATCH] Change grpclb to wrap the subchannels instead of attaching the attributes to them. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 139 +++++++++--------- 1 file changed, 73 insertions(+), 66 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 9fc2b4db921..421feb8c162 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -235,6 +235,23 @@ class GrpcLb : public LoadBalancingPolicy { grpc_closure client_load_report_closure_; }; + class SubchannelWrapper : public DelegatingSubchannel { + public: + SubchannelWrapper(RefCountedPtr subchannel, + std::string lb_token, + RefCountedPtr client_stats) + : DelegatingSubchannel(std::move(subchannel)), + lb_token_(std::move(lb_token)), + client_stats_(std::move(client_stats)) {} + + const std::string& lb_token() const { return lb_token_; } + GrpcLbClientStats* client_stats() const { return client_stats_.get(); } + + private: + std::string lb_token_; + RefCountedPtr client_stats_; + }; + class TokenAndClientStatsAttribute : public ServerAddress::AttributeInterface { public: @@ -262,7 +279,9 @@ class GrpcLb : public LoadBalancingPolicy { } const std::string& lb_token() const { return lb_token_; } - GrpcLbClientStats* client_stats() const { return client_stats_.get(); } + RefCountedPtr client_stats() const { + return client_stats_; + } private: std::string lb_token_; @@ -310,21 +329,16 @@ class GrpcLb : public LoadBalancingPolicy { class Picker : public SubchannelPicker { public: - Picker(GrpcLb* parent, RefCountedPtr serverlist, + Picker(RefCountedPtr serverlist, std::unique_ptr child_picker, RefCountedPtr client_stats) - : parent_(parent), - serverlist_(std::move(serverlist)), + : serverlist_(std::move(serverlist)), child_picker_(std::move(child_picker)), client_stats_(std::move(client_stats)) {} PickResult Pick(PickArgs args) override; private: - // Storing the address for logging, but not holding a ref. - // DO NOT DEFERENCE! - GrpcLb* parent_; - // Serverlist to be used for determining drops. RefCountedPtr serverlist_; @@ -591,7 +605,8 @@ const char* GrpcLb::Serverlist::ShouldDrop() { GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { PickResult result; // Check if we should drop the call. - const char* drop_token = serverlist_->ShouldDrop(); + const char* drop_token = + serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop(); if (drop_token != nullptr) { // Update client load reporting stats to indicate the number of // dropped calls. Note that we have to do this here instead of in @@ -609,17 +624,11 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // If pick succeeded, add LB token to initial metadata. if (result.type == PickResult::PICK_COMPLETE && result.subchannel != nullptr) { - const TokenAndClientStatsAttribute* attribute = - static_cast( - result.subchannel->GetAttribute(kGrpcLbAddressAttributeKey)); - if (attribute == nullptr) { - gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p", - parent_, this, result.subchannel.get()); - abort(); - } + const SubchannelWrapper* subchannel_wrapper = + static_cast(result.subchannel.get()); // Encode client stats object into metadata for use by // client_load_reporting filter. - GrpcLbClientStats* client_stats = attribute->client_stats(); + GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats(); if (client_stats != nullptr) { client_stats->Ref().release(); // Ref passed via metadata. // The metadata value is a hack: we pretend the pointer points to @@ -635,10 +644,14 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // Create a new copy on the call arena, since the subchannel list // may get refreshed between when we return this pick and when the // initial metadata goes out on the wire. - char* lb_token = static_cast( - args.call_state->Alloc(attribute->lb_token().size() + 1)); - strcpy(lb_token, attribute->lb_token().c_str()); - args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey, lb_token); + if (!subchannel_wrapper->lb_token().empty()) { + char* lb_token = static_cast( + args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1)); + strcpy(lb_token, subchannel_wrapper->lb_token().c_str()); + args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey, lb_token); + } + // Unwrap subchannel to pass up to the channel. + result.subchannel = subchannel_wrapper->wrapped_subchannel(); } return result; } @@ -650,8 +663,21 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { RefCountedPtr GrpcLb::Helper::CreateSubchannel( ServerAddress address, const grpc_channel_args& args) { if (parent_->shutting_down_) return nullptr; - return parent_->channel_control_helper()->CreateSubchannel(std::move(address), - args); + const TokenAndClientStatsAttribute* attribute = + static_cast( + address.GetAttribute(kGrpcLbAddressAttributeKey)); + if (attribute == nullptr) { + gpr_log(GPR_ERROR, + "[grpclb %p] no TokenAndClientStatsAttribute for address %p", + parent_.get(), address.ToString().c_str()); + abort(); + } + std::string lb_token = attribute->lb_token(); + RefCountedPtr client_stats = attribute->client_stats(); + return MakeRefCounted( + parent_->channel_control_helper()->CreateSubchannel(std::move(address), + args), + std::move(lb_token), std::move(client_stats)); } void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, @@ -662,56 +688,37 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY; // Enter fallback mode if needed. parent_->MaybeEnterFallbackModeAfterStartup(); - // There are three cases to consider here: - // 1. We're in fallback mode. In this case, we're always going to use - // the child policy's result, so we pass its picker through as-is. - // 2. The serverlist contains only drop entries. In this case, we - // want to use our own picker so that we can return the drops. - // 3. Not in fallback mode and serverlist is not all drops (i.e., it - // may be empty or contain at least one backend address). There are - // two sub-cases: - // a. The child policy is reporting state READY. In this case, we wrap - // the child's picker in our own, so that we can handle drops and LB - // token metadata for each pick. - // b. The child policy is reporting a state other than READY. In this - // case, we don't want to use our own picker, because we don't want - // to process drops for picks that yield a QUEUE result; this would - // result in dropping too many calls, since we will see the - // queued picks multiple times, and we'd consider each one a - // separate call for the drop calculation. - // - // Cases 1 and 3b: return picker from the child policy as-is. - if (parent_->serverlist_ == nullptr || - (!parent_->serverlist_->ContainsAllDropEntries() && - state != GRPC_CHANNEL_READY)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "[grpclb %p helper %p] state=%s (%s) passing " - "child picker %p as-is", - parent_.get(), this, ConnectivityStateName(state), - status.ToString().c_str(), picker.get()); - } - parent_->channel_control_helper()->UpdateState(state, status, - std::move(picker)); - return; - } - // Cases 2 and 3a: wrap picker from the child in our own picker. - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "[grpclb %p helper %p] state=%s (%s) wrapping child " - "picker %p", - parent_.get(), this, ConnectivityStateName(state), - status.ToString().c_str(), picker.get()); + // We pass the serverlist to the picker so that it can handle drops. + // However, we don't want to handle drops in the case where the child + // policy is reporting a state other than READY (unless we are + // dropping *all* calls), because we don't want to process drops for picks + // that yield a QUEUE result; this would result in dropping too many calls, + // since we will see the queued picks multiple times, and we'd consider each + // one a separate call for the drop calculation. So in this case, we pass + // a null serverlist to the picker, which tells it not to do drops. + RefCountedPtr serverlist; + if (state == GRPC_CHANNEL_READY || + (parent_->serverlist_ != nullptr && + parent_->serverlist_->ContainsAllDropEntries())) { + serverlist = parent_->serverlist_; } RefCountedPtr client_stats; if (parent_->lb_calld_ != nullptr && parent_->lb_calld_->client_stats() != nullptr) { client_stats = parent_->lb_calld_->client_stats()->Ref(); } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "[grpclb %p helper %p] state=%s (%s) wrapping child " + "picker %p (serverlist=%p, client_stats=%p)", + parent_.get(), this, ConnectivityStateName(state), + status.ToString().c_str(), picker.get(), serverlist.get(), + client_stats.get()); + } parent_->channel_control_helper()->UpdateState( state, status, - absl::make_unique(parent_.get(), parent_->serverlist_, - std::move(picker), std::move(client_stats))); + absl::make_unique(std::move(serverlist), std::move(picker), + std::move(client_stats))); } void GrpcLb::Helper::RequestReresolution() {