[xDS] add optional locality label to per-call metrics (#36049)

Closes #36049

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36049 from markdroth:xds_locality_label_on_per_call_metrics 9c5aeb7fb1
PiperOrigin-RevId: 612648213
pull/36053/head
Mark D. Roth 11 months ago committed by Copybara-Service
parent 692b7fcb7f
commit db9ca10b35
  1. 1
      src/core/BUILD
  2. 12
      src/core/ext/xds/xds_client_stats.h
  3. 5
      src/core/lib/channel/call_tracer.h
  4. 119
      src/core/load_balancing/xds/xds_cluster_impl.cc
  5. 58
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  6. 6
      test/cpp/end2end/xds/xds_utils.cc
  7. 3
      test/cpp/end2end/xds/xds_utils.h
  8. 6
      test/cpp/end2end/xds/xds_wrr_end2end_test.cc

@ -5277,6 +5277,7 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
"match",
"pollset_set",
"ref_counted",
"resolved_address",

@ -63,7 +63,11 @@ class XdsLocalityName : public RefCounted<XdsLocalityName> {
XdsLocalityName(std::string region, std::string zone, std::string sub_zone)
: region_(std::move(region)),
zone_(std::move(zone)),
sub_zone_(std::move(sub_zone)) {}
sub_zone_(std::move(sub_zone)),
locality_labels_(
std::make_shared<std::map<std::string, std::string>>()) {
locality_labels_->emplace("grpc.lb.locality", AsHumanReadableString());
}
bool operator==(const XdsLocalityName& other) const {
return region_ == other.region_ && zone_ == other.zone_ &&
@ -85,6 +89,9 @@ class XdsLocalityName : public RefCounted<XdsLocalityName> {
const std::string& region() const { return region_; }
const std::string& zone() const { return zone_; }
const std::string& sub_zone() const { return sub_zone_; }
// Returns the label map that the constructor allocates and populates with
// a "grpc.lb.locality" entry whose value is AsHumanReadableString().
// The shared_ptr is returned by value, so the caller shares ownership of
// the same map instance held in locality_labels_.
std::shared_ptr<std::map<std::string, std::string>> locality_labels() const {
return locality_labels_;
}
std::string AsHumanReadableString() const {
return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
@ -104,6 +111,7 @@ class XdsLocalityName : public RefCounted<XdsLocalityName> {
std::string region_;
std::string zone_;
std::string sub_zone_;
std::shared_ptr<std::map<std::string, std::string>> locality_labels_;
};
// Drop stats for an xds cluster.
@ -223,6 +231,8 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
void AddCallFinished(const std::map<absl::string_view, double>* named_metrics,
bool fail = false);
// Returns the XdsLocalityName pointer obtained from name_.get().
// NOTE(review): presumably name_ is a ref-counted pointer, so the returned
// raw pointer is only valid while this stats object is alive -- confirm.
XdsLocalityName* locality_name() const { return name_.get(); }
private:
struct Stats {
std::atomic<uint64_t> total_successful_requests{0};

@ -128,8 +128,9 @@ class ClientCallTracer : public CallTracerAnnotationInterface {
class CallAttemptTracer : public CallTracerInterface {
public:
enum class OptionalLabelComponent : std::uint8_t {
kXdsServiceLabels = 0,
kSize = 1, // keep last
kXdsServiceLabels,
kXdsLocalityLabels,
kSize, // keep last
};
~CallAttemptTracer() override {}

@ -51,6 +51,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -191,18 +192,46 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
private:
class StatsSubchannelWrapper : public DelegatingSubchannel {
public:
// If load reporting is enabled and we have an XdsClusterLocalityStats
// object, that object already contains the locality labels. We
// need to store the locality labels directly only in the case where
// load reporting is disabled.
//
// The accessors below interpret the two alternatives as follows:
// - locality_labels(): returns the stored map, or the map reached through
//   the stats object's locality name.
// - locality_stats(): returns nullptr for the labels alternative, or the
//   stats object for the stats alternative.
using LocalityData = absl::variant<
std::shared_ptr<std::map<std::string, std::string>> /*locality_labels*/,
RefCountedPtr<XdsClusterLocalityStats> /*locality_stats*/>;
StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
LocalityData locality_data)
: DelegatingSubchannel(std::move(wrapped_subchannel)),
locality_stats_(std::move(locality_stats)) {}
locality_data_(std::move(locality_data)) {}
// Returns the locality labels for this subchannel, whichever alternative
// locality_data_ currently holds.
std::shared_ptr<std::map<std::string, std::string>> locality_labels()
const {
return Match(
locality_data_,
// Labels stored directly: hand back the stored map.
[](const std::shared_ptr<std::map<std::string, std::string>>&
locality_labels) {
return locality_labels;
},
// Stats object stored: fetch the labels from its locality name.
[](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
return locality_stats->locality_name()->locality_labels();
});
}
XdsClusterLocalityStats* locality_stats() const {
return locality_stats_.get();
return Match(
locality_data_,
[](const std::shared_ptr<std::map<std::string, std::string>>&) {
return static_cast<XdsClusterLocalityStats*>(nullptr);
},
[](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
return locality_stats.get();
});
}
private:
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
LocalityData locality_data_;
};
// A picker that wraps the picker from the child to perform drops.
@ -375,8 +404,9 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
if (call_state->GetCallAttemptTracer() != nullptr) {
call_state->GetCallAttemptTracer()->AddOptionalLabels(
auto* call_attempt_tracer = call_state->GetCallAttemptTracer();
if (call_attempt_tracer != nullptr) {
call_attempt_tracer->AddOptionalLabels(
OptionalLabelComponent::kXdsServiceLabels, service_labels_);
}
// Handle EDS drops.
@ -404,16 +434,22 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
PickResult result = picker_->Pick(args);
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
// Add locality labels to per-call metrics if needed.
if (call_attempt_tracer != nullptr) {
call_attempt_tracer->AddOptionalLabels(
OptionalLabelComponent::kXdsLocalityLabels,
subchannel_wrapper->locality_labels());
}
// Handle load reporting.
RefCountedPtr<XdsClusterLocalityStats> locality_stats;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
// Handle load reporting.
if (subchannel_wrapper->locality_stats() != nullptr) {
locality_stats = subchannel_wrapper->locality_stats()->Ref(
DEBUG_LOCATION, "SubchannelCallTracker");
// Unwrap subchannel to pass back up the stack.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Unwrap subchannel to pass back up the stack.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
// Inject subchannel call tracker to record call completion.
complete_pick->subchannel_call_tracker =
std::make_unique<SubchannelCallTracker>(
@ -743,36 +779,39 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
const grpc_resolved_address& address, const ChannelArgs& per_address_args,
const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the Picker.
// Wrap the subchannel so that we pass along the locality labels and
// (if load reporting is enabled) the locality stats object, which
// will be used by the picker.
auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
RefCountedPtr<XdsClusterLocalityStats> locality_stats;
if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) {
auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
RefCountedPtr<XdsClusterLocalityStats> locality_stats =
parent()->xds_client_->AddClusterLocalityStats(
parent()->cluster_resource_->lrs_load_reporting_server.value(),
parent()->config_->cluster_name(),
GetEdsResourceName(*parent()->cluster_resource_),
std::move(locality_name));
if (locality_stats != nullptr) {
return MakeRefCounted<StatsSubchannelWrapper>(
parent()->channel_control_helper()->CreateSubchannel(
address, per_address_args, args),
std::move(locality_stats));
locality_stats = parent()->xds_client_->AddClusterLocalityStats(
parent()->cluster_resource_->lrs_load_reporting_server.value(),
parent()->config_->cluster_name(),
GetEdsResourceName(*parent()->cluster_resource_), locality_name);
if (locality_stats == nullptr) {
gpr_log(GPR_ERROR,
"[xds_cluster_impl_lb %p] Failed to get locality stats object "
"for LRS server %s, cluster %s, EDS service name %s; load "
"reports will not be generated",
parent(),
parent()
->cluster_resource_->lrs_load_reporting_server->server_uri()
.c_str(),
parent()->config_->cluster_name().c_str(),
GetEdsResourceName(*parent()->cluster_resource_).c_str());
}
gpr_log(GPR_ERROR,
"[xds_cluster_impl_lb %p] Failed to get locality stats object for "
"LRS server %s, cluster %s, EDS service name %s; load reports will "
"not be generated (not wrapping subchannel)",
parent(),
parent()
->cluster_resource_->lrs_load_reporting_server->server_uri()
.c_str(),
parent()->config_->cluster_name().c_str(),
GetEdsResourceName(*parent()->cluster_resource_).c_str());
}
// Load reporting not enabled, so don't wrap the subchannel.
return parent()->channel_control_helper()->CreateSubchannel(
address, per_address_args, args);
}
StatsSubchannelWrapper::LocalityData locality_data;
if (locality_stats != nullptr) {
locality_data = std::move(locality_stats);
} else {
locality_data = locality_name->locality_labels();
}
return MakeRefCounted<StatsSubchannelWrapper>(
parent()->channel_control_helper()->CreateSubchannel(
address, per_address_args, args),
std::move(locality_data));
}
void XdsClusterImplLb::Helper::UpdateState(

@ -309,12 +309,13 @@ TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
WaitForBackend(DEBUG_LOCATION, 1);
}
TEST_P(CdsTest, VerifyCsmServiceLabelsParsing) {
TEST_P(CdsTest, MetricLabels) {
// Injects a fake client call tracer factory. Try to keep this at the top.
grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory;
CreateAndStartBackends(1);
CreateAndStartBackends(2);
// Populates EDS resources.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)},
{"locality1", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Populates service labels to CDS resources.
auto cluster = default_cluster_;
@ -328,17 +329,46 @@ TEST_P(CdsTest, VerifyCsmServiceLabelsParsing) {
channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
&fake_client_call_tracer_factory);
ResetStub(/*failover_timeout_ms=*/0, &channel_args);
// Sends an RPC and verifies that the service labels are recorded in the fake
// client call tracer.
CheckRpcSendOk(DEBUG_LOCATION);
EXPECT_THAT(fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
->GetLastCallAttemptTracer()
->GetOptionalLabels(),
::testing::ElementsAre(::testing::Pair(
OptionalLabelComponent::kXdsServiceLabels,
::testing::Pointee(::testing::ElementsAre(
::testing::Pair("service_name", "myservice"),
::testing::Pair("service_namespace", "mynamespace"))))));
// Send an RPC to backend 0.
WaitForBackend(DEBUG_LOCATION, 0);
// Verify that the optional labels are recorded in the call tracer.
EXPECT_THAT(
fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
->GetLastCallAttemptTracer()
->GetOptionalLabels(),
::testing::ElementsAre(
::testing::Pair(
OptionalLabelComponent::kXdsServiceLabels,
::testing::Pointee(::testing::ElementsAre(
::testing::Pair("service_name", "myservice"),
::testing::Pair("service_namespace", "mynamespace")))),
::testing::Pair(
OptionalLabelComponent::kXdsLocalityLabels,
::testing::Pointee(::testing::ElementsAre(::testing::Pair(
"grpc.lb.locality", LocalityNameString("locality0")))))));
// Send an RPC to backend 1.
WaitForBackend(DEBUG_LOCATION, 1);
// Verify that the optional labels are recorded in the call tracer.
EXPECT_THAT(
fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
->GetLastCallAttemptTracer()
->GetOptionalLabels(),
::testing::ElementsAre(
::testing::Pair(
OptionalLabelComponent::kXdsServiceLabels,
::testing::Pointee(::testing::ElementsAre(
::testing::Pair("service_name", "myservice"),
::testing::Pair("service_namespace", "mynamespace")))),
::testing::Pair(
OptionalLabelComponent::kXdsLocalityLabels,
::testing::Pointee(::testing::ElementsAre(::testing::Pair(
"grpc.lb.locality", LocalityNameString("locality1")))))));
// TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary.
// The only reason it's here is to add a delay before
// fake_client_call_tracer_factory goes out of scope, since there may
// be lingering callbacks in the call stack that are using the
// CallAttemptTracer even after we get here, which would then cause a
// crash. Find a cleaner way to fix this.
balancer_->Shutdown();
}

@ -317,6 +317,12 @@ void XdsResourceUtils::SetRouteConfiguration(
}
}
// Builds the string form of a locality that uses kDefaultLocalityRegion and
// kDefaultLocalityZone together with the given sub_zone.
// The format string is the same one used by
// XdsLocalityName::AsHumanReadableString(), so the result can be compared
// against the "grpc.lb.locality" label value in tests.
std::string XdsResourceUtils::LocalityNameString(absl::string_view sub_zone) {
return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
kDefaultLocalityRegion, kDefaultLocalityZone,
sub_zone);
}
ClusterLoadAssignment XdsResourceUtils::BuildEdsResource(
const EdsResourceArgs& args, absl::string_view eds_service_name) {
ClusterLoadAssignment assignment;

@ -195,6 +195,9 @@ class XdsResourceUtils {
bool use_rds = false,
const Listener* listener_to_copy = nullptr);
// Returns a string representing the locality with the specified sub_zone.
static std::string LocalityNameString(absl::string_view sub_zone);
// Arguments for constructing an EDS resource.
struct EdsResourceArgs {
// An individual endpoint for a backend running on a specified port.

@ -48,12 +48,6 @@ class WrrTest : public XdsEnd2endTest {
void SetUp() override {
// No-op -- tests must explicitly call InitClient().
}
// Formats a locality as {region="...", zone="...", sub_zone="..."} using
// kDefaultLocalityRegion and kDefaultLocalityZone with the given sub_zone.
// NOTE(review): the hunk header (12 lines -> 6) suggests this helper is
// removed by this commit -- confirm against the original diff.
static std::string LocalityNameString(absl::string_view sub_zone) {
return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
kDefaultLocalityRegion, kDefaultLocalityZone,
sub_zone);
}
};
INSTANTIATE_TEST_SUITE_P(XdsTest, WrrTest, ::testing::Values(XdsTestType()),

Loading…
Cancel
Save