[XdsClient] add grpc.xds_client.server_failure metric (#36291)

As per gRFC A78 (https://github.com/grpc/proposal/pull/419).

Closes #36291

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36291 from markdroth:xds_client_counter a9363c2105
PiperOrigin-RevId: 623935691
pull/36343/head
Mark D. Roth 8 months ago committed by Copybara-Service
parent 8acddbb01e
commit bfe6db2924
  1. 4
      src/core/ext/xds/xds_client.cc
  2. 14
      src/core/ext/xds/xds_client_grpc.cc
  3. 2
      src/core/ext/xds/xds_metrics.h
  4. 34
      test/core/xds/xds_client_test.cc
  5. 47
      test/cpp/end2end/xds/xds_core_end2end_test.cc

@ -641,6 +641,10 @@ void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
absl::StrCat(status.message(),
" (node ID:", xds_client_->bootstrap_->node()->id(), ")"));
}
// If status was previously OK, report that the channel has gone unhealthy.
if (status_.ok() && xds_client_->metrics_reporter_ != nullptr) {
xds_client_->metrics_reporter_->ReportServerFailure(server_.server_uri());
}
// Save status in channel, so that we can immediately generate an
// error for any new watchers that may be started.
status_ = status;

@ -114,6 +114,15 @@ const auto kMetricResourceUpdatesInvalid =
kMetricLabelXdsResourceType},
{}, false);
const auto kMetricServerFailure =
GlobalInstrumentsRegistry::RegisterUInt64Counter(
"grpc.xds_client.server_failure",
"EXPERIMENTAL. A counter of xDS servers going from healthy to "
"unhealthy. A server goes unhealthy when we have a connectivity "
"failure or when the ADS stream fails without seeing a response "
"message, as per gRFC A57.",
"{failure}", {kMetricLabelTarget, kMetricLabelXdsServer}, {}, false);
const auto kMetricConnected =
GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
"grpc.xds_client.connected",
@ -156,6 +165,11 @@ class GrpcXdsClient::MetricsReporter final : public XdsMetricsReporter {
{xds_client_.key_, xds_server, resource_type}, {});
}
void ReportServerFailure(absl::string_view xds_server) override {
xds_client_.stats_plugin_group_.AddCounter(
kMetricServerFailure, 1, {xds_client_.key_, xds_server}, {});
}
private:
GrpcXdsClient& xds_client_;
};

@ -32,6 +32,8 @@ class XdsMetricsReporter {
absl::string_view resource_type,
uint64_t num_valid,
uint64_t num_invalid) = 0;
virtual void ReportServerFailure(absl::string_view xds_server) = 0;
};
} // namespace grpc_core

@ -625,6 +625,7 @@ class XdsClientTest : public ::testing::Test {
using ResourceUpdateMap = std::map<
std::pair<std::string /*xds_server*/, std::string /*resource_type*/>,
uint64_t>;
using ServerFailureMap = std::map<std::string /*xds_server*/, uint64_t>;
const ResourceUpdateMap& resource_updates_valid() const {
return resource_updates_valid_;
@ -632,6 +633,7 @@ class XdsClientTest : public ::testing::Test {
const ResourceUpdateMap& resource_updates_invalid() const {
return resource_updates_invalid_;
}
const ServerFailureMap& server_failures() const { return server_failures_; }
private:
void ReportResourceUpdates(absl::string_view xds_server,
@ -648,8 +650,13 @@ class XdsClientTest : public ::testing::Test {
}
}
void ReportServerFailure(absl::string_view xds_server) override {
++server_failures_[std::string(xds_server)];
}
ResourceUpdateMap resource_updates_valid_;
ResourceUpdateMap resource_updates_invalid_;
ServerFailureMap server_failures_;
};
using ResourceCounts =
@ -876,6 +883,7 @@ TEST_F(XdsClientTest, BasicWatch) {
::testing::ElementsAre());
EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Check metrics.
@ -2303,6 +2311,7 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
// Check metric data.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, true)));
EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
// Watcher should initially not see any resource reported.
EXPECT_FALSE(watcher->HasEvent());
// XdsClient should have created an ADS stream.
@ -2321,6 +2330,8 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
// Check metric data.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, false)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// XdsClient should report an error to the watcher.
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
@ -2344,6 +2355,8 @@ TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
// Connection still reported as unhappy until we get a response.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, false)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// Server now sends the requested resource.
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
@ -2380,6 +2393,7 @@ TEST_F(XdsClientTest, ConnectionFails) {
transport_factory_->SetAutoCompleteMessagesFromClient(false);
// Metrics should initially be empty.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
// Start a watch for "foo1".
auto watcher = StartFooWatch("foo1");
// Check metric data.
@ -2412,6 +2426,8 @@ TEST_F(XdsClientTest, ConnectionFails) {
// Connection reported as unhappy.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, false)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// We should not see a resource-does-not-exist event, because the
// timer should not be running while the channel is disconnected.
EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4)));
@ -3709,6 +3725,9 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
::testing::ElementsAre(
::testing::Pair(kDefaultXdsServerUrl, true),
::testing::Pair(authority_server.server_uri(), false)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(
::testing::Pair(authority_server.server_uri(), 1)));
// Cancel watch for "foo1".
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
@ -3821,6 +3840,7 @@ TEST_F(XdsClientTest, FallbackAndRecover) {
1)));
EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
::testing::IsEmpty());
EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
// XdsClient should have created an ADS stream.
auto stream = WaitForAdsStream();
ASSERT_TRUE(stream != nullptr);
@ -3875,11 +3895,15 @@ TEST_F(XdsClientTest, FallbackAndRecover) {
// Result (local): The metrics show the channel as being unhealthy.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, false)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// Input: Trigger stream failure.
stream->MaybeSendStatusToClient(absl::UnavailableError("Stream failure"));
// Result (local): The metrics still show the channel as being unhealthy.
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, false)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// Result (remote): The client starts a new stream and sends a subscription
// message. Note that the server does not respond, so the channel will still
// have non-OK status.
@ -3998,6 +4022,8 @@ TEST_F(XdsClientTest, FallbackAndRecover) {
2)));
EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
kDefaultXdsServerUrl, true)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// Result (remote): The stream to the fallback server has been orphaned.
EXPECT_TRUE(stream2->Orphaned());
// Result (local): Resources are delivered to watchers.
@ -4053,6 +4079,8 @@ TEST_F(XdsClientTest, FallbackReportsError) {
::testing::ElementsAre(
::testing::Pair(kDefaultXdsServerUrl, false),
::testing::Pair(fallback_server.server_uri(), true)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// Fallback happens now
stream = WaitForAdsStream(fallback_server);
ASSERT_NE(stream, nullptr);
@ -4068,6 +4096,10 @@ TEST_F(XdsClientTest, FallbackReportsError) {
::testing::ElementsAre(
::testing::Pair(kDefaultXdsServerUrl, false),
::testing::Pair(fallback_server.server_uri(), false)));
EXPECT_THAT(
metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1),
::testing::Pair(fallback_server.server_uri(), 1)));
auto error = watcher->WaitForNextError();
ASSERT_TRUE(error.has_value());
EXPECT_THAT(error->code(), absl::StatusCode::kUnavailable);
@ -4117,6 +4149,8 @@ TEST_F(XdsClientTest, FallbackOnStartup) {
::testing::ElementsAre(
::testing::Pair(kDefaultXdsServerUrl, false),
::testing::Pair(fallback_server.server_uri(), true)));
EXPECT_THAT(metrics_reporter_->server_failures(),
::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1)));
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_NE(resource, nullptr);

