[work-serializer] Add some basic process-wide monitoring (#34369)

Add some basic metrics to work serializer, keep them process wide for
now (though it may be interesting to get these into channelz in the
future).

Collected are:
- time spent running a work serializer when it starts
- time spent actually executing work when the work serializer runs
- number of items executed each run

A high disparity between the first two indicates our dispatching
mechanism is adding large amounts of latency (perhaps due to thread
starvation like effects).

A high value for any of these indicates contention on the serializer.

It's likely a future iteration on these will select different metrics -
I'm not *entirely* sure which will be useful in production analysis yet.

I'm using `std::chrono::steady_clock` here for precision (nanoseconds)
with a compact representation (better than timespec) and a robust &
portable api - I think it's appropriate for metrics, but wouldn't use it
much beyond that at this point.
pull/34393/head^2
Craig Tiller 1 year ago committed by GitHub
parent 214776e6aa
commit 47306d78f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 197
      src/core/lib/debug/stats_data.cc
  3. 101
      src/core/lib/debug/stats_data.h
  4. 21
      src/core/lib/debug/stats_data.yaml
  5. 28
      src/core/lib/gprpp/work_serializer.cc
  6. 5
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  7. 2
      test/core/client_channel/lb_policy/pick_first_test.cc
  8. 109
      test/core/gprpp/work_serializer_test.cc

@ -2425,7 +2425,9 @@ grpc_cc_library(
"gpr",
"grpc_trace",
"orphanable",
"stats",
"//src/core:experiments",
"//src/core:stats_data",
],
)

