Store grpclb LB token and stats object in a per-address attribute.

pull/24174/head
Mark D. Roth 4 years ago
parent 3a8f1d3b7c
commit b36fbb75ed
  1. 158
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -124,6 +124,8 @@ TraceFlag grpc_lb_glb_trace(false, "glb");
const char kGrpcLbClientStatsMetadataKey[] = "grpclb_client_stats";
const char kGrpcLbLbTokenMetadataKey[] = "lb-token";
const char kGrpcLbAddressAttributeKey[] = "grpclb";
namespace {
constexpr char kGrpclb[] = "grpclb";
@ -233,6 +235,40 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_closure client_load_report_closure_;
};
class TokenAndClientStatsAttribute
: public ServerAddress::AttributeInterface {
public:
TokenAndClientStatsAttribute(std::string lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<TokenAndClientStatsAttribute>(lb_token_,
client_stats_);
}
int Cmp(const AttributeInterface* other_base) const override {
const TokenAndClientStatsAttribute* other =
static_cast<const TokenAndClientStatsAttribute*>(other_base);
int r = lb_token_.compare(other->lb_token_);
if (r != 0) return r;
return GPR_ICMP(client_stats_.get(), other->client_stats_.get());
}
std::string ToString() const override {
return absl::StrFormat("lb_token=\"%s\" client_stats=%p", lb_token_,
client_stats_.get());
}
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
std::string lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
class Serverlist : public RefCounted<Serverlist> {
public:
// Takes ownership of serverlist.
@ -352,6 +388,8 @@ class GrpcLb : public LoadBalancingPolicy {
// Helper functions used in UpdateLocked().
void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
const grpc_channel_args& args);
static ServerAddressList AddNullLbTokenToAddresses(
const ServerAddressList& addresses);
void CancelBalancerChannelConnectivityWatchLocked();
@ -473,44 +511,6 @@ std::string GrpcLb::Serverlist::AsText() const {
return absl::StrJoin(entries, "");
}
// vtables for channel args for LB token and client stats.
void* lb_token_copy(void* token) {
return gpr_strdup(static_cast<char*>(token));
}
void lb_token_destroy(void* token) { gpr_free(token); }
void* client_stats_copy(void* p) {
GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(p);
client_stats->Ref().release();
return p;
}
void client_stats_destroy(void* p) {
GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(p);
client_stats->Unref();
}
int equal_cmp(void* /*p1*/, void* /*p2*/) {
// Always indicate a match, since we don't want this channel arg to
// affect the subchannel's key in the index.
// TODO(roth): Is this right? This does prevent us from needlessly
// recreating the subchannel whenever the LB token or client stats
// changes (i.e., when the balancer call is terminated and reestablished).
// However, it means that we don't actually recreate the subchannel,
// which means that we won't ever switch over to using the new LB
// token or client stats. A better approach might be to find somewhere
// other than the subchannel args to store the LB token and client
// stats. They could be stored in a map and then looked up for each
// call. Or we could do something more complicated whereby
// we create our own subchannel wrapper to store them, although that would
// involve a lot of refcounting overhead.
// Given that we're trying to move from grpclb to xds at this point,
// and that no one has actually reported any problems with this, we
// probably won't bother fixing this at this point.
return 0;
}
const grpc_arg_pointer_vtable lb_token_arg_vtable = {
lb_token_copy, lb_token_destroy, equal_cmp};
const grpc_arg_pointer_vtable client_stats_arg_vtable = {
client_stats_copy, client_stats_destroy, equal_cmp};
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
@ -536,6 +536,8 @@ bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
// Returns addresses extracted from the serverlist.
ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
RefCountedPtr<GrpcLbClientStats> stats;
if (client_stats != nullptr) stats = client_stats->Ref();
ServerAddressList addresses;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
@ -544,34 +546,23 @@ ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
char lb_token[GPR_ARRAY_SIZE(server.load_balance_token) + 1];
if (server.load_balance_token[0] != 0) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server.load_balance_token);
const size_t lb_token_length =
strnlen(server.load_balance_token, lb_token_max_length);
memcpy(lb_token, server.load_balance_token, lb_token_length);
lb_token[lb_token_length] = '\0';
} else {
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
grpc_sockaddr_to_uri(&addr).c_str());
lb_token[0] = '\0';
}
// Attach attribute to address containing LB token and stats object.
std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
attributes;
attributes[kGrpcLbAddressAttributeKey] =
absl::make_unique<TokenAndClientStatsAttribute>(std::move(lb_token),
stats);
// Add address.
absl::InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.emplace_back(grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
&lb_token_arg_vtable));
if (client_stats != nullptr) {
args_to_add.emplace_back(grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS), client_stats,
&client_stats_arg_vtable));
}
grpc_channel_args* args = grpc_channel_args_copy_and_add(
nullptr, args_to_add.data(), args_to_add.size());
addresses.emplace_back(addr, args);
addresses.emplace_back(addr, /*args=*/nullptr, std::move(attributes));
}
return addresses;
}
@ -616,15 +607,18 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// If pick succeeded, add LB token to initial metadata.
if (result.type == PickResult::PICK_COMPLETE &&
result.subchannel != nullptr) {
const TokenAndClientStatsAttribute* attribute =
static_cast<const TokenAndClientStatsAttribute*>(
result.subchannel->GetAttribute(kGrpcLbAddressAttributeKey));
if (attribute == nullptr) {
gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p",
parent_, this, result.subchannel.get());
abort();
}
// Encode client stats object into metadata for use by
// client_load_reporting filter.
const grpc_arg* arg =
grpc_channel_args_find(result.subchannel->channel_args(),
GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS);
if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
arg->value.pointer.p != nullptr) {
GrpcLbClientStats* client_stats =
static_cast<GrpcLbClientStats*>(arg->value.pointer.p);
GrpcLbClientStats* client_stats = attribute->client_stats();
if (client_stats != nullptr) {
client_stats->Ref().release(); // Ref passed via metadata.
// The metadata value is a hack: we pretend the pointer points to
// a string and rely on the client_load_reporting filter to know
@ -636,15 +630,13 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
client_stats->AddCallStarted();
}
// Encode the LB token in metadata.
arg = grpc_channel_args_find(result.subchannel->channel_args(),
GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
if (arg == nullptr) {
gpr_log(GPR_ERROR, "[grpclb %p picker %p] No LB token for subchannel %p",
parent_, this, result.subchannel.get());
abort();
}
args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey,
static_cast<char*>(arg->value.pointer.p));
// Create a new copy on the call arena, since the subchannel list
// may get refreshed between when we return this pick and when the
// initial metadata goes out on the wire.
char* lb_token = static_cast<char*>(
args.call_state->Alloc(attribute->lb_token().size() + 1));
strcpy(lb_token, attribute->lb_token().c_str());
args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey, lb_token);
}
return result;
}
@ -1436,17 +1428,13 @@ void GrpcLb::UpdateLocked(UpdateArgs args) {
// helpers for UpdateLocked()
//
ServerAddressList AddNullLbTokenToAddresses(
ServerAddressList GrpcLb::AddNullLbTokenToAddresses(
const ServerAddressList& addresses) {
static const char* lb_token = "";
grpc_arg arg = grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
const_cast<char*>(lb_token), &lb_token_arg_vtable);
ServerAddressList addresses_out;
for (size_t i = 0; i < addresses.size(); ++i) {
addresses_out.emplace_back(
addresses[i].address(),
grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
for (const ServerAddress& address : addresses) {
addresses_out.emplace_back(address.WithAttribute(
kGrpcLbAddressAttributeKey,
absl::make_unique<TokenAndClientStatsAttribute>("", nullptr)));
}
return addresses_out;
}

Loading…
Cancel
Save