Change grpclb to wrap the subchannels instead of attaching the attributes to them.

pull/24214/head
Mark D. Roth 4 years ago
parent 7c0b500f33
commit 7311a01760
  1. 139
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -235,6 +235,23 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_closure client_load_report_closure_;
};
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
std::string lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: DelegatingSubchannel(std::move(subchannel)),
lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
std::string lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
class TokenAndClientStatsAttribute
: public ServerAddress::AttributeInterface {
public:
@ -262,7 +279,9 @@ class GrpcLb : public LoadBalancingPolicy {
}
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
RefCountedPtr<GrpcLbClientStats> client_stats() const {
return client_stats_;
}
private:
std::string lb_token_;
@ -310,21 +329,16 @@ class GrpcLb : public LoadBalancingPolicy {
class Picker : public SubchannelPicker {
public:
Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
Picker(RefCountedPtr<Serverlist> serverlist,
std::unique_ptr<SubchannelPicker> child_picker,
RefCountedPtr<GrpcLbClientStats> client_stats)
: parent_(parent),
serverlist_(std::move(serverlist)),
: serverlist_(std::move(serverlist)),
child_picker_(std::move(child_picker)),
client_stats_(std::move(client_stats)) {}
PickResult Pick(PickArgs args) override;
private:
// Storing the address for logging, but not holding a ref.
// DO NOT DEFERENCE!
GrpcLb* parent_;
// Serverlist to be used for determining drops.
RefCountedPtr<Serverlist> serverlist_;
@ -591,7 +605,8 @@ const char* GrpcLb::Serverlist::ShouldDrop() {
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
PickResult result;
// Check if we should drop the call.
const char* drop_token = serverlist_->ShouldDrop();
const char* drop_token =
serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
if (drop_token != nullptr) {
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
@ -609,17 +624,11 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// If pick succeeded, add LB token to initial metadata.
if (result.type == PickResult::PICK_COMPLETE &&
result.subchannel != nullptr) {
const TokenAndClientStatsAttribute* attribute =
static_cast<const TokenAndClientStatsAttribute*>(
result.subchannel->GetAttribute(kGrpcLbAddressAttributeKey));
if (attribute == nullptr) {
gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p",
parent_, this, result.subchannel.get());
abort();
}
const SubchannelWrapper* subchannel_wrapper =
static_cast<SubchannelWrapper*>(result.subchannel.get());
// Encode client stats object into metadata for use by
// client_load_reporting filter.
GrpcLbClientStats* client_stats = attribute->client_stats();
GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
if (client_stats != nullptr) {
client_stats->Ref().release(); // Ref passed via metadata.
// The metadata value is a hack: we pretend the pointer points to
@ -635,10 +644,14 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// Create a new copy on the call arena, since the subchannel list
// may get refreshed between when we return this pick and when the
// initial metadata goes out on the wire.
char* lb_token = static_cast<char*>(
args.call_state->Alloc(attribute->lb_token().size() + 1));
strcpy(lb_token, attribute->lb_token().c_str());
args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey, lb_token);
if (!subchannel_wrapper->lb_token().empty()) {
char* lb_token = static_cast<char*>(
args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey, lb_token);
}
// Unwrap subchannel to pass up to the channel.
result.subchannel = subchannel_wrapper->wrapped_subchannel();
}
return result;
}
@ -650,8 +663,21 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args);
const TokenAndClientStatsAttribute* attribute =
static_cast<const TokenAndClientStatsAttribute*>(
address.GetAttribute(kGrpcLbAddressAttributeKey));
if (attribute == nullptr) {
gpr_log(GPR_ERROR,
"[grpclb %p] no TokenAndClientStatsAttribute for address %p",
parent_.get(), address.ToString().c_str());
abort();
}
std::string lb_token = attribute->lb_token();
RefCountedPtr<GrpcLbClientStats> client_stats = attribute->client_stats();
return MakeRefCounted<SubchannelWrapper>(
parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args),
std::move(lb_token), std::move(client_stats));
}
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
@ -662,56 +688,37 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
// Enter fallback mode if needed.
parent_->MaybeEnterFallbackModeAfterStartup();
// There are three cases to consider here:
// 1. We're in fallback mode. In this case, we're always going to use
// the child policy's result, so we pass its picker through as-is.
// 2. The serverlist contains only drop entries. In this case, we
// want to use our own picker so that we can return the drops.
// 3. Not in fallback mode and serverlist is not all drops (i.e., it
// may be empty or contain at least one backend address). There are
// two sub-cases:
// a. The child policy is reporting state READY. In this case, we wrap
// the child's picker in our own, so that we can handle drops and LB
// token metadata for each pick.
// b. The child policy is reporting a state other than READY. In this
// case, we don't want to use our own picker, because we don't want
// to process drops for picks that yield a QUEUE result; this would
// result in dropping too many calls, since we will see the
// queued picks multiple times, and we'd consider each one a
// separate call for the drop calculation.
//
// Cases 1 and 3b: return picker from the child policy as-is.
if (parent_->serverlist_ == nullptr ||
(!parent_->serverlist_->ContainsAllDropEntries() &&
state != GRPC_CHANNEL_READY)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s (%s) passing "
"child picker %p as-is",
parent_.get(), this, ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
return;
}
// Cases 2 and 3a: wrap picker from the child in our own picker.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s (%s) wrapping child "
"picker %p",
parent_.get(), this, ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
// We pass the serverlist to the picker so that it can handle drops.
// However, we don't want to handle drops in the case where the child
// policy is reporting a state other than READY (unless we are
// dropping *all* calls), because we don't want to process drops for picks
// that yield a QUEUE result; this would result in dropping too many calls,
// since we will see the queued picks multiple times, and we'd consider each
// one a separate call for the drop calculation. So in this case, we pass
// a null serverlist to the picker, which tells it not to do drops.
RefCountedPtr<Serverlist> serverlist;
if (state == GRPC_CHANNEL_READY ||
(parent_->serverlist_ != nullptr &&
parent_->serverlist_->ContainsAllDropEntries())) {
serverlist = parent_->serverlist_;
}
RefCountedPtr<GrpcLbClientStats> client_stats;
if (parent_->lb_calld_ != nullptr &&
parent_->lb_calld_->client_stats() != nullptr) {
client_stats = parent_->lb_calld_->client_stats()->Ref();
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s (%s) wrapping child "
"picker %p (serverlist=%p, client_stats=%p)",
parent_.get(), this, ConnectivityStateName(state),
status.ToString().c_str(), picker.get(), serverlist.get(),
client_stats.get());
}
parent_->channel_control_helper()->UpdateState(
state, status,
absl::make_unique<Picker>(parent_.get(), parent_->serverlist_,
std::move(picker), std::move(client_stats)));
absl::make_unique<Picker>(std::move(serverlist), std::move(picker),
std::move(client_stats)));
}
void GrpcLb::Helper::RequestReresolution() {

Loading…
Cancel
Save