@ -27,6 +27,19 @@ union DblUint {
uint64_t uint;
};
} // namespace
// Folds this collector's atomic per-bucket counts into *result.
// Loads are relaxed: callers only need an eventually-consistent snapshot.
void HistogramCollector_100000_20::Collect(Histogram_100000_20* result) const {
  for (int bucket = 0; bucket != 20; ++bucket) {
    result->buckets_[bucket] += buckets_[bucket].load(std::memory_order_relaxed);
  }
}
// Per-bucket difference of two snapshots (left - right); used to compute
// deltas between successive stats collections.
Histogram_100000_20 operator-(const Histogram_100000_20& left,
                              const Histogram_100000_20& right) {
  Histogram_100000_20 delta;
  for (int bucket = 0; bucket != 20; ++bucket) {
    delta.buckets_[bucket] = left.buckets_[bucket] - right.buckets_[bucket];
  }
  return delta;
}
void HistogramCollector_65536_26::Collect(Histogram_65536_26* result) const {
for (int i = 0; i < 26; i++) {
result->buckets_[i] += buckets_[i].load(std::memory_order_relaxed);
@ -54,28 +67,28 @@ Histogram_16777216_20 operator-(const Histogram_16777216_20& left,
}
return result;
}
void HistogramCollector_10000_20::Collect(Histogram_10000_20* result) const {
for (int i = 0; i < 20; i++) {
// Folds this collector's atomic per-bucket counts into *result.
// Loads are relaxed: callers only need an eventually-consistent snapshot.
void HistogramCollector_80_10::Collect(Histogram_80_10* result) const {
  for (int bucket = 0; bucket != 10; ++bucket) {
    result->buckets_[bucket] += buckets_[bucket].load(std::memory_order_relaxed);
  }
}
Histogram_10000_20 operator-(const Histogram_10000_20& left,
const Histogram_10000_20& right) {
Histogram_10000_20 result;
for (int i = 0; i < 20; i++) {
// Per-bucket difference of two snapshots (left - right); used to compute
// deltas between successive stats collections.
Histogram_80_10 operator-(const Histogram_80_10& left,
                          const Histogram_80_10& right) {
  Histogram_80_10 delta;
  for (int bucket = 0; bucket != 10; ++bucket) {
    delta.buckets_[bucket] = left.buckets_[bucket] - right.buckets_[bucket];
  }
  return delta;
}
void HistogramCollector_80_10::Collect(Histogram_80_10* result) const {
for (int i = 0; i < 10; i++) {
// Folds this collector's atomic per-bucket counts into *result.
// Loads are relaxed: callers only need an eventually-consistent snapshot.
void HistogramCollector_10000_20::Collect(Histogram_10000_20* result) const {
  for (int bucket = 0; bucket != 20; ++bucket) {
    result->buckets_[bucket] += buckets_[bucket].load(std::memory_order_relaxed);
  }
}
Histogram_80_10 operator-(const Histogram_80_10& left,
const Histogram_80_10& right) {
Histogram_80_10 result;
for (int i = 0; i < 10; i++) {
Histogram_10000_20 operator-(const Histogram_10000_20& left,
const Histogram_10000_20& right) {
Histogram_10000_20 result;
for (int i = 0; i < 20; i++) {
result.buckets_[i] = left.buckets_[i] - right.buckets_[i];
}
return result;
@ -101,6 +114,8 @@ const absl::string_view
"cq_next_creates",
"cq_callback_creates",
"wrr_updates",
"work_serializer_items_enqueued",
"work_serializer_items_dequeued",
};
const absl::string_view GlobalStats::counter_doc[static_cast<int>(
Counter::COUNT)] = {
@ -129,14 +144,25 @@ const absl::string_view GlobalStats::counter_doc[static_cast<int>(
"Number of completion queues created for cq_callback (indicates callback "
"api usage)",
"Number of wrr updates that have been received",
"Number of items enqueued onto work serializers",
"Number of items dequeued from work serializers",
};
const absl::string_view
GlobalStats::histogram_name[static_cast<int>(Histogram::COUNT)] = {
"call_initial_size", "tcp_write_size",
"tcp_write_iov_size", "tcp_read_size",
"tcp_read_offer", "tcp_read_offer_iov_size",
"http2_send_message_size", "http2_metadata_size",
"wrr_subchannel_list_size", "wrr_subchannel_ready_size",
"call_initial_size",
"tcp_write_size",
"tcp_write_iov_size",
"tcp_read_size",
"tcp_read_offer",
"tcp_read_offer_iov_size",
"http2_send_message_size",
"http2_metadata_size",
"wrr_subchannel_list_size",
"wrr_subchannel_ready_size",
"work_serializer_run_time_ms",
"work_serializer_work_time_ms",
"work_serializer_work_time_per_item_ms",
"work_serializer_items_per_run",
};
const absl::string_view GlobalStats::histogram_doc[static_cast<int>(
Histogram::COUNT)] = {
@ -150,32 +176,43 @@ const absl::string_view GlobalStats::histogram_doc[static_cast<int>(
"Number of bytes consumed by metadata, according to HPACK accounting rules",
"Number of subchannels in a subchannel list at picker creation time",
"Number of READY subchannels in a subchannel list at picker creation time",
"Number of milliseconds work serializers run for",
"When running, how many milliseconds are work serializers actually doing "
"work",
"How long do individual items take to process in work serializers",
"How many callbacks are executed when a work serializer runs",
};
namespace {
const int kStatsTable0[27] = {0, 1, 2, 4, 7, 11, 17,
const int kStatsTable0[21] = {0, 1, 2, 4, 8, 15, 27,
49, 89, 160, 288, 517, 928, 1666,
2991, 5369, 9637, 17297, 31045, 55719, 100000};
const uint8_t kStatsTable1[30] = {3, 3, 4, 4, 5, 6, 6, 7, 7, 8,
9, 9, 10, 10, 11, 11, 12, 13, 13, 14,
15, 15, 16, 16, 17, 17, 18, 19, 19, 20};
const int kStatsTable2[27] = {0, 1, 2, 4, 7, 11, 17,
26, 40, 61, 92, 139, 210, 317,
478, 721, 1087, 1638, 2468, 3719, 5604,
8443, 12721, 19166, 28875, 43502, 65536};
const uint8_t kStatsTable1[29] = {3, 3, 4, 5, 6, 6, 7, 8, 9, 10,
const uint8_t kStatsTable3[29] = {3, 3, 4, 5, 6, 6, 7, 8, 9, 10,
11, 11, 12, 13, 14, 15, 16, 16, 17, 18,
19, 20, 21, 21, 22, 23, 24, 25, 26};
const int kStatsTable2[21] = {
const int kStatsTable4[21] = {
0, 1, 3, 8, 19, 45, 106,
250, 588, 1383, 3252, 7646, 17976, 42262,
99359, 233593, 549177, 1291113, 3035402, 7136218, 16777216};
const uint8_t kStatsTable3[23] = {2, 3, 3, 4, 5, 6, 7, 8,
const uint8_t kStatsTable5[23] = {2, 3, 3, 4, 5, 6, 7, 8,
8, 9, 10, 11, 12, 12, 13, 14,
15, 16, 16, 17, 18, 19, 20};
const int kStatsTable4[21] = {0, 1, 2, 4, 7, 12, 19,
const int kStatsTable6[11] = {0, 1, 2, 4, 7, 11, 17, 26, 38, 56, 80};
const uint8_t kStatsTable7[9] = {3, 3, 4, 5, 6, 6, 7, 8, 9};
const int kStatsTable8[21] = {0, 1, 2, 4, 7, 12, 19,
30, 47, 74, 116, 182, 285, 445,
695, 1084, 1691, 2637, 4113, 6414, 10000};
const uint8_t kStatsTable5[23] = {3, 3, 4, 5, 5, 6, 7, 8,
const uint8_t kStatsTable9[23] = {3, 3, 4, 5, 5, 6, 7, 8,
9, 9, 10, 11, 12, 12, 13, 14,
15, 15, 16, 17, 18, 18, 19};
const int kStatsTable6[11] = {0, 1, 2, 4, 7, 11, 17, 26, 38, 56, 80};
const uint8_t kStatsTable7[9] = {3, 3, 4, 5, 6, 6, 7, 8, 9};
} // namespace
int Histogram_65536_26::BucketFor(int value) {
int Histogram_100000_20::BucketFor(int value) {
if (value < 3) {
if (value < 0) {
return 0;
@ -183,56 +220,52 @@ int Histogram_65536_26::BucketFor(int value) {
return value;
}
} else {
if (value < 49153) {
if (value < 65537) {
DblUint val;
val.dbl = value;
const int bucket =
kStatsTable1[((val.uint - 4613937818241073152ull) >> 51)];
return bucket - (value < kStatsTable0[bucket]);
} else {
return 25;
return 19;
}
}
}
int Histogram_16777216_20::BucketFor(int value) {
if (value < 2) {
int Histogram_65536_26::BucketFor(int value) {
if (value < 3) {
if (value < 0) {
return 0;
} else {
return value;
}
} else {
if (value < 8388609) {
if (value < 49153) {
DblUint val;
val.dbl = value;
const int bucket =
kStatsTable3[((val.uint - 4611686018427387904ull) >> 52)];
kStatsTable3[((val.uint - 4613937818241073152ull) >> 51)];
return bucket - (value < kStatsTable2[bucket]);
} else {
return 19;
return 25;
}
}
}
int Histogram_10000_20::BucketFor(int value) {
if (value < 3) {
int Histogram_16777216_20::BucketFor(int value) {
if (value < 2) {
if (value < 0) {
return 0;
} else {
return value;
}
} else {
if (value < 6145) {
if (value < 8388609) {
DblUint val;
val.dbl = value;
const int bucket =
kStatsTable5[((val.uint - 4613937818241073152ull) >> 51)];
kStatsTable5[((val.uint - 4611686018427387904ull) >> 52)];
return bucket - (value < kStatsTable4[bucket]);
} else {
if (value < 6414) {
return 18;
} else {
return 19;
}
return 19;
}
}
}
@ -259,6 +292,29 @@ int Histogram_80_10::BucketFor(int value) {
}
}
}
// Maps `value` to its bucket index in the 10000-max / 20-bucket histogram.
// Generated code: small values map 1:1 to the first buckets; larger values
// use the bit pattern of (double)value (via the DblUint union above) as a
// fast log-scale index, corrected against bucket bounds in kStatsTable8.
int Histogram_10000_20::BucketFor(int value) {
if (value < 3) {
if (value < 0) {
// Negative values are clamped into the first bucket.
return 0;
} else {
return value;
}
} else {
if (value < 6145) {
// Type-pun value through a double: subtracting the bias constant and
// shifting extracts exponent + top mantissa bits as a table index.
DblUint val;
val.dbl = value;
const int bucket =
kStatsTable9[((val.uint - 4613937818241073152ull) >> 51)];
// Step down one bucket if value is below this bucket's lower bound.
return bucket - (value < kStatsTable8[bucket]);
} else {
if (value < 6414) {
return 18;
} else {
// Everything at or above 6414 lands in the final (overflow) bucket.
return 19;
}
}
}
}
GlobalStats::GlobalStats()
: client_calls_created{0},
server_calls_created{0},
@ -278,41 +334,55 @@ GlobalStats::GlobalStats()
cq_pluck_creates{0},
cq_next_creates{0},
cq_callback_creates{0},
wrr_updates{0} {}
wrr_updates{0},
work_serializer_items_enqueued{0},
work_serializer_items_dequeued{0} {}
HistogramView GlobalStats::histogram(Histogram which) const {
switch (which) {
default:
GPR_UNREACHABLE_CODE(return HistogramView());
case Histogram::kCallInitialSize:
return HistogramView{&Histogram_65536_26::BucketFor, kStatsTable0, 26,
return HistogramView{&Histogram_65536_26::BucketFor, kStatsTable2, 26,
call_initial_size.buckets()};
case Histogram::kTcpWriteSize:
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable2, 20,
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable4, 20,
tcp_write_size.buckets()};
case Histogram::kTcpWriteIovSize:
return HistogramView{&Histogram_80_10::BucketFor, kStatsTable6, 10,
tcp_write_iov_size.buckets()};
case Histogram::kTcpReadSize:
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable2, 20,
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable4, 20,
tcp_read_size.buckets()};
case Histogram::kTcpReadOffer:
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable2, 20,
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable4, 20,
tcp_read_offer.buckets()};
case Histogram::kTcpReadOfferIovSize:
return HistogramView{&Histogram_80_10::BucketFor, kStatsTable6, 10,
tcp_read_offer_iov_size.buckets()};
case Histogram::kHttp2SendMessageSize:
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable2, 20,
return HistogramView{&Histogram_16777216_20::BucketFor, kStatsTable4, 20,
http2_send_message_size.buckets()};
case Histogram::kHttp2MetadataSize:
return HistogramView{&Histogram_65536_26::BucketFor, kStatsTable0, 26,
return HistogramView{&Histogram_65536_26::BucketFor, kStatsTable2, 26,
http2_metadata_size.buckets()};
case Histogram::kWrrSubchannelListSize:
return HistogramView{&Histogram_10000_20::BucketFor, kStatsTable4, 20,
return HistogramView{&Histogram_10000_20::BucketFor, kStatsTable8, 20,
wrr_subchannel_list_size.buckets()};
case Histogram::kWrrSubchannelReadySize:
return HistogramView{&Histogram_10000_20::BucketFor, kStatsTable4, 20,
return HistogramView{&Histogram_10000_20::BucketFor, kStatsTable8, 20,
wrr_subchannel_ready_size.buckets()};
case Histogram::kWorkSerializerRunTimeMs:
return HistogramView{&Histogram_100000_20::BucketFor, kStatsTable0, 20,
work_serializer_run_time_ms.buckets()};
case Histogram::kWorkSerializerWorkTimeMs:
return HistogramView{&Histogram_100000_20::BucketFor, kStatsTable0, 20,
work_serializer_work_time_ms.buckets()};
case Histogram::kWorkSerializerWorkTimePerItemMs:
return HistogramView{&Histogram_100000_20::BucketFor, kStatsTable0, 20,
work_serializer_work_time_per_item_ms.buckets()};
case Histogram::kWorkSerializerItemsPerRun:
return HistogramView{&Histogram_10000_20::BucketFor, kStatsTable8, 20,
work_serializer_items_per_run.buckets()};
}
}
std::unique_ptr<GlobalStats> GlobalStatsCollector::Collect() const {
@ -353,6 +423,10 @@ std::unique_ptr<GlobalStats> GlobalStatsCollector::Collect() const {
result->cq_callback_creates +=
data.cq_callback_creates.load(std::memory_order_relaxed);
result->wrr_updates += data.wrr_updates.load(std::memory_order_relaxed);
result->work_serializer_items_enqueued +=
data.work_serializer_items_enqueued.load(std::memory_order_relaxed);
result->work_serializer_items_dequeued +=
data.work_serializer_items_dequeued.load(std::memory_order_relaxed);
data.call_initial_size.Collect(&result->call_initial_size);
data.tcp_write_size.Collect(&result->tcp_write_size);
data.tcp_write_iov_size.Collect(&result->tcp_write_iov_size);
@ -363,6 +437,14 @@ std::unique_ptr<GlobalStats> GlobalStatsCollector::Collect() const {
data.http2_metadata_size.Collect(&result->http2_metadata_size);
data.wrr_subchannel_list_size.Collect(&result->wrr_subchannel_list_size);
data.wrr_subchannel_ready_size.Collect(&result->wrr_subchannel_ready_size);
data.work_serializer_run_time_ms.Collect(
&result->work_serializer_run_time_ms);
data.work_serializer_work_time_ms.Collect(
&result->work_serializer_work_time_ms);
data.work_serializer_work_time_per_item_ms.Collect(
&result->work_serializer_work_time_per_item_ms);
data.work_serializer_items_per_run.Collect(
&result->work_serializer_items_per_run);
}
return result;
}
@ -395,6 +477,10 @@ std::unique_ptr<GlobalStats> GlobalStats::Diff(const GlobalStats& other) const {
result->cq_next_creates = cq_next_creates - other.cq_next_creates;
result->cq_callback_creates = cq_callback_creates - other.cq_callback_creates;
result->wrr_updates = wrr_updates - other.wrr_updates;
result->work_serializer_items_enqueued =
work_serializer_items_enqueued - other.work_serializer_items_enqueued;
result->work_serializer_items_dequeued =
work_serializer_items_dequeued - other.work_serializer_items_dequeued;
result->call_initial_size = call_initial_size - other.call_initial_size;
result->tcp_write_size = tcp_write_size - other.tcp_write_size;
result->tcp_write_iov_size = tcp_write_iov_size - other.tcp_write_iov_size;
@ -409,6 +495,15 @@ std::unique_ptr<GlobalStats> GlobalStats::Diff(const GlobalStats& other) const {
wrr_subchannel_list_size - other.wrr_subchannel_list_size;
result->wrr_subchannel_ready_size =
wrr_subchannel_ready_size - other.wrr_subchannel_ready_size;
result->work_serializer_run_time_ms =
work_serializer_run_time_ms - other.work_serializer_run_time_ms;
result->work_serializer_work_time_ms =
work_serializer_work_time_ms - other.work_serializer_work_time_ms;
result->work_serializer_work_time_per_item_ms =
work_serializer_work_time_per_item_ms -
other.work_serializer_work_time_per_item_ms;
result->work_serializer_items_per_run =
work_serializer_items_per_run - other.work_serializer_items_per_run;
return result;
}
} // namespace grpc_core

@ -30,6 +30,29 @@
#include "src/core/lib/gprpp/per_cpu.h"
namespace grpc_core {
class HistogramCollector_100000_20;
// Plain (non-atomic) 20-bucket histogram for values up to 100000, used for
// collected snapshots (generated from stats_data.yaml).
class Histogram_100000_20 {
public:
// Returns the bucket index that `value` falls into (clamping out-of-range).
static int BucketFor(int value);
// Raw per-bucket counts; length is 20.
const uint64_t* buckets() const { return buckets_; }
// Per-bucket difference of two snapshots (left - right).
friend Histogram_100000_20 operator-(const Histogram_100000_20& left,
const Histogram_100000_20& right);
private:
// The collector is allowed to write counts directly into a snapshot.
friend class HistogramCollector_100000_20;
uint64_t buckets_[20]{};
};
// Accumulation side of the 100000/20 histogram: buckets are std::atomic so
// Increment() may be called from multiple threads (relaxed ordering);
// Collect() folds the counts into a plain Histogram_100000_20 snapshot.
class HistogramCollector_100000_20 {
public:
// Adds one sample; bucket choice is delegated to the snapshot type.
void Increment(int value) {
buckets_[Histogram_100000_20::BucketFor(value)].fetch_add(
1, std::memory_order_relaxed);
}
// Accumulates current counts into *result (defined in stats_data.cc).
void Collect(Histogram_100000_20* result) const;
private:
std::atomic<uint64_t> buckets_[20]{};
};
class HistogramCollector_65536_26;
class Histogram_65536_26 {
public:
@ -76,51 +99,51 @@ class HistogramCollector_16777216_20 {
private:
std::atomic<uint64_t> buckets_[20]{};
};
class HistogramCollector_10000_20;
class Histogram_10000_20 {
class HistogramCollector_80_10;
class Histogram_80_10 {
public:
static int BucketFor(int value);
const uint64_t* buckets() const { return buckets_; }
friend Histogram_10000_20 operator-(const Histogram_10000_20& left,
const Histogram_10000_20& right);
friend Histogram_80_10 operator-(const Histogram_80_10& left,
const Histogram_80_10& right);
private:
friend class HistogramCollector_10000_20;
uint64_t buckets_[20]{};
friend class HistogramCollector_80_10;
uint64_t buckets_[10]{};
};
class HistogramCollector_10000_20 {
class HistogramCollector_80_10 {
public:
void Increment(int value) {
buckets_[Histogram_10000_20::BucketFor(value)].fetch_add(
buckets_[Histogram_80_10::BucketFor(value)].fetch_add(
1, std::memory_order_relaxed);
}
void Collect(Histogram_10000_20* result) const;
void Collect(Histogram_80_10* result) const;
private:
std::atomic<uint64_t> buckets_[20]{};
std::atomic<uint64_t> buckets_[10]{};
};
class HistogramCollector_80_10;
class Histogram_80_10 {
class HistogramCollector_10000_20;
class Histogram_10000_20 {
public:
static int BucketFor(int value);
const uint64_t* buckets() const { return buckets_; }
friend Histogram_80_10 operator-(const Histogram_80_10& left,
const Histogram_80_10& right);
friend Histogram_10000_20 operator-(const Histogram_10000_20& left,
const Histogram_10000_20& right);
private:
friend class HistogramCollector_80_10;
uint64_t buckets_[10]{};
friend class HistogramCollector_10000_20;
uint64_t buckets_[20]{};
};
class HistogramCollector_80_10 {
class HistogramCollector_10000_20 {
public:
void Increment(int value) {
buckets_[Histogram_80_10::BucketFor(value)].fetch_add(
buckets_[Histogram_10000_20::BucketFor(value)].fetch_add(
1, std::memory_order_relaxed);
}
void Collect(Histogram_80_10* result) const;
void Collect(Histogram_10000_20* result) const;
private:
std::atomic<uint64_t> buckets_[10]{};
std::atomic<uint64_t> buckets_[20]{};
};
struct GlobalStats {
enum class Counter {
@ -143,6 +166,8 @@ struct GlobalStats {
kCqNextCreates,
kCqCallbackCreates,
kWrrUpdates,
kWorkSerializerItemsEnqueued,
kWorkSerializerItemsDequeued,
COUNT
};
enum class Histogram {
@ -156,6 +181,10 @@ struct GlobalStats {
kHttp2MetadataSize,
kWrrSubchannelListSize,
kWrrSubchannelReadySize,
kWorkSerializerRunTimeMs,
kWorkSerializerWorkTimeMs,
kWorkSerializerWorkTimePerItemMs,
kWorkSerializerItemsPerRun,
COUNT
};
GlobalStats();
@ -186,6 +215,8 @@ struct GlobalStats {
uint64_t cq_next_creates;
uint64_t cq_callback_creates;
uint64_t wrr_updates;
uint64_t work_serializer_items_enqueued;
uint64_t work_serializer_items_dequeued;
};
uint64_t counters[static_cast<int>(Counter::COUNT)];
};
@ -199,6 +230,10 @@ struct GlobalStats {
Histogram_65536_26 http2_metadata_size;
Histogram_10000_20 wrr_subchannel_list_size;
Histogram_10000_20 wrr_subchannel_ready_size;
Histogram_100000_20 work_serializer_run_time_ms;
Histogram_100000_20 work_serializer_work_time_ms;
Histogram_100000_20 work_serializer_work_time_per_item_ms;
Histogram_10000_20 work_serializer_items_per_run;
HistogramView histogram(Histogram which) const;
std::unique_ptr<GlobalStats> Diff(const GlobalStats& other) const;
};
@ -272,6 +307,14 @@ class GlobalStatsCollector {
// Counts one wrr update received (per-CPU sharded counter, relaxed add).
void IncrementWrrUpdates() {
data_.this_cpu().wrr_updates.fetch_add(1, std::memory_order_relaxed);
}
// Counts one item enqueued onto a work serializer, process-wide
// (per-CPU sharded counter, relaxed add).
void IncrementWorkSerializerItemsEnqueued() {
data_.this_cpu().work_serializer_items_enqueued.fetch_add(
1, std::memory_order_relaxed);
}
// Counts one item dequeued (executed) from a work serializer, process-wide
// (per-CPU sharded counter, relaxed add).
void IncrementWorkSerializerItemsDequeued() {
data_.this_cpu().work_serializer_items_dequeued.fetch_add(
1, std::memory_order_relaxed);
}
// Adds one sample to the call_initial_size histogram (per-CPU shard).
void IncrementCallInitialSize(int value) {
data_.this_cpu().call_initial_size.Increment(value);
}
@ -302,6 +345,18 @@ class GlobalStatsCollector {
// Adds one sample to the wrr_subchannel_ready_size histogram: the number of
// READY subchannels in a list at picker creation time (per doc table above).
void IncrementWrrSubchannelReadySize(int value) {
data_.this_cpu().wrr_subchannel_ready_size.Increment(value);
}
// Records how many milliseconds a work serializer ran for, measured over one
// idle-to-idle run (see work_serializer.cc).
void IncrementWorkSerializerRunTimeMs(int value) {
data_.this_cpu().work_serializer_run_time_ms.Increment(value);
}
// Records the milliseconds actually spent executing callbacks during one run
// (excludes dispatch/queueing time; see work_serializer.cc).
void IncrementWorkSerializerWorkTimeMs(int value) {
data_.this_cpu().work_serializer_work_time_ms.Increment(value);
}
// Records the milliseconds a single work-serializer callback took to run.
void IncrementWorkSerializerWorkTimePerItemMs(int value) {
data_.this_cpu().work_serializer_work_time_per_item_ms.Increment(value);
}
// Records how many callbacks were executed in one work-serializer run.
void IncrementWorkSerializerItemsPerRun(int value) {
data_.this_cpu().work_serializer_items_per_run.Increment(value);
}
private:
struct Data {
@ -324,6 +379,8 @@ class GlobalStatsCollector {
std::atomic<uint64_t> cq_next_creates{0};
std::atomic<uint64_t> cq_callback_creates{0};
std::atomic<uint64_t> wrr_updates{0};
std::atomic<uint64_t> work_serializer_items_enqueued{0};
std::atomic<uint64_t> work_serializer_items_dequeued{0};
HistogramCollector_65536_26 call_initial_size;
HistogramCollector_16777216_20 tcp_write_size;
HistogramCollector_80_10 tcp_write_iov_size;
@ -334,6 +391,10 @@ class GlobalStatsCollector {
HistogramCollector_65536_26 http2_metadata_size;
HistogramCollector_10000_20 wrr_subchannel_list_size;
HistogramCollector_10000_20 wrr_subchannel_ready_size;
HistogramCollector_100000_20 work_serializer_run_time_ms;
HistogramCollector_100000_20 work_serializer_work_time_ms;
HistogramCollector_100000_20 work_serializer_work_time_per_item_ms;
HistogramCollector_10000_20 work_serializer_items_per_run;
};
PerCpu<Data> data_{PerCpuOptions().SetCpusPerShard(4).SetMaxShards(32)};
};

@ -98,3 +98,24 @@
buckets: 20
- counter: wrr_updates
doc: Number of wrr updates that have been received
# work serializer
- histogram: work_serializer_run_time_ms
doc: Number of milliseconds work serializers run for
max: 100000
buckets: 20
- histogram: work_serializer_work_time_ms
doc: When running, how many milliseconds are work serializers actually doing work
max: 100000
buckets: 20
- histogram: work_serializer_work_time_per_item_ms
doc: How long do individual items take to process in work serializers
max: 100000
buckets: 20
- histogram: work_serializer_items_per_run
doc: How many callbacks are executed when a work serializer runs
max: 10000
buckets: 20
- counter: work_serializer_items_enqueued
doc: Number of items enqueued onto work serializers
- counter: work_serializer_items_dequeued
doc: Number of items dequeued from work serializers

@ -22,6 +22,7 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>
@ -32,6 +33,8 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -359,6 +362,10 @@ class WorkSerializer::DispatchingWorkSerializer final
// EventEngine instance upon which we'll do our work.
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
std::chrono::steady_clock::time_point running_start_time_
ABSL_GUARDED_BY(mu_);
std::chrono::steady_clock::duration time_running_items_;
uint64_t items_processed_during_run_;
// Flags containing run state:
// - running_ goes from false->true whenever the first callback is scheduled
// on an idle WorkSerializer, and transitions back to false after the last
@ -406,11 +413,15 @@ void WorkSerializer::DispatchingWorkSerializer::Run(
gpr_log(GPR_INFO, "WorkSerializer[%p] Scheduling callback [%s:%d]", this,
location.file(), location.line());
}
global_stats().IncrementWorkSerializerItemsEnqueued();
MutexLock lock(&mu_);
if (!running_) {
// If we were previously idle, insert this callback directly into the empty
// processing_ list and start running.
running_ = true;
running_start_time_ = std::chrono::steady_clock::now();
items_processed_during_run_ = 0;
time_running_items_ = std::chrono::steady_clock::duration();
GPR_ASSERT(processing_.empty());
processing_.emplace_back(std::move(callback), location);
event_engine_->Run(this);
@ -434,6 +445,7 @@ void WorkSerializer::DispatchingWorkSerializer::Run() {
cb.location.file(), cb.location.line());
}
// Run the work item.
const auto start = std::chrono::steady_clock::now();
SetCurrentThread();
cb.callback();
// pop_back here destroys the callback - freeing any resources it might hold.
@ -441,6 +453,12 @@ void WorkSerializer::DispatchingWorkSerializer::Run() {
// wants to check that it's in the WorkSerializer too.
processing_.pop_back();
ClearCurrentThread();
global_stats().IncrementWorkSerializerItemsDequeued();
const auto work_time = std::chrono::steady_clock::now() - start;
global_stats().IncrementWorkSerializerWorkTimePerItemMs(
std::chrono::duration_cast<std::chrono::milliseconds>(work_time).count());
time_running_items_ += work_time;
++items_processed_during_run_;
// Check if we've drained the queue and if so refill it.
if (processing_.empty() && !Refill()) return;
// There's still work in processing_, so schedule ourselves again on
@ -460,6 +478,16 @@ WorkSerializer::DispatchingWorkSerializer::RefillInner() {
// If there were no items, then we've finished running.
if (processing_.empty()) {
running_ = false;
global_stats().IncrementWorkSerializerRunTimeMs(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - running_start_time_)
.count());
global_stats().IncrementWorkSerializerWorkTimeMs(
std::chrono::duration_cast<std::chrono::milliseconds>(
time_running_items_)
.count());
global_stats().IncrementWorkSerializerItemsPerRun(
items_processed_during_run_);
// And if we're also orphaned then it's time to delete this object.
if (orphaned_) {
return RefillResult::kFinishedAndOrphaned;

@ -495,6 +495,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void Orphan() override {
absl::Notification notification;
ExecCtx exec_ctx;
test_->work_serializer_->Run(
[notification = &notification,
picker = std::move(picker_)]() mutable {
@ -699,11 +700,13 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
void TearDown() override {
ExecCtx exec_ctx;
fuzzing_ee_->FuzzingDone();
// Make sure pickers (and transitively, subchannels) are unreffed before
// destroying the fixture.
WaitForWorkSerializerToFlush();
work_serializer_.reset();
exec_ctx.Flush();
// Note: Can't safely trigger this from inside the FakeHelper dtor,
// because if there is a picker in the queue that is holding a ref
// to the LB policy, that will prevent the LB policy from being
@ -1202,6 +1205,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
void WaitForWorkSerializerToFlush() {
ExecCtx exec_ctx;
gpr_log(GPR_INFO, "waiting for WorkSerializer to flush...");
absl::Notification notification;
work_serializer_->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
@ -1210,6 +1214,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
void IncrementTimeBy(Duration duration) {
ExecCtx exec_ctx;
fuzzing_ee_->TickForDuration(duration);
// Flush WorkSerializer, in case the timer callback enqueued anything.
WaitForWorkSerializerToFlush();

@ -37,6 +37,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
@ -65,6 +66,7 @@ class PickFirstTest : public LoadBalancingPolicyTest {
void GetOrderAddressesArePicked(
absl::Span<const absl::string_view> addresses,
std::vector<absl::string_view>* out_address_order) {
ExecCtx exec_ctx;
out_address_order->clear();
// Note: ExitIdle() will enqueue a bunch of connectivity state
// notifications on the WorkSerializer, and we want to wait until

@ -23,19 +23,27 @@
#include <algorithm>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/synchronization/barrier.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/debug/histogram_view.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/util/test_config.h"
@ -243,6 +251,107 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
}
}
// Verifies that the process-wide work-serializer metrics (enqueue/dequeue
// counters and the run-time / work-time / items-per-run histograms) track
// real activity. Uses wall-clock sleeps, so all time bounds are deliberately
// loose around the nominal 1s sleep (800-1300ms) to tolerate scheduler jitter.
// NOTE(review): counters are process-wide, so any other serializer activity
// in this process during the test could perturb the exact EQ checks - confirm
// the test binary runs this in isolation.
TEST(WorkSerializerTest, MetricsWork) {
if (!IsWorkSerializerDispatchEnabled()) {
GTEST_SKIP() << "Work serializer dispatch experiment not enabled";
}
WorkSerializer serializer(GetDefaultEventEngine());
// Enqueue a callback that sleeps for `how_long`; returns a Notification
// that fires when the callback has executed.
auto schedule_sleep = [&serializer](absl::Duration how_long) {
ExecCtx exec_ctx;
auto n = std::make_shared<Notification>();
serializer.Run(
[how_long, n]() {
absl::SleepFor(how_long);
n->Notify();
},
DEBUG_LOCATION);
return n;
};
auto before = global_stats().Collect();
// Runs `f` and returns the stats delta it produced; advances `before` so
// successive calls measure disjoint windows.
auto stats_diff_from = [&before](absl::AnyInvocable<void()> f) {
f();
auto after = global_stats().Collect();
auto diff = after->Diff(*before);
before = std::move(after);
return diff;
};
// Test adding one work item to the queue
auto diff = stats_diff_from(
[&] { schedule_sleep(absl::Seconds(1))->WaitForNotification(); });
EXPECT_EQ(diff->work_serializer_items_enqueued, 1);
EXPECT_EQ(diff->work_serializer_items_dequeued, 1);
EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
.Percentile(0.5),
1.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
.Percentile(0.5),
2.0);
// One ~1s callback: run time and work time should both be ~1000ms.
EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
.Percentile(0.5),
800.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
.Percentile(0.5),
1300.0);
EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
.Percentile(0.5),
800.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
.Percentile(0.5),
1300.0);
EXPECT_GE(
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
.Percentile(0.5),
800.0);
EXPECT_LE(
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
.Percentile(0.5),
1300.0);
// NOTE(review): this asserts the run-time median <= work-time median, yet
// run time nominally includes dispatch overhead on top of work time - the
// histograms' coarse bucket percentiles presumably make these comparable;
// confirm the intended direction of this inequality.
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
.Percentile(0.5),
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
.Percentile(0.5));
// Now throw a bunch of work in and see that we get good results
diff = stats_diff_from([&] {
for (int i = 0; i < 10; i++) {
schedule_sleep(absl::Milliseconds(1000));
}
schedule_sleep(absl::Milliseconds(1000))->WaitForNotification();
});
// 11 items total; they should largely batch into one long run (~11s).
EXPECT_EQ(diff->work_serializer_items_enqueued, 11);
EXPECT_EQ(diff->work_serializer_items_dequeued, 11);
EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
.Percentile(0.5),
7.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerItemsPerRun)
.Percentile(0.5),
15.0);
EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
.Percentile(0.5),
7000.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
.Percentile(0.5),
15000.0);
EXPECT_GE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
.Percentile(0.5),
7000.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
.Percentile(0.5),
15000.0);
// Per-item time stays ~1000ms regardless of batching.
EXPECT_GE(
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
.Percentile(0.5),
800.0);
EXPECT_LE(
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimePerItemMs)
.Percentile(0.5),
1300.0);
EXPECT_LE(diff->histogram(GlobalStats::Histogram::kWorkSerializerRunTimeMs)
.Percentile(0.5),
diff->histogram(GlobalStats::Histogram::kWorkSerializerWorkTimeMs)
.Percentile(0.5));
}
#ifndef NDEBUG
TEST(WorkSerializerTest, RunningInWorkSerializer) {
auto work_serializer1 =

Loading…
Cancel
Save