@ -1130,6 +1130,23 @@ TEST_P(XdsMetricsTest, MetricDefinitionResourceUpdatesInvalid) {
EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
TEST_P(XdsMetricsTest, MetricDefinitionServerFailure) {
const auto* descriptor =
grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
"grpc.xds_client.server_failure");
ASSERT_NE(descriptor, nullptr);
EXPECT_EQ(descriptor->value_type,
grpc_core::GlobalInstrumentsRegistry::ValueType::kUInt64);
EXPECT_EQ(descriptor->instrument_type,
grpc_core::GlobalInstrumentsRegistry::InstrumentType::kCounter);
EXPECT_EQ(descriptor->enable_by_default, false);
EXPECT_EQ(descriptor->name, "grpc.xds_client.server_failure");
EXPECT_EQ(descriptor->unit, "{failure}");
EXPECT_THAT(descriptor->label_keys,
::testing::ElementsAre("grpc.target", "grpc.xds.server"));
EXPECT_THAT(descriptor->optional_label_keys, ::testing::ElementsAre());
}
TEST_P(XdsMetricsTest, MetricDefinitionConnected) {
const auto* descriptor =
grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
@ -1179,6 +1196,10 @@ TEST_P(XdsMetricsTest, MetricValues) {
FindUInt64CounterHandleByName(
"grpc.xds_client.resource_updates_invalid")
.value();
const auto kMetricServerFailure =
grpc_core::GlobalInstrumentsRegistryTestPeer::
FindUInt64CounterHandleByName("grpc.xds_client.server_failure")
.value();
const auto kMetricConnected =
grpc_core::GlobalInstrumentsRegistryTestPeer::
FindCallbackInt64GaugeHandleByName("grpc.xds_client.connected")
@ -1199,6 +1220,9 @@ TEST_P(XdsMetricsTest, MetricValues) {
EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(kMetricConnected,
{kTarget, kXdsServer}, {}),
::testing::Optional(1));
EXPECT_THAT(stats_plugin_->GetCounterValue(kMetricServerFailure,
{kTarget, kXdsServer}, {}),
absl::nullopt);
for (absl::string_view type_url :
{"envoy.config.listener.v3.Listener",
"envoy.config.route.v3.RouteConfiguration",
@ -1220,6 +1244,9 @@ TEST_P(XdsMetricsTest, MetricValues) {
EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(kMetricConnected,
{"#server", kXdsServer}, {}),
::testing::Optional(1));
EXPECT_THAT(stats_plugin_->GetCounterValue(kMetricServerFailure,
{"#server", kXdsServer}, {}),
absl::nullopt);
for (absl::string_view type_url :
{"envoy.config.listener.v3.Listener",
"envoy.config.route.v3.RouteConfiguration"}) {
@ -1236,6 +1263,26 @@ TEST_P(XdsMetricsTest, MetricValues) {
kMetricResources, {"#server", "#old", type_url, "acked"}, {}),
::testing::Optional(1));
}
// Shut down balancer and wait for metrics to show the failure.
balancer_->Shutdown();
for (const char* target : {kTarget.c_str(), "#server"}) {
const absl::Time deadline =
absl::Now() + absl::Seconds(5 * grpc_test_slowdown_factor());
while (true) {
auto value = stats_plugin_->GetCounterValue(kMetricServerFailure,
{target, kXdsServer}, {});
if (value.has_value()) {
EXPECT_EQ(1, *value);
break;
}
ASSERT_LE(absl::Now(), deadline);
absl::SleepFor(absl::Seconds(1));
}
stats_plugin_->TriggerCallbacks();
EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(kMetricConnected,
{target, kXdsServer}, {}),
::testing::Optional(0));
}
}
//

Loading…
Cancel
Save